Add histogram chunk encoding with Start Timestamp support

Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
This commit is contained in:
Carrie Edwards 2026-04-06 12:47:53 -07:00
parent 1ec24a3295
commit 27b65ab2ad
12 changed files with 1982 additions and 106 deletions

View File

@ -649,17 +649,18 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
return errors.New("ChunkOrIterable should not return an iterable when reading a block")
}
switch chk.Encoding() {
case chunkenc.EncXOR:
case chunkenc.EncXOR, chunkenc.EncXOR2:
floatChunkSamplesCount = append(floatChunkSamplesCount, chk.NumSamples())
floatChunkSize = append(floatChunkSize, len(chk.Bytes()))
case chunkenc.EncFloatHistogram:
case chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogramST:
histogramChunkSamplesCount = append(histogramChunkSamplesCount, chk.NumSamples())
histogramChunkSize = append(histogramChunkSize, len(chk.Bytes()))
fhchk, ok := chk.(*chunkenc.FloatHistogramChunk)
if !ok {
return errors.New("chunk is not FloatHistogramChunk")
if _, ok := chk.(*chunkenc.FloatHistogramChunk); !ok {
if _, ok := chk.(*chunkenc.FloatHistogramSTChunk); !ok {
return errors.New("chunk is not FloatHistogramChunk or FloatHistogramSTChunk")
}
}
it := fhchk.Iterator(nil)
it := chk.Iterator(nil)
bucketCount := 0
for it.Next() == chunkenc.ValFloatHistogram {
_, f := it.AtFloatHistogram(nil)
@ -667,14 +668,15 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
bucketCount += len(f.NegativeBuckets)
}
histogramChunkBucketsCount = append(histogramChunkBucketsCount, bucketCount)
case chunkenc.EncHistogram:
case chunkenc.EncHistogram, chunkenc.EncHistogramST:
histogramChunkSamplesCount = append(histogramChunkSamplesCount, chk.NumSamples())
histogramChunkSize = append(histogramChunkSize, len(chk.Bytes()))
hchk, ok := chk.(*chunkenc.HistogramChunk)
if !ok {
return errors.New("chunk is not HistogramChunk")
if _, ok := chk.(*chunkenc.HistogramChunk); !ok {
if _, ok := chk.(*chunkenc.HistogramSTChunk); !ok {
return errors.New("chunk is not HistogramChunk or HistogramSTChunk")
}
}
it := hchk.Iterator(nil)
it := chk.Iterator(nil)
bucketCount := 0
for it.Next() == chunkenc.ValHistogram {
_, f := it.AtHistogram(nil)

View File

@ -31,6 +31,8 @@ const (
EncHistogram
EncFloatHistogram
EncXOR2
EncHistogramST
EncFloatHistogramST
)
func (e Encoding) String() string {
@ -45,13 +47,17 @@ func (e Encoding) String() string {
return "floathistogram"
case EncXOR2:
return "XOR2"
case EncHistogramST:
return "histogramST"
case EncFloatHistogramST:
return "floathistogramST"
}
return "<unknown>"
}
// IsValidEncoding returns true for supported encodings.
func IsValidEncoding(e Encoding) bool {
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOR2
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOR2 || e == EncHistogramST || e == EncFloatHistogramST
}
const (
@ -199,8 +205,14 @@ func (v ValueType) ChunkEncoding(useXOR2 bool) Encoding {
}
return EncXOR
case ValHistogram:
if useXOR2 {
return EncHistogramST
}
return EncHistogram
case ValFloatHistogram:
if useXOR2 {
return EncFloatHistogramST
}
return EncFloatHistogram
default:
return EncNone
@ -296,10 +308,12 @@ type Pool interface {
// pool is a memory pool of chunk objects.
type pool struct {
xor sync.Pool
histogram sync.Pool
floatHistogram sync.Pool
xo2 sync.Pool
xor sync.Pool
histogram sync.Pool
floatHistogram sync.Pool
xo2 sync.Pool
histogramST sync.Pool
floatHistogramST sync.Pool
}
// NewPool returns a new pool.
@ -325,6 +339,16 @@ func NewPool() Pool {
return &XOR2Chunk{b: bstream{}}
},
},
histogramST: sync.Pool{
New: func() any {
return &HistogramSTChunk{b: bstream{}}
},
},
floatHistogramST: sync.Pool{
New: func() any {
return &FloatHistogramSTChunk{b: bstream{}}
},
},
}
}
@ -339,6 +363,10 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
c = p.floatHistogram.Get().(*FloatHistogramChunk)
case EncXOR2:
c = p.xo2.Get().(*XOR2Chunk)
case EncHistogramST:
c = p.histogramST.Get().(*HistogramSTChunk)
case EncFloatHistogramST:
c = p.floatHistogramST.Get().(*FloatHistogramSTChunk)
default:
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}
@ -363,6 +391,12 @@ func (p *pool) Put(c Chunk) error {
case EncXOR2:
_, ok = c.(*XOR2Chunk)
sp = &p.xo2
case EncHistogramST:
_, ok = c.(*HistogramSTChunk)
sp = &p.histogramST
case EncFloatHistogramST:
_, ok = c.(*FloatHistogramSTChunk)
sp = &p.floatHistogramST
default:
return fmt.Errorf("invalid chunk encoding %q", c.Encoding())
}
@ -391,6 +425,10 @@ func FromData(e Encoding, d []byte) (Chunk, error) {
return &FloatHistogramChunk{b: bstream{count: 0, stream: d}}, nil
case EncXOR2:
return &XOR2Chunk{b: bstream{count: 0, stream: d}}, nil
case EncHistogramST:
return &HistogramSTChunk{b: bstream{count: 0, stream: d}}, nil
case EncFloatHistogramST:
return &FloatHistogramSTChunk{b: bstream{count: 0, stream: d}}, nil
}
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}
@ -406,6 +444,10 @@ func NewEmptyChunk(e Encoding) (Chunk, error) {
return NewFloatHistogramChunk(), nil
case EncXOR2:
return NewXOR2Chunk(), nil
case EncHistogramST:
return NewHistogramSTChunk(), nil
case EncFloatHistogramST:
return NewFloatHistogramSTChunk(), nil
}
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}

View File

@ -146,6 +146,14 @@ func TestPool(t *testing.T) {
name: "xor opt st",
encoding: EncXOR2,
},
{
name: "histogram st",
encoding: EncHistogramST,
},
{
name: "float histogram st",
encoding: EncFloatHistogramST,
},
{
name: "invalid encoding",
encoding: EncNone,
@ -169,6 +177,10 @@ func TestPool(t *testing.T) {
b = &c.(*FloatHistogramChunk).b
case EncXOR2:
b = &c.(*XOR2Chunk).b
case EncHistogramST:
b = &c.(*HistogramSTChunk).b
case EncFloatHistogramST:
b = &c.(*FloatHistogramSTChunk).b
default:
b = &c.(*XORChunk).b
}

View File

@ -0,0 +1,513 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"encoding/binary"
"errors"
"fmt"
"math"
"github.com/prometheus/prometheus/model/histogram"
)
// FloatHistogramSTChunk is a chunk for float histogram samples with start timestamp (ST) support.
// It extends the FloatHistogramChunk format with a 1-byte ST header after the flags byte.
//
// Header layout (4 bytes):
//
// bytes 0-1: sample count (big-endian uint16)
// byte 2: flags (bits 7-6 = counter reset header)
// byte 3: ST header (bit 7 = firstSTKnown, bits 6-0 = firstSTChangeOn)
type FloatHistogramSTChunk struct {
b bstream
}
// NewFloatHistogramSTChunk returns a new empty FloatHistogramSTChunk.
func NewFloatHistogramSTChunk() *FloatHistogramSTChunk {
b := make([]byte, histogramSTHeaderSize, chunkAllocationSize)
return &FloatHistogramSTChunk{b: bstream{stream: b, count: 0}}
}
// Reset resets the chunk given stream.
func (c *FloatHistogramSTChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type.
func (*FloatHistogramSTChunk) Encoding() Encoding { return EncFloatHistogramST }
// Bytes returns the underlying byte slice of the chunk.
func (c *FloatHistogramSTChunk) Bytes() []byte {
return c.b.bytes()
}
// NumSamples returns the number of samples in the chunk.
func (c *FloatHistogramSTChunk) NumSamples() int {
return int(binary.BigEndian.Uint16(c.b.bytes()))
}
// GetCounterResetHeader returns the counter reset header from the flags byte.
func (c *FloatHistogramSTChunk) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(c.b.bytes()[histogramFlagPos] & CounterResetHeaderMask)
}
// Compact implements the Chunk interface.
func (c *FloatHistogramSTChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
buf := make([]byte, l)
copy(buf, c.b.stream)
c.b.stream = buf
}
}
// Appender implements the Chunk interface.
func (c *FloatHistogramSTChunk) Appender() (Appender, error) {
if len(c.b.stream) == histogramSTHeaderSize {
return &FloatHistogramSTAppender{
FloatHistogramAppender: FloatHistogramAppender{
b: &c.b,
t: math.MinInt64,
sum: xorValue{leading: 0xff},
cnt: xorValue{leading: 0xff},
zCnt: xorValue{leading: 0xff},
},
}, nil
}
it := c.iterator(nil)
for it.Next() == ValFloatHistogram {
}
if err := it.Err(); err != nil {
return nil, err
}
// Set the bit position for continuing writes.
c.b.count = it.br.valid
pBuckets := make([]xorValue, len(it.pBuckets))
for i := 0; i < len(it.pBuckets); i++ {
pBuckets[i] = xorValue{
value: it.pBuckets[i],
leading: it.pBucketsLeading[i],
trailing: it.pBucketsTrailing[i],
}
}
nBuckets := make([]xorValue, len(it.nBuckets))
for i := 0; i < len(it.nBuckets); i++ {
nBuckets[i] = xorValue{
value: it.nBuckets[i],
leading: it.nBucketsLeading[i],
trailing: it.nBucketsTrailing[i],
}
}
a := &FloatHistogramSTAppender{
FloatHistogramAppender: FloatHistogramAppender{
b: &c.b,
schema: it.schema,
zThreshold: it.zThreshold,
pSpans: it.pSpans,
nSpans: it.nSpans,
customValues: it.customValues,
t: it.t,
tDelta: it.tDelta,
cnt: it.cnt,
zCnt: it.zCnt,
pBuckets: pBuckets,
nBuckets: nBuckets,
sum: it.sum,
},
st: it.st,
stDiff: it.stDiff,
firstSTKnown: it.firstSTKnown,
firstSTChangeOn: uint16(it.firstSTChangeOn),
}
return a, nil
}
func newFloatHistogramSTIterator(b []byte) *floatHistogramSTIterator {
it := &floatHistogramSTIterator{
floatHistogramIterator: floatHistogramIterator{
br: newBReader(b[histogramSTHeaderSize:]),
numTotal: binary.BigEndian.Uint16(b),
t: math.MinInt64,
},
}
it.counterResetHeader = CounterResetHeader(b[histogramFlagPos] & CounterResetHeaderMask)
it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[histogramSTHeaderSize-1:])
return it
}
func (c *FloatHistogramSTChunk) iterator(it Iterator) *floatHistogramSTIterator {
if fhIter, ok := it.(*floatHistogramSTIterator); ok {
fhIter.Reset(c.b.bytes())
return fhIter
}
return newFloatHistogramSTIterator(c.b.bytes())
}
// Iterator implements the Chunk interface.
func (c *FloatHistogramSTChunk) Iterator(it Iterator) Iterator {
return c.iterator(it)
}
// FloatHistogramSTAppender is an Appender for float histogram samples with start timestamp support.
// It embeds FloatHistogramAppender and adds ST encoding after each sample.
type FloatHistogramSTAppender struct {
FloatHistogramAppender
st int64
stDiff int64
firstSTChangeOn uint16
firstSTKnown bool
}
// encodeST encodes the start timestamp for the current sample.
// It must be called after appendFloatHistogram() which increments the sample count.
// prevT is the timestamp of the previous sample (before appendFloatHistogram updated a.t).
// For sample 0, prevT is unused.
func (a *FloatHistogramSTAppender) encodeST(prevT, st int64) {
num := binary.BigEndian.Uint16(a.b.bytes())
switch {
case num == 1: // First sample (count was just incremented from 0).
if st != 0 {
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, a.t-st)] {
a.b.writeByte(b)
}
a.firstSTKnown = true
writeHeaderFirstSTKnown(a.b.bytes()[histogramSTHeaderSize-1:])
}
case num == 2: // Second sample.
if st != a.st {
stDiff := prevT - st
a.firstSTChangeOn = 1
writeHeaderFirstSTChangeOn(a.b.bytes()[histogramSTHeaderSize-1:], 1)
putVarbitInt(a.b, stDiff)
a.stDiff = stDiff
}
default: // Sample N >= 2.
// Fast path: no ST data to write.
if st == 0 && num-1 != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown {
break
}
if a.firstSTChangeOn == 0 {
if st != a.st || num-1 == maxFirstSTChangeOn {
stDiff := prevT - st
a.firstSTChangeOn = num - 1
writeHeaderFirstSTChangeOn(a.b.bytes()[histogramSTHeaderSize-1:], num-1)
putVarbitInt(a.b, stDiff)
a.stDiff = stDiff
}
} else {
stDiff := prevT - st
putVarbitInt(a.b, stDiff-a.stDiff)
a.stDiff = stDiff
}
}
a.st = st
}
// appendFloatHistogramST encodes a float histogram sample with start timestamp.
func (a *FloatHistogramSTAppender) appendFloatHistogramST(st, t int64, fh *histogram.FloatHistogram) {
prevT := a.t
a.appendFloatHistogram(t, fh)
a.encodeST(prevT, st)
}
// Append implements Appender. This implementation panics because normal float
// samples must never be appended to a float histogram chunk.
func (*FloatHistogramSTAppender) Append(int64, int64, float64) {
panic("appended a float sample to a float histogram chunk")
}
// AppendHistogram implements Appender. This implementation panics because integer
// histogram samples must never be appended to a float histogram chunk.
func (*FloatHistogramSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
panic("appended a histogram sample to a float histogram chunk")
}
// AppendFloatHistogram implements Appender for FloatHistogramSTAppender.
func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev *FloatHistogramAppender, st, t int64, fh *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) {
if a.NumSamples() == 0 {
a.appendFloatHistogramST(st, t, fh)
if fh.CounterResetHint == histogram.GaugeType {
a.setCounterResetHeader(GaugeType)
return nil, false, a, nil
}
switch {
case fh.CounterResetHint == histogram.CounterReset:
a.setCounterResetHeader(CounterReset)
case prev != nil:
_, _, _, _, _, counterReset := prev.appendable(fh)
if counterReset {
a.setCounterResetHeader(CounterReset)
} else {
a.setCounterResetHeader(NotCounterReset)
}
}
return nil, false, a, nil
}
// Adding counter-like histogram.
if fh.CounterResetHint != histogram.GaugeType {
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(fh)
if !okToAppend || counterReset {
if appendOnly {
if counterReset {
return nil, false, a, errors.New("float histogram counter reset")
}
return nil, false, a, errors.New("float histogram schema change")
}
newChunk := NewFloatHistogramSTChunk()
app, err := newChunk.Appender()
if err != nil {
panic(err)
}
happ := app.(*FloatHistogramSTAppender)
if counterReset {
happ.setCounterResetHeader(CounterReset)
}
happ.appendFloatHistogramST(st, t, fh)
return newChunk, false, app, nil
}
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
fh.PositiveSpans = make([]histogram.Span, len(a.pSpans))
copy(fh.PositiveSpans, a.pSpans)
fh.NegativeSpans = make([]histogram.Span, len(a.nSpans))
copy(fh.NegativeSpans, a.nSpans)
} else {
fh.PositiveSpans = adjustForInserts(fh.PositiveSpans, pBackwardInserts)
fh.NegativeSpans = adjustForInserts(fh.NegativeSpans, nBackwardInserts)
}
a.recodeHistogram(fh, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("float histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
}
chk, app := a.recodeST(
pForwardInserts, nForwardInserts,
fh.PositiveSpans, fh.NegativeSpans,
)
app.(*FloatHistogramSTAppender).appendFloatHistogramST(st, t, fh)
return chk, true, app, nil
}
a.appendFloatHistogramST(st, t, fh)
return nil, false, a, nil
}
// Adding gauge histogram.
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend := a.appendableGauge(fh)
if !okToAppend {
if appendOnly {
return nil, false, a, errors.New("float gauge histogram schema change")
}
newChunk := NewFloatHistogramSTChunk()
app, err := newChunk.Appender()
if err != nil {
panic(err)
}
happ := app.(*FloatHistogramSTAppender)
happ.setCounterResetHeader(GaugeType)
happ.appendFloatHistogramST(st, t, fh)
return newChunk, false, app, nil
}
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("float gauge histogram layout change with %d positive and %d negative backwards inserts", len(pBackwardInserts), len(nBackwardInserts))
}
fh.PositiveSpans = pMergedSpans
fh.NegativeSpans = nMergedSpans
a.recodeHistogram(fh, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("float gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
}
chk, app := a.recodeST(
pForwardInserts, nForwardInserts,
fh.PositiveSpans, fh.NegativeSpans,
)
app.(*FloatHistogramSTAppender).appendFloatHistogramST(st, t, fh)
return chk, true, app, nil
}
a.appendFloatHistogramST(st, t, fh)
return nil, false, a, nil
}
// recodeST is like FloatHistogramAppender.recode but creates FloatHistogramSTChunk and preserves ST.
func (a *FloatHistogramSTAppender) recodeST(
positiveInserts, negativeInserts []Insert,
positiveSpans, negativeSpans []histogram.Span,
) (Chunk, Appender) {
byts := a.b.bytes()
it := newFloatHistogramSTIterator(byts)
hc := NewFloatHistogramSTChunk()
app, err := hc.Appender()
if err != nil {
panic(err)
}
happ := app.(*FloatHistogramSTAppender)
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans)
for it.Next() == ValFloatHistogram {
tOld, fhOld := it.AtFloatHistogram(nil)
stOld := it.AtST()
var positiveBuckets, negativeBuckets []float64
if numPositiveBuckets > 0 {
positiveBuckets = make([]float64, numPositiveBuckets)
}
if numNegativeBuckets > 0 {
negativeBuckets = make([]float64, numNegativeBuckets)
}
fhOld.PositiveSpans, fhOld.NegativeSpans = positiveSpans, negativeSpans
if len(positiveInserts) > 0 {
fhOld.PositiveBuckets = insert(fhOld.PositiveBuckets, positiveBuckets, positiveInserts, false)
}
if len(negativeInserts) > 0 {
fhOld.NegativeBuckets = insert(fhOld.NegativeBuckets, negativeBuckets, negativeInserts, false)
}
happ.appendFloatHistogramST(stOld, tOld, fhOld)
}
happ.setCounterResetHeader(CounterResetHeader(byts[histogramFlagPos] & CounterResetHeaderMask))
return hc, app
}
// floatHistogramSTIterator is an iterator for FloatHistogramSTChunk that decodes ST after each sample.
type floatHistogramSTIterator struct {
floatHistogramIterator
// ST fields.
st int64
stDiff int64
firstSTKnown bool
firstSTChangeOn uint8
}
// AtST returns the start timestamp for the current sample.
func (it *floatHistogramSTIterator) AtST() int64 {
return it.st
}
// Reset resets the iterator for reuse.
func (it *floatHistogramSTIterator) Reset(b []byte) {
it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[histogramSTHeaderSize-1:])
it.st = 0
it.stDiff = 0
// Reset the embedded floatHistogramIterator but with the correct header offset.
it.br = newBReader(b[histogramSTHeaderSize:])
it.numTotal = binary.BigEndian.Uint16(b)
it.numRead = 0
it.counterResetHeader = CounterResetHeader(b[histogramFlagPos] & CounterResetHeaderMask)
it.t, it.tDelta = 0, 0
it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{}
if it.atFloatHistogramCalled {
it.atFloatHistogramCalled = false
it.pBuckets, it.nBuckets = nil, nil
it.pSpans, it.nSpans = nil, nil
it.customValues = nil
} else {
it.pBuckets, it.nBuckets = it.pBuckets[:0], it.nBuckets[:0]
}
it.pBucketsLeading, it.pBucketsTrailing = it.pBucketsLeading[:0], it.pBucketsTrailing[:0]
it.nBucketsLeading, it.nBucketsTrailing = it.nBucketsLeading[:0], it.nBucketsTrailing[:0]
it.err = nil
}
// Next advances the iterator by one sample.
// It calls the embedded floatHistogramIterator.Next() to decode the float histogram sample,
// then decodes the ST data that follows in the bitstream.
func (it *floatHistogramSTIterator) Next() ValueType {
prevT := it.t
vt := it.floatHistogramIterator.Next()
if vt == ValNone {
return ValNone
}
if err := it.decodeST(it.numRead, prevT); err != nil {
it.err = err
return ValNone
}
return vt
}
// Seek advances the iterator forward to the first sample with timestamp >= t.
func (it *floatHistogramSTIterator) Seek(t int64) ValueType {
if it.err != nil {
return ValNone
}
for t > it.t || it.numRead == 0 {
if it.Next() == ValNone {
return ValNone
}
}
return ValFloatHistogram
}
// decodeST decodes the start timestamp for the current sample.
// numRead is the number of samples read so far (already incremented by floatHistogramIterator.Next()).
// prevT is the timestamp of the previous sample (before floatHistogramIterator.Next() updated it.t).
func (it *floatHistogramSTIterator) decodeST(numRead uint16, prevT int64) error {
switch {
case numRead == 1: // After sample 0.
if it.firstSTKnown {
stDiff, err := it.br.readVarint()
if err != nil {
return err
}
it.stDiff = stDiff
it.st = it.t - stDiff
}
case numRead == 2: // After sample 1.
if it.firstSTChangeOn == 1 {
sdod, err := readVarbitInt(&it.br)
if err != nil {
return err
}
it.stDiff = sdod
it.st = prevT - sdod
}
default: // After sample N >= 2.
if it.firstSTChangeOn > 0 && numRead-1 >= uint16(it.firstSTChangeOn) {
sdod, err := readVarbitInt(&it.br)
if err != nil {
return err
}
if numRead-1 == uint16(it.firstSTChangeOn) {
it.stDiff = sdod
} else {
it.stDiff += sdod
}
it.st = prevT - it.stDiff
}
}
return nil
}

View File

@ -0,0 +1,333 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
type floatHistogramSTSample struct {
st, t int64
fh *histogram.FloatHistogram
}
func BenchmarkFloatHistogramSTWrite(b *testing.B) {
const n = 120
fhs := tsdbutil.GenerateTestFloatHistograms(n)
b.ReportAllocs()
for b.Loop() {
c := NewFloatHistogramSTChunk()
app, _ := c.Appender()
for i, fh := range fhs {
_, _, app, _ = app.AppendFloatHistogram(nil, 500, int64(i)*15000, fh, false)
}
}
}
func BenchmarkFloatHistogramSTRead(b *testing.B) {
const n = 120
fhs := tsdbutil.GenerateTestFloatHistograms(n)
c := NewFloatHistogramSTChunk()
app, err := c.Appender()
require.NoError(b, err)
for i, fh := range fhs {
_, _, app, err = app.AppendFloatHistogram(nil, 500, int64(i)*15000, fh, false)
require.NoError(b, err)
}
b.ReportAllocs()
var it Iterator
for b.Loop() {
it = c.Iterator(it)
for it.Next() != ValNone {
}
}
}
// requireFloatHistogramSTSamples appends the given float histogram samples to a
// new FloatHistogramSTChunk, then verifies all samples round-trip correctly
// through the iterator.
func requireFloatHistogramSTSamples(t *testing.T, samples []floatHistogramSTSample) {
t.Helper()
c := NewFloatHistogramSTChunk()
app, err := c.Appender()
require.NoError(t, err)
for _, s := range samples {
_, _, app, err = app.AppendFloatHistogram(nil, s.st, s.t, s.fh, false)
require.NoError(t, err)
}
require.Equal(t, len(samples), c.NumSamples())
it := c.Iterator(nil)
for i, s := range samples {
require.Equal(t, ValFloatHistogram, it.Next(), "sample %d", i)
require.Equal(t, s.t, it.AtT(), "sample %d: timestamp", i)
require.Equal(t, s.st, it.AtST(), "sample %d: start timestamp", i)
}
require.Equal(t, ValNone, it.Next())
require.NoError(t, it.Err())
}
func TestFloatHistogramSTChunkST(t *testing.T) {
testChunkSTHandling(t, ValFloatHistogram, func() Chunk { return NewFloatHistogramSTChunk() })
}
func TestFloatHistogramSTBasic(t *testing.T) {
hs := tsdbutil.GenerateTestFloatHistograms(5)
requireFloatHistogramSTSamples(t, []floatHistogramSTSample{
{st: 0, t: 1000, fh: hs[0]},
{st: 0, t: 2000, fh: hs[1]},
{st: 0, t: 3000, fh: hs[2]},
{st: 0, t: 4000, fh: hs[3]},
{st: 0, t: 5000, fh: hs[4]},
})
}
func TestFloatHistogramSTChunkAppendAndIterate(t *testing.T) {
fhs := tsdbutil.GenerateTestFloatHistograms(5)
requireFloatHistogramSTSamples(t, []floatHistogramSTSample{
{st: 100, t: 1000, fh: fhs[0]},
{st: 100, t: 2000, fh: fhs[1]},
{st: 200, t: 3000, fh: fhs[2]},
{st: 200, t: 4000, fh: fhs[3]},
{st: 300, t: 5000, fh: fhs[4]},
})
}
func TestFloatHistogramSTChunkCounterReset(t *testing.T) {
c := NewFloatHistogramSTChunk()
app, err := c.Appender()
require.NoError(t, err)
fh1 := &histogram.FloatHistogram{
Count: 10,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
},
PositiveBuckets: []float64{6, 3},
}
_, _, app, err = app.AppendFloatHistogram(nil, 100, 1000, fh1, false)
require.NoError(t, err)
fh2 := fh1.Copy()
fh2.Count = 3
fh2.Sum = 5.0
fh2.PositiveBuckets = []float64{2, 1}
fh2.CounterResetHint = histogram.CounterReset
// Ensure counter reset produces new chunk.
newChunk, recoded, newApp, err := app.AppendFloatHistogram(nil, 200, 2000, fh2, false)
require.NoError(t, err)
require.NotNil(t, newChunk)
require.False(t, recoded)
stChunk, ok := newChunk.(*FloatHistogramSTChunk)
require.True(t, ok)
require.Equal(t, CounterReset, stChunk.GetCounterResetHeader())
require.Equal(t, 1, stChunk.NumSamples())
// Verify ST is preserved in the new chunk.
it := stChunk.Iterator(nil)
require.Equal(t, ValFloatHistogram, it.Next())
require.Equal(t, int64(2000), it.AtT())
require.Equal(t, int64(200), it.AtST())
require.Equal(t, ValNone, it.Next())
require.NoError(t, it.Err())
fh3 := fh2.Copy()
fh3.CounterResetHint = histogram.UnknownCounterReset
fh3.Count = 8
fh3.Sum = 10.0
fh3.PositiveBuckets = []float64{4, 2}
_, _, _, err = newApp.AppendFloatHistogram(nil, 300, 3000, fh3, false)
require.NoError(t, err)
require.Equal(t, 2, stChunk.NumSamples())
}
func TestFloatHistogramSTChunkRecode(t *testing.T) {
c := NewFloatHistogramSTChunk()
app, err := c.Appender()
require.NoError(t, err)
h1 := &histogram.Histogram{
Count: 27,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4},
NegativeSpans: []histogram.Span{{Offset: 1, Length: 1}},
NegativeBuckets: []int64{1},
}
fh1 := h1.ToFloat(nil)
_, _, app, err = app.AppendFloatHistogram(nil, 100, 1000, fh1, false)
require.NoError(t, err)
// Force recode with expanded bucket layout
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 1},
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
}
h2.NegativeSpans = []histogram.Span{{Offset: 0, Length: 2}}
h2.Count = 35
h2.ZeroCount++
h2.Sum = 30
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1}
h2.NegativeBuckets = []int64{2, -1}
fh2 := h2.ToFloat(nil)
// Ensure recode produces a new chunk.
newChunk, recoded, newApp, err := app.AppendFloatHistogram(nil, 200, 2000, fh2, false)
require.NoError(t, err)
require.NotNil(t, newChunk)
require.True(t, recoded)
stChunk, ok := newChunk.(*FloatHistogramSTChunk)
require.True(t, ok)
require.Equal(t, 2, stChunk.NumSamples())
it := stChunk.Iterator(nil)
require.Equal(t, ValFloatHistogram, it.Next())
require.Equal(t, int64(1000), it.AtT())
require.Equal(t, int64(100), it.AtST())
require.Equal(t, ValFloatHistogram, it.Next())
require.Equal(t, int64(2000), it.AtT())
require.Equal(t, int64(200), it.AtST())
require.Equal(t, ValNone, it.Next())
require.NoError(t, it.Err())
fh3 := fh2.Copy()
fh3.Count = 40
fh3.Sum = 35
for i := range fh3.PositiveBuckets {
fh3.PositiveBuckets[i]++
}
_, _, _, err = newApp.AppendFloatHistogram(nil, 300, 3000, fh3, false)
require.NoError(t, err)
require.Equal(t, 3, stChunk.NumSamples())
}
func TestFloatHistogramST_MoreThan127Samples(t *testing.T) {
c := NewFloatHistogramSTChunk()
app, err := c.Appender()
require.NoError(t, err)
fh := &histogram.FloatHistogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
},
PositiveBuckets: []float64{6, 3},
}
const numSamples = maxFirstSTChangeOn + 3 // 130
for i := range int(numSamples) {
fhi := fh.Copy()
fhi.Count = float64(5 + i)
fhi.Sum = float64(18 + i)
_, _, app, err = app.AppendFloatHistogram(nil, 500, int64(1000+i*1000), fhi, false)
require.NoError(t, err)
}
require.Equal(t, int(numSamples), c.NumSamples())
// Verify all samples round-trip correctly.
it := c.Iterator(nil)
for i := range int(numSamples) {
require.Equal(t, ValFloatHistogram, it.Next())
require.Equal(t, int64(1000+i*1000), it.AtT())
require.Equal(t, int64(500), it.AtST())
}
require.Equal(t, ValNone, it.Next())
require.NoError(t, it.Err())
c2 := NewFloatHistogramSTChunk()
app2, err := c2.Appender()
require.NoError(t, err)
for i := range int(maxFirstSTChangeOn + 1) {
fhi := fh.Copy()
fhi.Count = float64(5 + i)
fhi.Sum = float64(18 + i)
_, _, app2, err = app2.AppendFloatHistogram(nil, 0, int64(1000+i*1000), fhi, false)
require.NoError(t, err)
}
for i := range 3 {
fhi := fh.Copy()
fhi.Count = float64(200 + i)
fhi.Sum = float64(200 + i)
_, _, app2, err = app2.AppendFloatHistogram(nil, 100, int64(200000+i*1000), fhi, false)
require.NoError(t, err)
}
it2 := c2.Iterator(nil)
for i := range int(maxFirstSTChangeOn + 1) {
require.Equal(t, ValFloatHistogram, it2.Next())
require.Equal(t, int64(1000+i*1000), it2.AtT())
require.Equal(t, int64(0), it2.AtST())
}
for i := range 3 {
require.Equal(t, ValFloatHistogram, it2.Next())
require.Equal(t, int64(200000+i*1000), it2.AtT())
require.Equal(t, int64(100), it2.AtST())
}
require.Equal(t, ValNone, it2.Next())
require.NoError(t, it2.Err())
}
func TestFloatHistogramSTChunkMixedST(t *testing.T) {
fhs := tsdbutil.GenerateTestFloatHistograms(5)
requireFloatHistogramSTSamples(t, []floatHistogramSTSample{
{st: 0, t: 1000, fh: fhs[0]},
{st: 0, t: 2000, fh: fhs[1]},
{st: 100, t: 3000, fh: fhs[2]},
{st: 0, t: 4000, fh: fhs[3]},
{st: 200, t: 5000, fh: fhs[4]},
})
}

View File

@ -0,0 +1,502 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"encoding/binary"
"errors"
"fmt"
"math"
"github.com/prometheus/prometheus/model/histogram"
)
const histogramSTHeaderSize = 4
// HistogramSTChunk is a chunk for histogram samples with start timestamp (ST) support.
// It extends the HistogramChunk format with a 1-byte ST header after the flags byte.
//
// Header layout (4 bytes):
//
// bytes 0-1: sample count (big-endian uint16)
// byte 2: flags (bits 7-6 = counter reset header)
// byte 3: ST header (bit 7 = firstSTKnown, bits 6-0 = firstSTChangeOn)
type HistogramSTChunk struct {
b bstream
}
// NewHistogramSTChunk returns a new empty HistogramSTChunk.
func NewHistogramSTChunk() *HistogramSTChunk {
b := make([]byte, histogramSTHeaderSize, chunkAllocationSize)
return &HistogramSTChunk{b: bstream{stream: b, count: 0}}
}
func (c *HistogramSTChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type.
func (*HistogramSTChunk) Encoding() Encoding { return EncHistogramST }
// Bytes returns the underlying byte slice of the chunk.
func (c *HistogramSTChunk) Bytes() []byte {
return c.b.bytes()
}
// NumSamples returns the number of samples in the chunk.
func (c *HistogramSTChunk) NumSamples() int {
return int(binary.BigEndian.Uint16(c.b.bytes()))
}
// GetCounterResetHeader returns the counter reset header from the flags byte.
func (c *HistogramSTChunk) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(c.b.bytes()[histogramFlagPos] & CounterResetHeaderMask)
}
// Compact implements the Chunk interface.
func (c *HistogramSTChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
buf := make([]byte, l)
copy(buf, c.b.stream)
c.b.stream = buf
}
}
// Appender implements the Chunk interface.
func (c *HistogramSTChunk) Appender() (Appender, error) {
if len(c.b.stream) == histogramSTHeaderSize {
return &HistogramSTAppender{
HistogramAppender: HistogramAppender{
b: &c.b,
t: math.MinInt64,
leading: 0xff},
}, nil
}
it := c.iterator(nil)
for it.Next() == ValHistogram {
}
if err := it.Err(); err != nil {
return nil, err
}
// Set the bit position for continuing writes. The iterator's reader tracks
// how many bits remain unread in the last byte.
c.b.count = it.br.valid
a := &HistogramSTAppender{
HistogramAppender: HistogramAppender{
b: &c.b,
schema: it.schema,
zThreshold: it.zThreshold,
pSpans: it.pSpans,
nSpans: it.nSpans,
customValues: it.customValues,
t: it.t,
cnt: it.cnt,
zCnt: it.zCnt,
tDelta: it.tDelta,
cntDelta: it.cntDelta,
zCntDelta: it.zCntDelta,
pBuckets: it.pBuckets,
nBuckets: it.nBuckets,
pBucketsDelta: it.pBucketsDelta,
nBucketsDelta: it.nBucketsDelta,
sum: it.sum,
leading: it.leading,
trailing: it.trailing,
},
st: it.st,
stDiff: it.stDiff,
firstSTKnown: it.firstSTKnown,
firstSTChangeOn: uint16(it.firstSTChangeOn),
}
return a, nil
}
func newHistogramSTIterator(b []byte) *histogramSTIterator {
it := &histogramSTIterator{
histogramIterator: histogramIterator{
br: newBReader(b[histogramSTHeaderSize:]),
numTotal: binary.BigEndian.Uint16(b),
t: math.MinInt64,
},
}
it.counterResetHeader = CounterResetHeader(b[histogramFlagPos] & CounterResetHeaderMask)
it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[histogramSTHeaderSize-1:])
return it
}
func (c *HistogramSTChunk) iterator(it Iterator) *histogramSTIterator {
if histIter, ok := it.(*histogramSTIterator); ok {
histIter.Reset(c.b.bytes())
return histIter
}
return newHistogramSTIterator(c.b.bytes())
}
// Iterator implements the Chunk interface.
func (c *HistogramSTChunk) Iterator(it Iterator) Iterator {
return c.iterator(it)
}
// HistogramSTAppender is an Appender for histogram samples with start timestamp support.
// It embeds HistogramAppender and adds ST encoding after each sample.
type HistogramSTAppender struct {
HistogramAppender
st int64
stDiff int64
firstSTChangeOn uint16
firstSTKnown bool
}
// encodeST encodes the start timestamp for the current sample.
// It must be called after appendHistogram() which increments the sample count.
// prevT is the timestamp of the previous sample (before appendHistogram updated a.t).
// For sample 0, prevT is unused.
func (a *HistogramSTAppender) encodeST(prevT, st int64) {
num := binary.BigEndian.Uint16(a.b.bytes())
switch {
case num == 1: // First sample (count was just incremented from 0).
if st != 0 {
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, a.t-st)] {
a.b.writeByte(b)
}
a.firstSTKnown = true
writeHeaderFirstSTKnown(a.b.bytes()[histogramSTHeaderSize-1:])
}
case num == 2: // Second sample.
if st != a.st {
stDiff := prevT - st
a.firstSTChangeOn = 1
writeHeaderFirstSTChangeOn(a.b.bytes()[histogramSTHeaderSize-1:], 1)
putVarbitInt(a.b, stDiff)
a.stDiff = stDiff
}
default: // Sample N >= 2.
// Fast path: no ST data to write.
if st == 0 && num-1 != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown {
break
}
if a.firstSTChangeOn == 0 {
if st != a.st || num-1 == maxFirstSTChangeOn {
stDiff := prevT - st
a.firstSTChangeOn = num - 1
writeHeaderFirstSTChangeOn(a.b.bytes()[histogramSTHeaderSize-1:], num-1)
putVarbitInt(a.b, stDiff)
a.stDiff = stDiff
}
} else {
stDiff := prevT - st
putVarbitInt(a.b, stDiff-a.stDiff)
a.stDiff = stDiff
}
}
a.st = st
}
// appendHistogramST encodes a histogram sample with start timestamp.
func (a *HistogramSTAppender) appendHistogramST(st, t int64, h *histogram.Histogram) {
prevT := a.t
a.appendHistogram(t, h)
a.encodeST(prevT, st)
}
func (*HistogramSTAppender) Append(int64, int64, float64) {
panic("appended a float sample to a histogram chunk")
}
func (*HistogramSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
panic("appended a float histogram sample to a histogram chunk")
}
// AppendHistogram implements Appender for HistogramSTAppender.
func (a *HistogramSTAppender) AppendHistogram(prev *HistogramAppender, st, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) {
if a.NumSamples() == 0 {
a.appendHistogramST(st, t, h)
if h.CounterResetHint == histogram.GaugeType {
a.setCounterResetHeader(GaugeType)
return nil, false, a, nil
}
switch {
case h.CounterResetHint == histogram.CounterReset:
a.setCounterResetHeader(CounterReset)
case prev != nil:
_, _, _, _, _, counterReset := prev.appendable(h)
a.setCounterResetHeader(counterReset)
}
return nil, false, a, nil
}
// Adding counter-like histogram.
if h.CounterResetHint != histogram.GaugeType {
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterResetHint := a.appendable(h)
if !okToAppend || counterResetHint != NotCounterReset {
if appendOnly {
if counterResetHint == CounterReset {
return nil, false, a, errors.New("histogram counter reset")
}
return nil, false, a, errors.New("histogram schema change")
}
newChunk := NewHistogramSTChunk()
app, err := newChunk.Appender()
if err != nil {
panic(err)
}
happ := app.(*HistogramSTAppender)
happ.setCounterResetHeader(counterResetHint)
happ.appendHistogramST(st, t, h)
return newChunk, false, app, nil
}
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
copy(h.PositiveSpans, a.pSpans)
h.NegativeSpans = make([]histogram.Span, len(a.nSpans))
copy(h.NegativeSpans, a.nSpans)
} else {
h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts)
h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts)
}
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
}
chk, app := a.recodeST(
pForwardInserts, nForwardInserts,
h.PositiveSpans, h.NegativeSpans,
)
app.(*HistogramSTAppender).appendHistogramST(st, t, h)
return chk, true, app, nil
}
a.appendHistogramST(st, t, h)
return nil, false, a, nil
}
// Adding gauge histogram.
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend := a.appendableGauge(h)
if !okToAppend {
if appendOnly {
return nil, false, a, errors.New("gauge histogram schema change")
}
newChunk := NewHistogramSTChunk()
app, err := newChunk.Appender()
if err != nil {
panic(err)
}
happ := app.(*HistogramSTAppender)
happ.setCounterResetHeader(GaugeType)
happ.appendHistogramST(st, t, h)
return newChunk, false, app, nil
}
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative backwards inserts", len(pBackwardInserts), len(nBackwardInserts))
}
h.PositiveSpans = pMergedSpans
h.NegativeSpans = nMergedSpans
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
}
chk, app := a.recodeST(
pForwardInserts, nForwardInserts,
h.PositiveSpans, h.NegativeSpans,
)
app.(*HistogramSTAppender).appendHistogramST(st, t, h)
return chk, true, app, nil
}
a.appendHistogramST(st, t, h)
return nil, false, a, nil
}
// recodeST is like HistogramAppender.recode but creates HistogramSTChunk and preserves ST.
func (a *HistogramSTAppender) recodeST(
positiveInserts, negativeInserts []Insert,
positiveSpans, negativeSpans []histogram.Span,
) (Chunk, Appender) {
byts := a.b.bytes()
it := newHistogramSTIterator(byts)
hc := NewHistogramSTChunk()
app, err := hc.Appender()
if err != nil {
panic(err)
}
happ := app.(*HistogramSTAppender)
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans)
for it.Next() == ValHistogram {
tOld, hOld := it.AtHistogram(nil)
stOld := it.AtST()
var positiveBuckets, negativeBuckets []int64
if numPositiveBuckets > 0 {
positiveBuckets = make([]int64, numPositiveBuckets)
}
if numNegativeBuckets > 0 {
negativeBuckets = make([]int64, numNegativeBuckets)
}
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans
if len(positiveInserts) > 0 {
hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, true)
}
if len(negativeInserts) > 0 {
hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, true)
}
happ.appendHistogramST(stOld, tOld, hOld)
}
happ.setCounterResetHeader(CounterResetHeader(byts[histogramFlagPos] & CounterResetHeaderMask))
return hc, app
}
// histogramSTIterator is an iterator for HistogramSTChunk that decodes ST after each sample.
type histogramSTIterator struct {
histogramIterator
// ST fields.
st int64
stDiff int64
firstSTKnown bool
firstSTChangeOn uint8
}
func (it *histogramSTIterator) AtST() int64 {
return it.st
}
func (it *histogramSTIterator) Reset(b []byte) {
it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[histogramSTHeaderSize-1:])
it.st = 0
it.stDiff = 0
// Reset the embedded histogramIterator but with the correct header offset.
it.br = newBReader(b[histogramSTHeaderSize:])
it.numTotal = binary.BigEndian.Uint16(b)
it.numRead = 0
it.counterResetHeader = CounterResetHeader(b[histogramFlagPos] & CounterResetHeaderMask)
it.t, it.cnt, it.zCnt = 0, 0, 0
it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0
if it.atHistogramCalled {
it.atHistogramCalled = false
it.pBuckets, it.nBuckets = nil, nil
it.pSpans, it.nSpans = nil, nil
} else {
it.pBuckets = it.pBuckets[:0]
it.nBuckets = it.nBuckets[:0]
}
if it.atFloatHistogramCalled {
it.atFloatHistogramCalled = false
it.pFloatBuckets, it.nFloatBuckets = nil, nil
} else {
it.pFloatBuckets = it.pFloatBuckets[:0]
it.nFloatBuckets = it.nFloatBuckets[:0]
}
it.pBucketsDelta = it.pBucketsDelta[:0]
it.nBucketsDelta = it.nBucketsDelta[:0]
it.sum = 0
it.leading = 0
it.trailing = 0
it.err = nil
it.customValues = nil
}
// Next advances the iterator by one sample.
// It calls the embedded histogramIterator.Next() to decode the histogram sample,
// then decodes the ST data that follows in the bitstream.
func (it *histogramSTIterator) Next() ValueType {
prevT := it.t
vt := it.histogramIterator.Next()
if vt == ValNone {
return ValNone
}
if err := it.decodeST(it.numRead, prevT); err != nil {
it.err = err
return ValNone
}
return vt
}
// Seek advances the iterator forward to the first sample with timestamp >= t.
func (it *histogramSTIterator) Seek(t int64) ValueType {
if it.err != nil {
return ValNone
}
for t > it.t || it.numRead == 0 {
if it.Next() == ValNone {
return ValNone
}
}
return ValHistogram
}
// decodeST decodes the start timestamp for the current sample.
// numRead is the number of samples read so far (already incremented by histogramIterator.Next()).
// prevT is the timestamp of the previous sample (before histogramIterator.Next() updated it.t).
func (it *histogramSTIterator) decodeST(numRead uint16, prevT int64) error {
switch {
case numRead == 1: // After sample 0.
if it.firstSTKnown {
stDiff, err := it.br.readVarint()
if err != nil {
return err
}
it.stDiff = stDiff
it.st = it.t - stDiff
}
case numRead == 2: // After sample 1.
if it.firstSTChangeOn == 1 {
sdod, err := readVarbitInt(&it.br)
if err != nil {
return err
}
it.stDiff = sdod
it.st = prevT - sdod
}
default: // After sample N >= 2.
if it.firstSTChangeOn > 0 && numRead-1 >= uint16(it.firstSTChangeOn) {
sdod, err := readVarbitInt(&it.br)
if err != nil {
return err
}
if numRead-1 == uint16(it.firstSTChangeOn) {
it.stDiff = sdod
} else {
it.stDiff += sdod
}
it.st = prevT - it.stDiff
}
}
return nil
}

View File

@ -0,0 +1,330 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
type histogramSTSample struct {
st, t int64
h *histogram.Histogram
}
func BenchmarkHistogramSTWrite(b *testing.B) {
const n = 120
hs := tsdbutil.GenerateTestHistograms(n)
b.ReportAllocs()
for b.Loop() {
c := NewHistogramSTChunk()
app, _ := c.Appender()
for i, h := range hs {
_, _, app, _ = app.AppendHistogram(nil, 500, int64(i)*15000, h, false)
}
}
}
func BenchmarkHistogramSTRead(b *testing.B) {
const n = 120
hs := tsdbutil.GenerateTestHistograms(n)
c := NewHistogramSTChunk()
app, err := c.Appender()
require.NoError(b, err)
for i, h := range hs {
_, _, app, err = app.AppendHistogram(nil, 500, int64(i)*15000, h, false)
require.NoError(b, err)
}
b.ReportAllocs()
var it Iterator
for b.Loop() {
it = c.Iterator(it)
for it.Next() != ValNone {
}
}
}
// requireHistogramSTSamples appends the given histogram samples to a new
// HistogramSTChunk, then verifies all samples round-trip correctly through
// the iterator.
func requireHistogramSTSamples(t *testing.T, samples []histogramSTSample) {
t.Helper()
c := NewHistogramSTChunk()
app, err := c.Appender()
require.NoError(t, err)
for _, s := range samples {
_, _, app, err = app.AppendHistogram(nil, s.st, s.t, s.h, false)
require.NoError(t, err)
}
require.Equal(t, len(samples), c.NumSamples())
it := c.Iterator(nil)
for _, s := range samples {
require.Equal(t, ValHistogram, it.Next())
require.Equal(t, s.t, it.AtT())
require.Equal(t, s.st, it.AtST())
}
require.Equal(t, ValNone, it.Next())
require.NoError(t, it.Err())
}
func TestHistogramSTChunkST(t *testing.T) {
testChunkSTHandling(t, ValHistogram, func() Chunk { return NewHistogramSTChunk() })
}
func TestHistogramSTBasic(t *testing.T) {
hs := tsdbutil.GenerateTestHistograms(5)
requireHistogramSTSamples(t, []histogramSTSample{
{st: 0, t: 1000, h: hs[0]},
{st: 0, t: 2000, h: hs[1]},
{st: 0, t: 3000, h: hs[2]},
{st: 0, t: 4000, h: hs[3]},
{st: 0, t: 5000, h: hs[4]},
})
}
func TestHistogramSTChunkAppendAndIterate(t *testing.T) {
hs := tsdbutil.GenerateTestHistograms(5)
requireHistogramSTSamples(t, []histogramSTSample{
{st: 100, t: 1000, h: hs[0]},
{st: 100, t: 2000, h: hs[1]},
{st: 200, t: 3000, h: hs[2]},
{st: 200, t: 4000, h: hs[3]},
{st: 300, t: 5000, h: hs[4]},
})
}
func TestHistogramSTChunkCounterReset(t *testing.T) {
c := NewHistogramSTChunk()
app, err := c.Appender()
require.NoError(t, err)
h1 := &histogram.Histogram{
Count: 10,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
},
PositiveBuckets: []int64{6, -3},
}
_, _, app, err = app.AppendHistogram(nil, 100, 1000, h1, false)
require.NoError(t, err)
h2 := h1.Copy()
h2.Count = 3
h2.Sum = 5.0
h2.PositiveBuckets = []int64{2, -1}
h2.CounterResetHint = histogram.CounterReset
newChunk, recoded, newApp, err := app.AppendHistogram(nil, 200, 2000, h2, false)
require.NoError(t, err)
require.NotNil(t, newChunk)
require.False(t, recoded)
stChunk, ok := newChunk.(*HistogramSTChunk)
require.True(t, ok)
require.Equal(t, CounterReset, stChunk.GetCounterResetHeader())
require.Equal(t, 1, stChunk.NumSamples())
// Verify ST is preserved in the new chunk.
it := stChunk.Iterator(nil)
require.Equal(t, ValHistogram, it.Next())
require.Equal(t, int64(2000), it.AtT())
require.Equal(t, int64(200), it.AtST())
require.Equal(t, ValNone, it.Next())
require.NoError(t, it.Err())
h3 := h2.Copy()
h3.CounterResetHint = histogram.UnknownCounterReset
h3.Count = 8
h3.Sum = 10.0
h3.PositiveBuckets = []int64{4, -2}
_, _, _, err = newApp.AppendHistogram(nil, 300, 3000, h3, false)
require.NoError(t, err)
require.Equal(t, 2, stChunk.NumSamples())
}
func TestHistogramSTChunkRecode(t *testing.T) {
c := NewHistogramSTChunk()
app, err := c.Appender()
require.NoError(t, err)
h1 := &histogram.Histogram{
Count: 27,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4},
NegativeSpans: []histogram.Span{{Offset: 1, Length: 1}},
NegativeBuckets: []int64{1},
}
_, _, app, err = app.AppendHistogram(nil, 100, 1000, h1, false)
require.NoError(t, err)
// Force recode with expanded bucket layout.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 1},
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
}
h2.NegativeSpans = []histogram.Span{{Offset: 0, Length: 2}}
h2.Count = 35
h2.ZeroCount++
h2.Sum = 30
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1}
h2.NegativeBuckets = []int64{2, -1}
// Ensure recode produced a new chunk
newChunk, recoded, newApp, err := app.AppendHistogram(nil, 200, 2000, h2, false)
require.NoError(t, err)
require.NotNil(t, newChunk)
require.True(t, recoded)
stChunk, ok := newChunk.(*HistogramSTChunk)
require.True(t, ok)
require.Equal(t, 2, stChunk.NumSamples())
it := stChunk.Iterator(nil)
require.Equal(t, ValHistogram, it.Next())
require.Equal(t, int64(1000), it.AtT())
require.Equal(t, int64(100), it.AtST())
require.Equal(t, ValHistogram, it.Next())
require.Equal(t, int64(2000), it.AtT())
require.Equal(t, int64(200), it.AtST())
require.Equal(t, ValNone, it.Next())
require.NoError(t, it.Err())
h3 := h2.Copy()
h3.Count = 40
h3.Sum = 35
for i := range h3.PositiveBuckets {
h3.PositiveBuckets[i]++
}
_, _, _, err = newApp.AppendHistogram(nil, 300, 3000, h3, false)
require.NoError(t, err)
require.Equal(t, 3, stChunk.NumSamples())
}
func TestHistogramST_MoreThan127Samples(t *testing.T) {
c := NewHistogramSTChunk()
app, err := c.Appender()
require.NoError(t, err)
h := &histogram.Histogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
},
PositiveBuckets: []int64{6, -3},
}
const numSamples = maxFirstSTChangeOn + 3 // 130
for i := range int(numSamples) {
hi := h.Copy()
hi.Count = uint64(5 + i)
hi.Sum = float64(18 + i)
_, _, app, err = app.AppendHistogram(nil, 500, int64(1000+i*1000), hi, false)
require.NoError(t, err)
}
require.Equal(t, int(numSamples), c.NumSamples())
it := c.Iterator(nil)
for i := range int(numSamples) {
require.Equal(t, ValHistogram, it.Next())
require.Equal(t, int64(1000+i*1000), it.AtT())
require.Equal(t, int64(500), it.AtST())
}
require.Equal(t, ValNone, it.Next())
require.NoError(t, it.Err())
// Test ST changing after the boundary.
c2 := NewHistogramSTChunk()
app2, err := c2.Appender()
require.NoError(t, err)
for i := range int(maxFirstSTChangeOn + 1) {
hi := h.Copy()
hi.Count = uint64(5 + i)
hi.Sum = float64(18 + i)
_, _, app2, err = app2.AppendHistogram(nil, 0, int64(1000+i*1000), hi, false)
require.NoError(t, err)
}
for i := range 3 {
hi := h.Copy()
hi.Count = uint64(200 + i)
hi.Sum = float64(200 + i)
_, _, app2, err = app2.AppendHistogram(nil, 100, int64(200000+i*1000), hi, false)
require.NoError(t, err)
}
it2 := c2.Iterator(nil)
for i := range int(maxFirstSTChangeOn + 1) {
require.Equal(t, ValHistogram, it2.Next())
require.Equal(t, int64(1000+i*1000), it2.AtT())
require.Equal(t, int64(0), it2.AtST())
}
for i := range 3 {
require.Equal(t, ValHistogram, it2.Next())
require.Equal(t, int64(200000+i*1000), it2.AtT())
require.Equal(t, int64(100), it2.AtST())
}
require.Equal(t, ValNone, it2.Next())
require.NoError(t, it2.Err())
}
func TestHistogramSTChunkMixedST(t *testing.T) {
hs := tsdbutil.GenerateTestHistograms(5)
requireHistogramSTSamples(t, []histogramSTSample{
{st: 0, t: 1000, h: hs[0]},
{st: 0, t: 2000, h: hs[1]},
{st: 100, t: 3000, h: hs[2]},
{st: 0, t: 4000, h: hs[3]},
{st: 200, t: 5000, h: hs[4]},
})
}

View File

@ -917,9 +917,9 @@ func (DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compact
samples := uint64(chk.Chunk.NumSamples())
meta.Stats.NumSamples += samples
switch chk.Chunk.Encoding() {
case chunkenc.EncHistogram, chunkenc.EncFloatHistogram:
case chunkenc.EncHistogram, chunkenc.EncFloatHistogram, chunkenc.EncHistogramST, chunkenc.EncFloatHistogramST:
meta.Stats.NumHistogramSamples += samples
case chunkenc.EncXOR:
case chunkenc.EncXOR, chunkenc.EncXOR2:
meta.Stats.NumFloatSamples += samples
}
}

View File

@ -7735,67 +7735,117 @@ func TestHeadAppender_STStorage_WBLReplay(t *testing.T) {
// TestHeadAppender_STStorage_ChunkEncoding verifies that the correct chunk encoding
// is used based on EnableSTStorage setting.
func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) {
samples := []struct {
testHistogram := tsdbutil.GenerateTestHistogram(1)
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
type appendableSample struct {
st int64
ts int64
fSample float64
}{
{st: 10, ts: 100, fSample: 1.0},
{st: 20, ts: 200, fSample: 2.0},
h *histogram.Histogram
fh *histogram.FloatHistogram
}
for _, enableST := range []bool{false, true} {
t.Run(fmt.Sprintf("EnableSTStorage=%t", enableST), func(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(enableST)
opts.EnableXOR2Encoding.Store(enableST) // ST storage implies XOR2 encoding.
h, _ := newTestHeadWithOptions(t, compression.None, opts)
for _, tc := range []struct {
name string
samples []appendableSample
stEncoding chunkenc.Encoding
regularEncoding chunkenc.Encoding
}{
{
name: "float samples",
samples: []appendableSample{
{st: 10, ts: 100, fSample: 1.0},
{st: 20, ts: 200, fSample: 2.0},
},
stEncoding: chunkenc.EncXOR2,
regularEncoding: chunkenc.EncXOR,
},
{
name: "histogram samples",
samples: []appendableSample{
{st: 10, ts: 100, h: testHistogram},
{st: 20, ts: 200, h: testHistogram},
},
stEncoding: chunkenc.EncHistogramST,
regularEncoding: chunkenc.EncHistogram,
},
{
name: "float histogram samples",
samples: []appendableSample{
{st: 10, ts: 100, fh: testFloatHistogram},
{st: 20, ts: 200, fh: testFloatHistogram},
},
stEncoding: chunkenc.EncFloatHistogramST,
regularEncoding: chunkenc.EncFloatHistogram,
},
} {
for _, enableST := range []bool{false, true} {
t.Run(fmt.Sprintf("%s/EnableSTStorage=%t", tc.name, enableST), func(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(enableST)
opts.EnableXOR2Encoding.Store(enableST) // ST storage implies XOR2 encoding.
h, _ := newTestHeadWithOptions(t, compression.None, opts)
lbls := labels.FromStrings("foo", "bar")
a := h.Appender(context.Background())
for _, s := range samples {
_, err := a.AppendSTZeroSample(0, lbls, s.ts, s.st)
require.NoError(t, err)
_, err = a.Append(0, lbls, s.ts, s.fSample)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
ctx := context.Background()
idxReader, err := h.Index()
require.NoError(t, err)
defer idxReader.Close()
chkReader, err := h.Chunks()
require.NoError(t, err)
defer chkReader.Close()
p, err := idxReader.Postings(ctx, "foo", "bar")
require.NoError(t, err)
var lblBuilder labels.ScratchBuilder
require.True(t, p.Next())
sRef := p.At()
var chkMetas []chunks.Meta
require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas))
require.NotEmpty(t, chkMetas)
for _, meta := range chkMetas {
chk, iterable, err := chkReader.ChunkOrIterable(meta)
require.NoError(t, err)
require.Nil(t, iterable)
encoding := chk.Encoding()
if enableST {
require.Equal(t, chunkenc.EncXOR2, encoding,
"Expected ST-capable encoding when EnableSTStorage is true")
} else {
require.Equal(t, chunkenc.EncXOR, encoding,
"Expected regular XOR encoding when EnableSTStorage is false")
lbls := labels.FromStrings("foo", "bar")
a := h.Appender(context.Background())
for _, s := range tc.samples {
switch {
case s.h != nil:
_, err := a.AppendHistogramSTZeroSample(0, lbls, s.ts, s.st, s.h, nil)
require.NoError(t, err)
_, err = a.AppendHistogram(0, lbls, s.ts, s.h, nil)
require.NoError(t, err)
case s.fh != nil:
_, err := a.AppendHistogramSTZeroSample(0, lbls, s.ts, s.st, nil, s.fh)
require.NoError(t, err)
_, err = a.AppendHistogram(0, lbls, s.ts, nil, s.fh)
require.NoError(t, err)
default:
_, err := a.AppendSTZeroSample(0, lbls, s.ts, s.st)
require.NoError(t, err)
_, err = a.Append(0, lbls, s.ts, s.fSample)
require.NoError(t, err)
}
}
}
})
require.NoError(t, a.Commit())
ctx := context.Background()
idxReader, err := h.Index()
require.NoError(t, err)
defer idxReader.Close()
chkReader, err := h.Chunks()
require.NoError(t, err)
defer chkReader.Close()
p, err := idxReader.Postings(ctx, "foo", "bar")
require.NoError(t, err)
var lblBuilder labels.ScratchBuilder
require.True(t, p.Next())
sRef := p.At()
var chkMetas []chunks.Meta
require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas))
require.NotEmpty(t, chkMetas)
for _, meta := range chkMetas {
chk, iterable, err := chkReader.ChunkOrIterable(meta)
require.NoError(t, err)
require.Nil(t, iterable)
encoding := chk.Encoding()
if enableST {
require.Equal(t, tc.stEncoding, encoding,
"Expected ST-capable encoding when EnableSTStorage is true")
} else {
require.Equal(t, tc.regularEncoding, encoding,
"Expected regular encoding when EnableSTStorage is false")
}
}
})
}
}
}

View File

@ -1212,7 +1212,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
buf.PutUvarintBytes(s.headChunks.chunk.Bytes())
switch enc {
case chunkenc.EncXOR:
case chunkenc.EncXOR, chunkenc.EncXOR2:
// Backwards compatibility for old sampleBuf which had last 4 samples.
for range 3 {
buf.PutBE64int64(0)
@ -1220,9 +1220,9 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
}
buf.PutBE64int64(0)
buf.PutBEFloat64(s.lastValue)
case chunkenc.EncHistogram:
case chunkenc.EncHistogram, chunkenc.EncHistogramST:
record.EncodeHistogram(&buf, s.lastHistogramValue)
default: // chunkenc.FloatHistogram.
default: // chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogramST.
record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue)
}
}
@ -1273,10 +1273,10 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh
}
_ = dec.Be64int64()
csr.lastValue = dec.Be64Float64()
case chunkenc.EncHistogram:
case chunkenc.EncHistogram, chunkenc.EncHistogramST:
csr.lastHistogramValue = &histogram.Histogram{}
record.DecodeHistogram(&dec, csr.lastHistogramValue)
default: // chunkenc.FloatHistogram.
default: // chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogramST.
csr.lastFloatHistogramValue = &histogram.FloatHistogram{}
record.DecodeFloatHistogram(&dec, csr.lastFloatHistogramValue)
}

View File

@ -96,11 +96,9 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, useXOR2 bool) (chks []memCh
encoding := chunkenc.ValFloat.ChunkEncoding(useXOR2)
switch {
case s.h != nil:
// TODO(krajorama): use ST capable histogram chunk.
encoding = chunkenc.EncHistogram
encoding = chunkenc.ValHistogram.ChunkEncoding(useXOR2)
case s.fh != nil:
// TODO(krajorama): use ST capable float histogram chunk.
encoding = chunkenc.EncFloatHistogram
encoding = chunkenc.ValFloatHistogram.ChunkEncoding(useXOR2)
}
// prevApp is the appender for the previous sample.
@ -125,10 +123,15 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, useXOR2 bool) (chks []memCh
switch encoding {
case chunkenc.EncXOR, chunkenc.EncXOR2:
app.Append(s.st, s.t, s.f)
case chunkenc.EncHistogram:
// TODO(krajorama): handle ST capable histogram chunk.
case chunkenc.EncHistogram, chunkenc.EncHistogramST:
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
var prevHApp *chunkenc.HistogramAppender
switch p := prevApp.(type) {
case *chunkenc.HistogramAppender:
prevHApp = p
case *chunkenc.HistogramSTAppender:
prevHApp = &p.HistogramAppender
}
var (
newChunk chunkenc.Chunk
recoded bool
@ -141,10 +144,15 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, useXOR2 bool) (chks []memCh
}
chunk = newChunk
}
case chunkenc.EncFloatHistogram:
// TODO(krajorama): handle ST capable float histogram chunk.
case chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogramST:
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
var prevHApp *chunkenc.FloatHistogramAppender
switch p := prevApp.(type) {
case *chunkenc.FloatHistogramAppender:
prevHApp = p
case *chunkenc.FloatHistogramSTAppender:
prevHApp = &p.FloatHistogramAppender
}
var (
newChunk chunkenc.Chunk
recoded bool

View File

@ -367,30 +367,44 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
}
}
// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with useXOR2=true and useXOR2=false for float samples.
// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with useXOR2=true and useXOR2=false.
// When useXOR2=true, st values are preserved; when useXOR2=false, AtST() returns 0.
// TODO(@krajorama): Add histogram test cases once ST storage is implemented for histograms.
func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
h1 := tsdbutil.GenerateTestHistogram(1)
h2 := tsdbutil.GenerateTestHistogram(2)
h3 := tsdbutil.GenerateTestHistogram(3)
fh1 := tsdbutil.GenerateTestFloatHistogram(1)
fh2 := tsdbutil.GenerateTestFloatHistogram(2)
fh3 := tsdbutil.GenerateTestFloatHistogram(3)
testCases := map[string]struct {
samples []sample
samples []sample
xor2Encoding chunkenc.Encoding // Expected encoding when useXOR2=true.
regularEncoding chunkenc.Encoding // Expected encoding when useXOR2=false.
}{
"floats with st=0": {
samples: []sample{
{st: 0, t: 1000, f: 43.0},
{st: 0, t: 1100, f: 42.0},
},
xor2Encoding: chunkenc.EncXOR2,
regularEncoding: chunkenc.EncXOR,
},
"floats with st=t": {
samples: []sample{
{st: 1000, t: 1000, f: 43.0},
{st: 1100, t: 1100, f: 42.0},
},
xor2Encoding: chunkenc.EncXOR2,
regularEncoding: chunkenc.EncXOR,
},
"floats with st=t-100": {
samples: []sample{
{st: 900, t: 1000, f: 43.0},
{st: 1000, t: 1100, f: 42.0},
},
xor2Encoding: chunkenc.EncXOR2,
regularEncoding: chunkenc.EncXOR,
},
"floats with varying st": {
samples: []sample{
@ -398,16 +412,67 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
{st: 1100, t: 1100, f: 42.0}, // st == t
{st: 0, t: 1200, f: 41.0}, // st == 0
},
xor2Encoding: chunkenc.EncXOR2,
regularEncoding: chunkenc.EncXOR,
},
"histograms with st=0": {
samples: []sample{
{st: 0, t: 1000, h: h1},
{st: 0, t: 1100, h: h2},
},
xor2Encoding: chunkenc.EncHistogramST,
regularEncoding: chunkenc.EncHistogram,
},
"histograms with st=t-100": {
samples: []sample{
{st: 900, t: 1000, h: h1},
{st: 1000, t: 1100, h: h2},
},
xor2Encoding: chunkenc.EncHistogramST,
regularEncoding: chunkenc.EncHistogram,
},
"histograms with varying st": {
samples: []sample{
{st: 500, t: 1000, h: h1},
{st: 1100, t: 1100, h: h2},
{st: 0, t: 1200, h: h3},
},
xor2Encoding: chunkenc.EncHistogramST,
regularEncoding: chunkenc.EncHistogram,
},
"float histograms with st=0": {
samples: []sample{
{st: 0, t: 1000, fh: fh1},
{st: 0, t: 1100, fh: fh2},
},
xor2Encoding: chunkenc.EncFloatHistogramST,
regularEncoding: chunkenc.EncFloatHistogram,
},
"float histograms with st=t-100": {
samples: []sample{
{st: 900, t: 1000, fh: fh1},
{st: 1000, t: 1100, fh: fh2},
},
xor2Encoding: chunkenc.EncFloatHistogramST,
regularEncoding: chunkenc.EncFloatHistogram,
},
"float histograms with varying st": {
samples: []sample{
{st: 500, t: 1000, fh: fh1},
{st: 1100, t: 1100, fh: fh2},
{st: 0, t: 1200, fh: fh3},
},
xor2Encoding: chunkenc.EncFloatHistogramST,
regularEncoding: chunkenc.EncFloatHistogram,
},
}
storageScenarios := []struct {
name string
useXOR2 bool
expectedEncoding chunkenc.Encoding
name string
useXOR2 bool
}{
{"useXOR2=true", true, chunkenc.EncXOR2},
{"useXOR2=false", false, chunkenc.EncXOR},
{"useXOR2=true", true},
{"useXOR2=false", false},
}
for name, tc := range testCases {
@ -415,7 +480,7 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
t.Run(name+"/"+ss.name, func(t *testing.T) {
oooChunk := OOOChunk{}
for _, s := range tc.samples {
oooChunk.Insert(s.st, s.t, s.f, nil, nil)
oooChunk.Insert(s.st, s.t, s.f, s.h, s.fh)
}
chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, ss.useXOR2)
@ -423,28 +488,47 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
require.Len(t, chunks, 1, "number of chunks")
c := chunks[0]
require.Equal(t, ss.expectedEncoding, c.chunk.Encoding(), "chunk encoding")
expectedEnc := tc.regularEncoding
if ss.useXOR2 {
expectedEnc = tc.xor2Encoding
}
require.Equal(t, expectedEnc, c.chunk.Encoding(), "chunk encoding")
require.Equal(t, tc.samples[0].t, c.minTime, "chunk minTime")
require.Equal(t, tc.samples[len(tc.samples)-1].t, c.maxTime, "chunk maxTime")
// Verify samples can be read back with correct st and t values.
// Verify samples can be read back.
it := c.chunk.Iterator(nil)
sampleIndex := 0
for it.Next() == chunkenc.ValFloat {
gotT, gotF := it.At()
gotST := it.AtST()
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
expSample := tc.samples[sampleIndex]
switch vt {
case chunkenc.ValFloat:
gotT, gotF := it.At()
require.Equal(t, expSample.t, gotT, "sample %d t", sampleIndex)
require.Equal(t, expSample.f, gotF, "sample %d f", sampleIndex)
case chunkenc.ValHistogram:
gotT, gotH := it.AtHistogram(nil)
require.Equal(t, expSample.t, gotT, "sample %d t", sampleIndex)
expH := expSample.h.Copy()
expH.CounterResetHint = gotH.CounterResetHint
require.Equal(t, expH, gotH.Compact(0), "sample %d h", sampleIndex)
case chunkenc.ValFloatHistogram:
gotT, gotFH := it.AtFloatHistogram(nil)
require.Equal(t, expSample.t, gotT, "sample %d t", sampleIndex)
expFH := expSample.fh.Copy()
expFH.CounterResetHint = gotFH.CounterResetHint
require.Equal(t, expFH, gotFH.Compact(0), "sample %d fh", sampleIndex)
}
gotST := it.AtST()
if ss.useXOR2 {
// When useXOR2=true, st values should be preserved.
require.Equal(t, tc.samples[sampleIndex].st, gotST, "sample %d st", sampleIndex)
require.Equal(t, expSample.st, gotST, "sample %d st", sampleIndex)
} else {
// When useXOR2=false, AtST() should return 0.
require.Equal(t, int64(0), gotST, "sample %d st should be 0 when useXOR2=false", sampleIndex)
}
require.Equal(t, tc.samples[sampleIndex].t, gotT, "sample %d t", sampleIndex)
require.Equal(t, tc.samples[sampleIndex].f, gotF, "sample %d f", sampleIndex)
sampleIndex++
}
require.NoError(t, it.Err())
require.Equal(t, len(tc.samples), sampleIndex, "number of samples")
})
}