diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 13c544d5af..ba7676f4d2 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -649,17 +649,18 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. return errors.New("ChunkOrIterable should not return an iterable when reading a block") } switch chk.Encoding() { - case chunkenc.EncXOR: + case chunkenc.EncXOR, chunkenc.EncXOR2: floatChunkSamplesCount = append(floatChunkSamplesCount, chk.NumSamples()) floatChunkSize = append(floatChunkSize, len(chk.Bytes())) - case chunkenc.EncFloatHistogram: + case chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogramST: histogramChunkSamplesCount = append(histogramChunkSamplesCount, chk.NumSamples()) histogramChunkSize = append(histogramChunkSize, len(chk.Bytes())) - fhchk, ok := chk.(*chunkenc.FloatHistogramChunk) - if !ok { - return errors.New("chunk is not FloatHistogramChunk") + if _, ok := chk.(*chunkenc.FloatHistogramChunk); !ok { + if _, ok := chk.(*chunkenc.FloatHistogramSTChunk); !ok { + return errors.New("chunk is not FloatHistogramChunk or FloatHistogramSTChunk") + } } - it := fhchk.Iterator(nil) + it := chk.Iterator(nil) bucketCount := 0 for it.Next() == chunkenc.ValFloatHistogram { _, f := it.AtFloatHistogram(nil) @@ -667,14 +668,15 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. bucketCount += len(f.NegativeBuckets) } histogramChunkBucketsCount = append(histogramChunkBucketsCount, bucketCount) - case chunkenc.EncHistogram: + case chunkenc.EncHistogram, chunkenc.EncHistogramST: histogramChunkSamplesCount = append(histogramChunkSamplesCount, chk.NumSamples()) histogramChunkSize = append(histogramChunkSize, len(chk.Bytes())) - hchk, ok := chk.(*chunkenc.HistogramChunk) - if !ok { - return errors.New("chunk is not HistogramChunk") + if _, ok := chk.(*chunkenc.HistogramChunk); !ok { + if _, ok := chk.(*chunkenc.HistogramSTChunk); !ok { + return errors.New("chunk is not HistogramChunk or HistogramSTChunk") + } } - it := hchk.Iterator(nil) + it := chk.Iterator(nil) bucketCount := 0 for it.Next() == chunkenc.ValHistogram { _, f := it.AtHistogram(nil) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 3a405e8cf7..e5fa71ba97 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -31,6 +31,8 @@ const ( EncHistogram EncFloatHistogram EncXOR2 + EncHistogramST + EncFloatHistogramST ) func (e Encoding) String() string { @@ -45,13 +47,17 @@ func (e Encoding) String() string { return "floathistogram" case EncXOR2: return "XOR2" + case EncHistogramST: + return "histogramST" + case EncFloatHistogramST: + return "floathistogramST" } return "" } // IsValidEncoding returns true for supported encodings. func IsValidEncoding(e Encoding) bool { - return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOR2 + return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOR2 || e == EncHistogramST || e == EncFloatHistogramST } const ( @@ -199,8 +205,14 @@ func (v ValueType) ChunkEncoding(useXOR2 bool) Encoding { } return EncXOR case ValHistogram: + if useXOR2 { + return EncHistogramST + } return EncHistogram case ValFloatHistogram: + if useXOR2 { + return EncFloatHistogramST + } return EncFloatHistogram default: return EncNone @@ -296,10 +308,12 @@ type Pool interface { // pool is a memory pool of chunk objects. type pool struct { - xor sync.Pool - histogram sync.Pool - floatHistogram sync.Pool - xo2 sync.Pool + xor sync.Pool + histogram sync.Pool + floatHistogram sync.Pool + xo2 sync.Pool + histogramST sync.Pool + floatHistogramST sync.Pool } // NewPool returns a new pool. @@ -325,6 +339,16 @@ func NewPool() Pool { return &XOR2Chunk{b: bstream{}} }, }, + histogramST: sync.Pool{ + New: func() any { + return &HistogramSTChunk{b: bstream{}} + }, + }, + floatHistogramST: sync.Pool{ + New: func() any { + return &FloatHistogramSTChunk{b: bstream{}} + }, + }, } } @@ -339,6 +363,10 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { c = p.floatHistogram.Get().(*FloatHistogramChunk) case EncXOR2: c = p.xo2.Get().(*XOR2Chunk) + case EncHistogramST: + c = p.histogramST.Get().(*HistogramSTChunk) + case EncFloatHistogramST: + c = p.floatHistogramST.Get().(*FloatHistogramSTChunk) default: return nil, fmt.Errorf("invalid chunk encoding %q", e) } @@ -363,6 +391,12 @@ func (p *pool) Put(c Chunk) error { case EncXOR2: _, ok = c.(*XOR2Chunk) sp = &p.xo2 + case EncHistogramST: + _, ok = c.(*HistogramSTChunk) + sp = &p.histogramST + case EncFloatHistogramST: + _, ok = c.(*FloatHistogramSTChunk) + sp = &p.floatHistogramST default: return fmt.Errorf("invalid chunk encoding %q", c.Encoding()) } @@ -391,6 +425,10 @@ func FromData(e Encoding, d []byte) (Chunk, error) { return &FloatHistogramChunk{b: bstream{count: 0, stream: d}}, nil case EncXOR2: return &XOR2Chunk{b: bstream{count: 0, stream: d}}, nil + case EncHistogramST: + return &HistogramSTChunk{b: bstream{count: 0, stream: d}}, nil + case EncFloatHistogramST: + return &FloatHistogramSTChunk{b: bstream{count: 0, stream: d}}, nil } return nil, fmt.Errorf("invalid chunk encoding %q", e) } @@ -406,6 +444,10 @@ func NewEmptyChunk(e Encoding) (Chunk, error) { return NewFloatHistogramChunk(), nil case EncXOR2: return NewXOR2Chunk(), nil + case EncHistogramST: + return NewHistogramSTChunk(), nil + case EncFloatHistogramST: + return NewFloatHistogramSTChunk(), nil } return nil, fmt.Errorf("invalid chunk encoding %q", e) } diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index 4e19f15b42..accf24a21f 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -146,6 +146,14 @@ func TestPool(t *testing.T) { name: "xor opt st", encoding: EncXOR2, }, + { + name: "histogram st", + encoding: EncHistogramST, + }, + { + name: "float histogram st", + encoding: EncFloatHistogramST, + }, { name: "invalid encoding", encoding: EncNone, @@ -169,6 +177,10 @@ func TestPool(t *testing.T) { b = &c.(*FloatHistogramChunk).b case EncXOR2: b = &c.(*XOR2Chunk).b + case EncHistogramST: + b = &c.(*HistogramSTChunk).b + case EncFloatHistogramST: + b = &c.(*FloatHistogramSTChunk).b default: b = &c.(*XORChunk).b } diff --git a/tsdb/chunkenc/float_histogram_st.go b/tsdb/chunkenc/float_histogram_st.go new file mode 100644 index 0000000000..62f01598e0 --- /dev/null +++ b/tsdb/chunkenc/float_histogram_st.go @@ -0,0 +1,513 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + + "github.com/prometheus/prometheus/model/histogram" +) + +// FloatHistogramSTChunk is a chunk for float histogram samples with start timestamp (ST) support. +// It extends the FloatHistogramChunk format with a 1-byte ST header after the flags byte. +// +// Header layout (4 bytes): +// +// bytes 0-1: sample count (big-endian uint16) +// byte 2: flags (bits 7-6 = counter reset header) +// byte 3: ST header (bit 7 = firstSTKnown, bits 6-0 = firstSTChangeOn) +type FloatHistogramSTChunk struct { + b bstream +} + +// NewFloatHistogramSTChunk returns a new empty FloatHistogramSTChunk. +func NewFloatHistogramSTChunk() *FloatHistogramSTChunk { + b := make([]byte, histogramSTHeaderSize, chunkAllocationSize) + return &FloatHistogramSTChunk{b: bstream{stream: b, count: 0}} +} + +// Reset resets the chunk given stream. +func (c *FloatHistogramSTChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + +// Encoding returns the encoding type. +func (*FloatHistogramSTChunk) Encoding() Encoding { return EncFloatHistogramST } + +// Bytes returns the underlying byte slice of the chunk. +func (c *FloatHistogramSTChunk) Bytes() []byte { + return c.b.bytes() +} + +// NumSamples returns the number of samples in the chunk. +func (c *FloatHistogramSTChunk) NumSamples() int { + return int(binary.BigEndian.Uint16(c.b.bytes())) +} + +// GetCounterResetHeader returns the counter reset header from the flags byte. +func (c *FloatHistogramSTChunk) GetCounterResetHeader() CounterResetHeader { + return CounterResetHeader(c.b.bytes()[histogramFlagPos] & CounterResetHeaderMask) +} + +// Compact implements the Chunk interface. +func (c *FloatHistogramSTChunk) Compact() { + if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { + buf := make([]byte, l) + copy(buf, c.b.stream) + c.b.stream = buf + } +} + +// Appender implements the Chunk interface. +func (c *FloatHistogramSTChunk) Appender() (Appender, error) { + if len(c.b.stream) == histogramSTHeaderSize { + return &FloatHistogramSTAppender{ + FloatHistogramAppender: FloatHistogramAppender{ + b: &c.b, + t: math.MinInt64, + sum: xorValue{leading: 0xff}, + cnt: xorValue{leading: 0xff}, + zCnt: xorValue{leading: 0xff}, + }, + }, nil + } + + it := c.iterator(nil) + + for it.Next() == ValFloatHistogram { + } + if err := it.Err(); err != nil { + return nil, err + } + + // Set the bit position for continuing writes. + c.b.count = it.br.valid + + pBuckets := make([]xorValue, len(it.pBuckets)) + for i := 0; i < len(it.pBuckets); i++ { + pBuckets[i] = xorValue{ + value: it.pBuckets[i], + leading: it.pBucketsLeading[i], + trailing: it.pBucketsTrailing[i], + } + } + nBuckets := make([]xorValue, len(it.nBuckets)) + for i := 0; i < len(it.nBuckets); i++ { + nBuckets[i] = xorValue{ + value: it.nBuckets[i], + leading: it.nBucketsLeading[i], + trailing: it.nBucketsTrailing[i], + } + } + + a := &FloatHistogramSTAppender{ + FloatHistogramAppender: FloatHistogramAppender{ + b: &c.b, + + schema: it.schema, + zThreshold: it.zThreshold, + pSpans: it.pSpans, + nSpans: it.nSpans, + customValues: it.customValues, + t: it.t, + tDelta: it.tDelta, + cnt: it.cnt, + zCnt: it.zCnt, + pBuckets: pBuckets, + nBuckets: nBuckets, + sum: it.sum, + }, + st: it.st, + stDiff: it.stDiff, + firstSTKnown: it.firstSTKnown, + firstSTChangeOn: uint16(it.firstSTChangeOn), + } + return a, nil +} + +func newFloatHistogramSTIterator(b []byte) *floatHistogramSTIterator { + it := &floatHistogramSTIterator{ + floatHistogramIterator: floatHistogramIterator{ + br: newBReader(b[histogramSTHeaderSize:]), + numTotal: binary.BigEndian.Uint16(b), + t: math.MinInt64, + }, + } + it.counterResetHeader = CounterResetHeader(b[histogramFlagPos] & CounterResetHeaderMask) + it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[histogramSTHeaderSize-1:]) + return it +} + +func (c *FloatHistogramSTChunk) iterator(it Iterator) *floatHistogramSTIterator { + if fhIter, ok := it.(*floatHistogramSTIterator); ok { + fhIter.Reset(c.b.bytes()) + return fhIter + } + return newFloatHistogramSTIterator(c.b.bytes()) +} + +// Iterator implements the Chunk interface. +func (c *FloatHistogramSTChunk) Iterator(it Iterator) Iterator { + return c.iterator(it) +} + +// FloatHistogramSTAppender is an Appender for float histogram samples with start timestamp support. +// It embeds FloatHistogramAppender and adds ST encoding after each sample. +type FloatHistogramSTAppender struct { + FloatHistogramAppender + + st int64 + stDiff int64 + firstSTChangeOn uint16 + firstSTKnown bool +} + +// encodeST encodes the start timestamp for the current sample. +// It must be called after appendFloatHistogram() which increments the sample count. +// prevT is the timestamp of the previous sample (before appendFloatHistogram updated a.t). +// For sample 0, prevT is unused. +func (a *FloatHistogramSTAppender) encodeST(prevT, st int64) { + num := binary.BigEndian.Uint16(a.b.bytes()) + + switch { + case num == 1: // First sample (count was just incremented from 0). + if st != 0 { + buf := make([]byte, binary.MaxVarintLen64) + for _, b := range buf[:binary.PutVarint(buf, a.t-st)] { + a.b.writeByte(b) + } + a.firstSTKnown = true + writeHeaderFirstSTKnown(a.b.bytes()[histogramSTHeaderSize-1:]) + } + case num == 2: // Second sample. + if st != a.st { + stDiff := prevT - st + a.firstSTChangeOn = 1 + writeHeaderFirstSTChangeOn(a.b.bytes()[histogramSTHeaderSize-1:], 1) + putVarbitInt(a.b, stDiff) + a.stDiff = stDiff + } + default: // Sample N >= 2. + // Fast path: no ST data to write. + if st == 0 && num-1 != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown { + break + } + if a.firstSTChangeOn == 0 { + if st != a.st || num-1 == maxFirstSTChangeOn { + stDiff := prevT - st + a.firstSTChangeOn = num - 1 + writeHeaderFirstSTChangeOn(a.b.bytes()[histogramSTHeaderSize-1:], num-1) + putVarbitInt(a.b, stDiff) + a.stDiff = stDiff + } + } else { + stDiff := prevT - st + putVarbitInt(a.b, stDiff-a.stDiff) + a.stDiff = stDiff + } + } + a.st = st +} + +// appendFloatHistogramST encodes a float histogram sample with start timestamp. +func (a *FloatHistogramSTAppender) appendFloatHistogramST(st, t int64, fh *histogram.FloatHistogram) { + prevT := a.t + a.appendFloatHistogram(t, fh) + a.encodeST(prevT, st) +} + +// Append implements Appender. This implementation panics because normal float +// samples must never be appended to a float histogram chunk. +func (*FloatHistogramSTAppender) Append(int64, int64, float64) { + panic("appended a float sample to a float histogram chunk") +} + +// AppendHistogram implements Appender. This implementation panics because integer +// histogram samples must never be appended to a float histogram chunk. +func (*FloatHistogramSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) { + panic("appended a histogram sample to a float histogram chunk") +} + +// AppendFloatHistogram implements Appender for FloatHistogramSTAppender. +func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev *FloatHistogramAppender, st, t int64, fh *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) { + if a.NumSamples() == 0 { + a.appendFloatHistogramST(st, t, fh) + if fh.CounterResetHint == histogram.GaugeType { + a.setCounterResetHeader(GaugeType) + return nil, false, a, nil + } + + switch { + case fh.CounterResetHint == histogram.CounterReset: + a.setCounterResetHeader(CounterReset) + case prev != nil: + _, _, _, _, _, counterReset := prev.appendable(fh) + if counterReset { + a.setCounterResetHeader(CounterReset) + } else { + a.setCounterResetHeader(NotCounterReset) + } + } + return nil, false, a, nil + } + + // Adding counter-like histogram. + if fh.CounterResetHint != histogram.GaugeType { + pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(fh) + if !okToAppend || counterReset { + if appendOnly { + if counterReset { + return nil, false, a, errors.New("float histogram counter reset") + } + return nil, false, a, errors.New("float histogram schema change") + } + newChunk := NewFloatHistogramSTChunk() + app, err := newChunk.Appender() + if err != nil { + panic(err) + } + happ := app.(*FloatHistogramSTAppender) + if counterReset { + happ.setCounterResetHeader(CounterReset) + } + happ.appendFloatHistogramST(st, t, fh) + return newChunk, false, app, nil + } + if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 { + if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 { + fh.PositiveSpans = make([]histogram.Span, len(a.pSpans)) + copy(fh.PositiveSpans, a.pSpans) + fh.NegativeSpans = make([]histogram.Span, len(a.nSpans)) + copy(fh.NegativeSpans, a.nSpans) + } else { + fh.PositiveSpans = adjustForInserts(fh.PositiveSpans, pBackwardInserts) + fh.NegativeSpans = adjustForInserts(fh.NegativeSpans, nBackwardInserts) + } + a.recodeHistogram(fh, pBackwardInserts, nBackwardInserts) + } + if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { + if appendOnly { + return nil, false, a, fmt.Errorf("float histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts)) + } + chk, app := a.recodeST( + pForwardInserts, nForwardInserts, + fh.PositiveSpans, fh.NegativeSpans, + ) + app.(*FloatHistogramSTAppender).appendFloatHistogramST(st, t, fh) + return chk, true, app, nil + } + a.appendFloatHistogramST(st, t, fh) + return nil, false, a, nil + } + + // Adding gauge histogram. + pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend := a.appendableGauge(fh) + if !okToAppend { + if appendOnly { + return nil, false, a, errors.New("float gauge histogram schema change") + } + newChunk := NewFloatHistogramSTChunk() + app, err := newChunk.Appender() + if err != nil { + panic(err) + } + happ := app.(*FloatHistogramSTAppender) + happ.setCounterResetHeader(GaugeType) + happ.appendFloatHistogramST(st, t, fh) + return newChunk, false, app, nil + } + + if len(pBackwardInserts)+len(nBackwardInserts) > 0 { + if appendOnly { + return nil, false, a, fmt.Errorf("float gauge histogram layout change with %d positive and %d negative backwards inserts", len(pBackwardInserts), len(nBackwardInserts)) + } + fh.PositiveSpans = pMergedSpans + fh.NegativeSpans = nMergedSpans + a.recodeHistogram(fh, pBackwardInserts, nBackwardInserts) + } + + if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { + if appendOnly { + return nil, false, a, fmt.Errorf("float gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts)) + } + chk, app := a.recodeST( + pForwardInserts, nForwardInserts, + fh.PositiveSpans, fh.NegativeSpans, + ) + app.(*FloatHistogramSTAppender).appendFloatHistogramST(st, t, fh) + return chk, true, app, nil + } + + a.appendFloatHistogramST(st, t, fh) + return nil, false, a, nil +} + +// recodeST is like FloatHistogramAppender.recode but creates FloatHistogramSTChunk and preserves ST. +func (a *FloatHistogramSTAppender) recodeST( + positiveInserts, negativeInserts []Insert, + positiveSpans, negativeSpans []histogram.Span, +) (Chunk, Appender) { + byts := a.b.bytes() + it := newFloatHistogramSTIterator(byts) + hc := NewFloatHistogramSTChunk() + app, err := hc.Appender() + if err != nil { + panic(err) + } + happ := app.(*FloatHistogramSTAppender) + numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) + + for it.Next() == ValFloatHistogram { + tOld, fhOld := it.AtFloatHistogram(nil) + stOld := it.AtST() + + var positiveBuckets, negativeBuckets []float64 + if numPositiveBuckets > 0 { + positiveBuckets = make([]float64, numPositiveBuckets) + } + if numNegativeBuckets > 0 { + negativeBuckets = make([]float64, numNegativeBuckets) + } + + fhOld.PositiveSpans, fhOld.NegativeSpans = positiveSpans, negativeSpans + if len(positiveInserts) > 0 { + fhOld.PositiveBuckets = insert(fhOld.PositiveBuckets, positiveBuckets, positiveInserts, false) + } + if len(negativeInserts) > 0 { + fhOld.NegativeBuckets = insert(fhOld.NegativeBuckets, negativeBuckets, negativeInserts, false) + } + happ.appendFloatHistogramST(stOld, tOld, fhOld) + } + + happ.setCounterResetHeader(CounterResetHeader(byts[histogramFlagPos] & CounterResetHeaderMask)) + return hc, app +} + +// floatHistogramSTIterator is an iterator for FloatHistogramSTChunk that decodes ST after each sample. +type floatHistogramSTIterator struct { + floatHistogramIterator + + // ST fields. + st int64 + stDiff int64 + firstSTKnown bool + firstSTChangeOn uint8 +} + +// AtST returns the start timestamp for the current sample. +func (it *floatHistogramSTIterator) AtST() int64 { + return it.st +} + +// Reset resets the iterator for reuse. +func (it *floatHistogramSTIterator) Reset(b []byte) { + it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[histogramSTHeaderSize-1:]) + it.st = 0 + it.stDiff = 0 + + // Reset the embedded floatHistogramIterator but with the correct header offset. + it.br = newBReader(b[histogramSTHeaderSize:]) + it.numTotal = binary.BigEndian.Uint16(b) + it.numRead = 0 + + it.counterResetHeader = CounterResetHeader(b[histogramFlagPos] & CounterResetHeaderMask) + + it.t, it.tDelta = 0, 0 + it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{} + + if it.atFloatHistogramCalled { + it.atFloatHistogramCalled = false + it.pBuckets, it.nBuckets = nil, nil + it.pSpans, it.nSpans = nil, nil + it.customValues = nil + } else { + it.pBuckets, it.nBuckets = it.pBuckets[:0], it.nBuckets[:0] + } + it.pBucketsLeading, it.pBucketsTrailing = it.pBucketsLeading[:0], it.pBucketsTrailing[:0] + it.nBucketsLeading, it.nBucketsTrailing = it.nBucketsLeading[:0], it.nBucketsTrailing[:0] + + it.err = nil +} + +// Next advances the iterator by one sample. +// It calls the embedded floatHistogramIterator.Next() to decode the float histogram sample, +// then decodes the ST data that follows in the bitstream. +func (it *floatHistogramSTIterator) Next() ValueType { + prevT := it.t + vt := it.floatHistogramIterator.Next() + if vt == ValNone { + return ValNone + } + if err := it.decodeST(it.numRead, prevT); err != nil { + it.err = err + return ValNone + } + return vt +} + +// Seek advances the iterator forward to the first sample with timestamp >= t. +func (it *floatHistogramSTIterator) Seek(t int64) ValueType { + if it.err != nil { + return ValNone + } + for t > it.t || it.numRead == 0 { + if it.Next() == ValNone { + return ValNone + } + } + return ValFloatHistogram +} + +// decodeST decodes the start timestamp for the current sample. +// numRead is the number of samples read so far (already incremented by floatHistogramIterator.Next()). +// prevT is the timestamp of the previous sample (before floatHistogramIterator.Next() updated it.t). +func (it *floatHistogramSTIterator) decodeST(numRead uint16, prevT int64) error { + switch { + case numRead == 1: // After sample 0. + if it.firstSTKnown { + stDiff, err := it.br.readVarint() + if err != nil { + return err + } + it.stDiff = stDiff + it.st = it.t - stDiff + } + case numRead == 2: // After sample 1. + if it.firstSTChangeOn == 1 { + sdod, err := readVarbitInt(&it.br) + if err != nil { + return err + } + it.stDiff = sdod + it.st = prevT - sdod + } + default: // After sample N >= 2. + if it.firstSTChangeOn > 0 && numRead-1 >= uint16(it.firstSTChangeOn) { + sdod, err := readVarbitInt(&it.br) + if err != nil { + return err + } + if numRead-1 == uint16(it.firstSTChangeOn) { + it.stDiff = sdod + } else { + it.stDiff += sdod + } + it.st = prevT - it.stDiff + } + } + return nil +} diff --git a/tsdb/chunkenc/float_histogram_st_test.go b/tsdb/chunkenc/float_histogram_st_test.go new file mode 100644 index 0000000000..31d99d69ee --- /dev/null +++ b/tsdb/chunkenc/float_histogram_st_test.go @@ -0,0 +1,333 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/tsdb/tsdbutil" +) + +type floatHistogramSTSample struct { + st, t int64 + fh *histogram.FloatHistogram +} + +func BenchmarkFloatHistogramSTWrite(b *testing.B) { + const n = 120 + fhs := tsdbutil.GenerateTestFloatHistograms(n) + + b.ReportAllocs() + + for b.Loop() { + c := NewFloatHistogramSTChunk() + app, _ := c.Appender() + for i, fh := range fhs { + _, _, app, _ = app.AppendFloatHistogram(nil, 500, int64(i)*15000, fh, false) + } + } +} + +func BenchmarkFloatHistogramSTRead(b *testing.B) { + const n = 120 + fhs := tsdbutil.GenerateTestFloatHistograms(n) + + c := NewFloatHistogramSTChunk() + app, err := c.Appender() + require.NoError(b, err) + for i, fh := range fhs { + _, _, app, err = app.AppendFloatHistogram(nil, 500, int64(i)*15000, fh, false) + require.NoError(b, err) + } + + b.ReportAllocs() + + var it Iterator + for b.Loop() { + it = c.Iterator(it) + for it.Next() != ValNone { + } + } +} + +// requireFloatHistogramSTSamples appends the given float histogram samples to a +// new FloatHistogramSTChunk, then verifies all samples round-trip correctly +// through the iterator. +func requireFloatHistogramSTSamples(t *testing.T, samples []floatHistogramSTSample) { + t.Helper() + + c := NewFloatHistogramSTChunk() + app, err := c.Appender() + require.NoError(t, err) + + for _, s := range samples { + _, _, app, err = app.AppendFloatHistogram(nil, s.st, s.t, s.fh, false) + require.NoError(t, err) + } + + require.Equal(t, len(samples), c.NumSamples()) + + it := c.Iterator(nil) + for i, s := range samples { + require.Equal(t, ValFloatHistogram, it.Next(), "sample %d", i) + require.Equal(t, s.t, it.AtT(), "sample %d: timestamp", i) + require.Equal(t, s.st, it.AtST(), "sample %d: start timestamp", i) + } + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) +} + +func TestFloatHistogramSTChunkST(t *testing.T) { + testChunkSTHandling(t, ValFloatHistogram, func() Chunk { return NewFloatHistogramSTChunk() }) +} + +func TestFloatHistogramSTBasic(t *testing.T) { + hs := tsdbutil.GenerateTestFloatHistograms(5) + requireFloatHistogramSTSamples(t, []floatHistogramSTSample{ + {st: 0, t: 1000, fh: hs[0]}, + {st: 0, t: 2000, fh: hs[1]}, + {st: 0, t: 3000, fh: hs[2]}, + {st: 0, t: 4000, fh: hs[3]}, + {st: 0, t: 5000, fh: hs[4]}, + }) +} + +func TestFloatHistogramSTChunkAppendAndIterate(t *testing.T) { + fhs := tsdbutil.GenerateTestFloatHistograms(5) + requireFloatHistogramSTSamples(t, []floatHistogramSTSample{ + {st: 100, t: 1000, fh: fhs[0]}, + {st: 100, t: 2000, fh: fhs[1]}, + {st: 200, t: 3000, fh: fhs[2]}, + {st: 200, t: 4000, fh: fhs[3]}, + {st: 300, t: 5000, fh: fhs[4]}, + }) +} + +func TestFloatHistogramSTChunkCounterReset(t *testing.T) { + c := NewFloatHistogramSTChunk() + app, err := c.Appender() + require.NoError(t, err) + + fh1 := &histogram.FloatHistogram{ + Count: 10, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + }, + PositiveBuckets: []float64{6, 3}, + } + + _, _, app, err = app.AppendFloatHistogram(nil, 100, 1000, fh1, false) + require.NoError(t, err) + + fh2 := fh1.Copy() + fh2.Count = 3 + fh2.Sum = 5.0 + fh2.PositiveBuckets = []float64{2, 1} + fh2.CounterResetHint = histogram.CounterReset + + // Ensure counter reset produces new chunk. + newChunk, recoded, newApp, err := app.AppendFloatHistogram(nil, 200, 2000, fh2, false) + require.NoError(t, err) + require.NotNil(t, newChunk) + require.False(t, recoded) + + stChunk, ok := newChunk.(*FloatHistogramSTChunk) + require.True(t, ok) + require.Equal(t, CounterReset, stChunk.GetCounterResetHeader()) + require.Equal(t, 1, stChunk.NumSamples()) + + // Verify ST is preserved in the new chunk. + it := stChunk.Iterator(nil) + require.Equal(t, ValFloatHistogram, it.Next()) + require.Equal(t, int64(2000), it.AtT()) + require.Equal(t, int64(200), it.AtST()) + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + + fh3 := fh2.Copy() + fh3.CounterResetHint = histogram.UnknownCounterReset + fh3.Count = 8 + fh3.Sum = 10.0 + fh3.PositiveBuckets = []float64{4, 2} + _, _, _, err = newApp.AppendFloatHistogram(nil, 300, 3000, fh3, false) + require.NoError(t, err) + require.Equal(t, 2, stChunk.NumSamples()) +} + +func TestFloatHistogramSTChunkRecode(t *testing.T) { + c := NewFloatHistogramSTChunk() + app, err := c.Appender() + require.NoError(t, err) + + h1 := &histogram.Histogram{ + Count: 27, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, + NegativeSpans: []histogram.Span{{Offset: 1, Length: 1}}, + NegativeBuckets: []int64{1}, + } + fh1 := h1.ToFloat(nil) + + _, _, app, err = app.AppendFloatHistogram(nil, 100, 1000, fh1, false) + require.NoError(t, err) + + // Force recode with expanded bucket layout + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.NegativeSpans = []histogram.Span{{Offset: 0, Length: 2}} + h2.Count = 35 + h2.ZeroCount++ + h2.Sum = 30 + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} + h2.NegativeBuckets = []int64{2, -1} + fh2 := h2.ToFloat(nil) + + // Ensure recode produces a new chunk. + newChunk, recoded, newApp, err := app.AppendFloatHistogram(nil, 200, 2000, fh2, false) + require.NoError(t, err) + require.NotNil(t, newChunk) + require.True(t, recoded) + + stChunk, ok := newChunk.(*FloatHistogramSTChunk) + require.True(t, ok) + require.Equal(t, 2, stChunk.NumSamples()) + + it := stChunk.Iterator(nil) + require.Equal(t, ValFloatHistogram, it.Next()) + require.Equal(t, int64(1000), it.AtT()) + require.Equal(t, int64(100), it.AtST()) + + require.Equal(t, ValFloatHistogram, it.Next()) + require.Equal(t, int64(2000), it.AtT()) + require.Equal(t, int64(200), it.AtST()) + + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + + fh3 := fh2.Copy() + fh3.Count = 40 + fh3.Sum = 35 + for i := range fh3.PositiveBuckets { + fh3.PositiveBuckets[i]++ + } + _, _, _, err = newApp.AppendFloatHistogram(nil, 300, 3000, fh3, false) + require.NoError(t, err) + require.Equal(t, 3, stChunk.NumSamples()) +} + +func TestFloatHistogramST_MoreThan127Samples(t *testing.T) { + c := NewFloatHistogramSTChunk() + app, err := c.Appender() + require.NoError(t, err) + + fh := &histogram.FloatHistogram{ + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + }, + PositiveBuckets: []float64{6, 3}, + } + + const numSamples = maxFirstSTChangeOn + 3 // 130 + + for i := range int(numSamples) { + fhi := fh.Copy() + fhi.Count = float64(5 + i) + fhi.Sum = float64(18 + i) + _, _, app, err = app.AppendFloatHistogram(nil, 500, int64(1000+i*1000), fhi, false) + require.NoError(t, err) + } + require.Equal(t, int(numSamples), c.NumSamples()) + + // Verify all samples round-trip correctly. + it := c.Iterator(nil) + for i := range int(numSamples) { + require.Equal(t, ValFloatHistogram, it.Next()) + require.Equal(t, int64(1000+i*1000), it.AtT()) + require.Equal(t, int64(500), it.AtST()) + } + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + + c2 := NewFloatHistogramSTChunk() + app2, err := c2.Appender() + require.NoError(t, err) + + for i := range int(maxFirstSTChangeOn + 1) { + fhi := fh.Copy() + fhi.Count = float64(5 + i) + fhi.Sum = float64(18 + i) + _, _, app2, err = app2.AppendFloatHistogram(nil, 0, int64(1000+i*1000), fhi, false) + require.NoError(t, err) + } + + for i := range 3 { + fhi := fh.Copy() + fhi.Count = float64(200 + i) + fhi.Sum = float64(200 + i) + _, _, app2, err = app2.AppendFloatHistogram(nil, 100, int64(200000+i*1000), fhi, false) + require.NoError(t, err) + } + + it2 := c2.Iterator(nil) + for i := range int(maxFirstSTChangeOn + 1) { + require.Equal(t, ValFloatHistogram, it2.Next()) + require.Equal(t, int64(1000+i*1000), it2.AtT()) + require.Equal(t, int64(0), it2.AtST()) + } + for i := range 3 { + require.Equal(t, ValFloatHistogram, it2.Next()) + require.Equal(t, int64(200000+i*1000), it2.AtT()) + require.Equal(t, int64(100), it2.AtST()) + } + require.Equal(t, ValNone, it2.Next()) + require.NoError(t, it2.Err()) +} + +func TestFloatHistogramSTChunkMixedST(t *testing.T) { + fhs := tsdbutil.GenerateTestFloatHistograms(5) + requireFloatHistogramSTSamples(t, []floatHistogramSTSample{ + {st: 0, t: 1000, fh: fhs[0]}, + {st: 0, t: 2000, fh: fhs[1]}, + {st: 100, t: 3000, fh: fhs[2]}, + {st: 0, t: 4000, fh: fhs[3]}, + {st: 200, t: 5000, fh: fhs[4]}, + }) +} diff --git a/tsdb/chunkenc/histogram_st.go b/tsdb/chunkenc/histogram_st.go new file mode 100644 index 0000000000..60f98465c4 --- /dev/null +++ b/tsdb/chunkenc/histogram_st.go @@ -0,0 +1,502 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + + "github.com/prometheus/prometheus/model/histogram" +) + +const histogramSTHeaderSize = 4 + +// HistogramSTChunk is a chunk for histogram samples with start timestamp (ST) support. +// It extends the HistogramChunk format with a 1-byte ST header after the flags byte. +// +// Header layout (4 bytes): +// +// bytes 0-1: sample count (big-endian uint16) +// byte 2: flags (bits 7-6 = counter reset header) +// byte 3: ST header (bit 7 = firstSTKnown, bits 6-0 = firstSTChangeOn) +type HistogramSTChunk struct { + b bstream +} + +// NewHistogramSTChunk returns a new empty HistogramSTChunk. +func NewHistogramSTChunk() *HistogramSTChunk { + b := make([]byte, histogramSTHeaderSize, chunkAllocationSize) + return &HistogramSTChunk{b: bstream{stream: b, count: 0}} +} + +func (c *HistogramSTChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + +// Encoding returns the encoding type. +func (*HistogramSTChunk) Encoding() Encoding { return EncHistogramST } + +// Bytes returns the underlying byte slice of the chunk. +func (c *HistogramSTChunk) Bytes() []byte { + return c.b.bytes() +} + +// NumSamples returns the number of samples in the chunk. +func (c *HistogramSTChunk) NumSamples() int { + return int(binary.BigEndian.Uint16(c.b.bytes())) +} + +// GetCounterResetHeader returns the counter reset header from the flags byte. +func (c *HistogramSTChunk) GetCounterResetHeader() CounterResetHeader { + return CounterResetHeader(c.b.bytes()[histogramFlagPos] & CounterResetHeaderMask) +} + +// Compact implements the Chunk interface. +func (c *HistogramSTChunk) Compact() { + if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { + buf := make([]byte, l) + copy(buf, c.b.stream) + c.b.stream = buf + } +} + +// Appender implements the Chunk interface. +func (c *HistogramSTChunk) Appender() (Appender, error) { + if len(c.b.stream) == histogramSTHeaderSize { + return &HistogramSTAppender{ + HistogramAppender: HistogramAppender{ + b: &c.b, + t: math.MinInt64, + leading: 0xff}, + }, nil + } + + it := c.iterator(nil) + + for it.Next() == ValHistogram { + } + if err := it.Err(); err != nil { + return nil, err + } + + // Set the bit position for continuing writes. The iterator's reader tracks + // how many bits remain unread in the last byte. + c.b.count = it.br.valid + + a := &HistogramSTAppender{ + HistogramAppender: HistogramAppender{ + b: &c.b, + + schema: it.schema, + zThreshold: it.zThreshold, + pSpans: it.pSpans, + nSpans: it.nSpans, + customValues: it.customValues, + t: it.t, + cnt: it.cnt, + zCnt: it.zCnt, + tDelta: it.tDelta, + cntDelta: it.cntDelta, + zCntDelta: it.zCntDelta, + pBuckets: it.pBuckets, + nBuckets: it.nBuckets, + pBucketsDelta: it.pBucketsDelta, + nBucketsDelta: it.nBucketsDelta, + + sum: it.sum, + leading: it.leading, + trailing: it.trailing, + }, + st: it.st, + stDiff: it.stDiff, + firstSTKnown: it.firstSTKnown, + firstSTChangeOn: uint16(it.firstSTChangeOn), + } + return a, nil +} + +func newHistogramSTIterator(b []byte) *histogramSTIterator { + it := &histogramSTIterator{ + histogramIterator: histogramIterator{ + br: newBReader(b[histogramSTHeaderSize:]), + numTotal: binary.BigEndian.Uint16(b), + t: math.MinInt64, + }, + } + it.counterResetHeader = CounterResetHeader(b[histogramFlagPos] & CounterResetHeaderMask) + it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[histogramSTHeaderSize-1:]) + return it +} + +func (c *HistogramSTChunk) iterator(it Iterator) *histogramSTIterator { + if histIter, ok := it.(*histogramSTIterator); ok { + histIter.Reset(c.b.bytes()) + return histIter + } + return newHistogramSTIterator(c.b.bytes()) +} + +// Iterator implements the Chunk interface. +func (c *HistogramSTChunk) Iterator(it Iterator) Iterator { + return c.iterator(it) +} + +// HistogramSTAppender is an Appender for histogram samples with start timestamp support. +// It embeds HistogramAppender and adds ST encoding after each sample. +type HistogramSTAppender struct { + HistogramAppender + + st int64 + stDiff int64 + firstSTChangeOn uint16 + firstSTKnown bool +} + +// encodeST encodes the start timestamp for the current sample. +// It must be called after appendHistogram() which increments the sample count. +// prevT is the timestamp of the previous sample (before appendHistogram updated a.t). +// For sample 0, prevT is unused. +func (a *HistogramSTAppender) encodeST(prevT, st int64) { + num := binary.BigEndian.Uint16(a.b.bytes()) + + switch { + case num == 1: // First sample (count was just incremented from 0). + if st != 0 { + buf := make([]byte, binary.MaxVarintLen64) + for _, b := range buf[:binary.PutVarint(buf, a.t-st)] { + a.b.writeByte(b) + } + a.firstSTKnown = true + writeHeaderFirstSTKnown(a.b.bytes()[histogramSTHeaderSize-1:]) + } + case num == 2: // Second sample. + if st != a.st { + stDiff := prevT - st + a.firstSTChangeOn = 1 + writeHeaderFirstSTChangeOn(a.b.bytes()[histogramSTHeaderSize-1:], 1) + putVarbitInt(a.b, stDiff) + a.stDiff = stDiff + } + default: // Sample N >= 2. + // Fast path: no ST data to write. + if st == 0 && num-1 != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown { + break + } + if a.firstSTChangeOn == 0 { + if st != a.st || num-1 == maxFirstSTChangeOn { + stDiff := prevT - st + a.firstSTChangeOn = num - 1 + writeHeaderFirstSTChangeOn(a.b.bytes()[histogramSTHeaderSize-1:], num-1) + putVarbitInt(a.b, stDiff) + a.stDiff = stDiff + } + } else { + stDiff := prevT - st + putVarbitInt(a.b, stDiff-a.stDiff) + a.stDiff = stDiff + } + } + a.st = st +} + +// appendHistogramST encodes a histogram sample with start timestamp. +func (a *HistogramSTAppender) appendHistogramST(st, t int64, h *histogram.Histogram) { + prevT := a.t + a.appendHistogram(t, h) + a.encodeST(prevT, st) +} + +func (*HistogramSTAppender) Append(int64, int64, float64) { + panic("appended a float sample to a histogram chunk") +} + +func (*HistogramSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) { + panic("appended a float histogram sample to a histogram chunk") +} + +// AppendHistogram implements Appender for HistogramSTAppender. +func (a *HistogramSTAppender) AppendHistogram(prev *HistogramAppender, st, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) { + if a.NumSamples() == 0 { + a.appendHistogramST(st, t, h) + if h.CounterResetHint == histogram.GaugeType { + a.setCounterResetHeader(GaugeType) + return nil, false, a, nil + } + + switch { + case h.CounterResetHint == histogram.CounterReset: + a.setCounterResetHeader(CounterReset) + case prev != nil: + _, _, _, _, _, counterReset := prev.appendable(h) + a.setCounterResetHeader(counterReset) + } + return nil, false, a, nil + } + + // Adding counter-like histogram. + if h.CounterResetHint != histogram.GaugeType { + pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterResetHint := a.appendable(h) + if !okToAppend || counterResetHint != NotCounterReset { + if appendOnly { + if counterResetHint == CounterReset { + return nil, false, a, errors.New("histogram counter reset") + } + return nil, false, a, errors.New("histogram schema change") + } + newChunk := NewHistogramSTChunk() + app, err := newChunk.Appender() + if err != nil { + panic(err) + } + happ := app.(*HistogramSTAppender) + happ.setCounterResetHeader(counterResetHint) + happ.appendHistogramST(st, t, h) + return newChunk, false, app, nil + } + if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 { + if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 { + h.PositiveSpans = make([]histogram.Span, len(a.pSpans)) + copy(h.PositiveSpans, a.pSpans) + h.NegativeSpans = make([]histogram.Span, len(a.nSpans)) + copy(h.NegativeSpans, a.nSpans) + } else { + h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts) + h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts) + } + a.recodeHistogram(h, pBackwardInserts, nBackwardInserts) + } + if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { + if appendOnly { + return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts)) + } + chk, app := a.recodeST( + pForwardInserts, nForwardInserts, + h.PositiveSpans, h.NegativeSpans, + ) + app.(*HistogramSTAppender).appendHistogramST(st, t, h) + return chk, true, app, nil + } + a.appendHistogramST(st, t, h) + return nil, false, a, nil + } + + // Adding gauge histogram. + pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend := a.appendableGauge(h) + if !okToAppend { + if appendOnly { + return nil, false, a, errors.New("gauge histogram schema change") + } + newChunk := NewHistogramSTChunk() + app, err := newChunk.Appender() + if err != nil { + panic(err) + } + happ := app.(*HistogramSTAppender) + happ.setCounterResetHeader(GaugeType) + happ.appendHistogramST(st, t, h) + return newChunk, false, app, nil + } + + if len(pBackwardInserts)+len(nBackwardInserts) > 0 { + if appendOnly { + return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative backwards inserts", len(pBackwardInserts), len(nBackwardInserts)) + } + h.PositiveSpans = pMergedSpans + h.NegativeSpans = nMergedSpans + a.recodeHistogram(h, pBackwardInserts, nBackwardInserts) + } + + if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { + if appendOnly { + return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts)) + } + chk, app := a.recodeST( + pForwardInserts, nForwardInserts, + h.PositiveSpans, h.NegativeSpans, + ) + app.(*HistogramSTAppender).appendHistogramST(st, t, h) + return chk, true, app, nil + } + + a.appendHistogramST(st, t, h) + return nil, false, a, nil +} + +// recodeST is like HistogramAppender.recode but creates HistogramSTChunk and preserves ST. +func (a *HistogramSTAppender) recodeST( + positiveInserts, negativeInserts []Insert, + positiveSpans, negativeSpans []histogram.Span, +) (Chunk, Appender) { + byts := a.b.bytes() + it := newHistogramSTIterator(byts) + hc := NewHistogramSTChunk() + app, err := hc.Appender() + if err != nil { + panic(err) + } + happ := app.(*HistogramSTAppender) + numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) + + for it.Next() == ValHistogram { + tOld, hOld := it.AtHistogram(nil) + stOld := it.AtST() + + var positiveBuckets, negativeBuckets []int64 + if numPositiveBuckets > 0 { + positiveBuckets = make([]int64, numPositiveBuckets) + } + if numNegativeBuckets > 0 { + negativeBuckets = make([]int64, numNegativeBuckets) + } + + hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans + if len(positiveInserts) > 0 { + hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, true) + } + if len(negativeInserts) > 0 { + hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, true) + } + happ.appendHistogramST(stOld, tOld, hOld) + } + + happ.setCounterResetHeader(CounterResetHeader(byts[histogramFlagPos] & CounterResetHeaderMask)) + return hc, app +} + +// histogramSTIterator is an iterator for HistogramSTChunk that decodes ST after each sample. +type histogramSTIterator struct { + histogramIterator + + // ST fields. + st int64 + stDiff int64 + firstSTKnown bool + firstSTChangeOn uint8 +} + +func (it *histogramSTIterator) AtST() int64 { + return it.st +} + +func (it *histogramSTIterator) Reset(b []byte) { + it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[histogramSTHeaderSize-1:]) + it.st = 0 + it.stDiff = 0 + + // Reset the embedded histogramIterator but with the correct header offset. + it.br = newBReader(b[histogramSTHeaderSize:]) + it.numTotal = binary.BigEndian.Uint16(b) + it.numRead = 0 + + it.counterResetHeader = CounterResetHeader(b[histogramFlagPos] & CounterResetHeaderMask) + + it.t, it.cnt, it.zCnt = 0, 0, 0 + it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0 + + if it.atHistogramCalled { + it.atHistogramCalled = false + it.pBuckets, it.nBuckets = nil, nil + it.pSpans, it.nSpans = nil, nil + } else { + it.pBuckets = it.pBuckets[:0] + it.nBuckets = it.nBuckets[:0] + } + if it.atFloatHistogramCalled { + it.atFloatHistogramCalled = false + it.pFloatBuckets, it.nFloatBuckets = nil, nil + } else { + it.pFloatBuckets = it.pFloatBuckets[:0] + it.nFloatBuckets = it.nFloatBuckets[:0] + } + + it.pBucketsDelta = it.pBucketsDelta[:0] + it.nBucketsDelta = it.nBucketsDelta[:0] + + it.sum = 0 + it.leading = 0 + it.trailing = 0 + it.err = nil + it.customValues = nil +} + +// Next advances the iterator by one sample. +// It calls the embedded histogramIterator.Next() to decode the histogram sample, +// then decodes the ST data that follows in the bitstream. +func (it *histogramSTIterator) Next() ValueType { + prevT := it.t + vt := it.histogramIterator.Next() + if vt == ValNone { + return ValNone + } + if err := it.decodeST(it.numRead, prevT); err != nil { + it.err = err + return ValNone + } + return vt +} + +// Seek advances the iterator forward to the first sample with timestamp >= t. +func (it *histogramSTIterator) Seek(t int64) ValueType { + if it.err != nil { + return ValNone + } + for t > it.t || it.numRead == 0 { + if it.Next() == ValNone { + return ValNone + } + } + return ValHistogram +} + +// decodeST decodes the start timestamp for the current sample. +// numRead is the number of samples read so far (already incremented by histogramIterator.Next()). +// prevT is the timestamp of the previous sample (before histogramIterator.Next() updated it.t). +func (it *histogramSTIterator) decodeST(numRead uint16, prevT int64) error { + switch { + case numRead == 1: // After sample 0. + if it.firstSTKnown { + stDiff, err := it.br.readVarint() + if err != nil { + return err + } + it.stDiff = stDiff + it.st = it.t - stDiff + } + case numRead == 2: // After sample 1. + if it.firstSTChangeOn == 1 { + sdod, err := readVarbitInt(&it.br) + if err != nil { + return err + } + it.stDiff = sdod + it.st = prevT - sdod + } + default: // After sample N >= 2. + if it.firstSTChangeOn > 0 && numRead-1 >= uint16(it.firstSTChangeOn) { + sdod, err := readVarbitInt(&it.br) + if err != nil { + return err + } + if numRead-1 == uint16(it.firstSTChangeOn) { + it.stDiff = sdod + } else { + it.stDiff += sdod + } + it.st = prevT - it.stDiff + } + } + return nil +} diff --git a/tsdb/chunkenc/histogram_st_test.go b/tsdb/chunkenc/histogram_st_test.go new file mode 100644 index 0000000000..1e2cf5bf1f --- /dev/null +++ b/tsdb/chunkenc/histogram_st_test.go @@ -0,0 +1,330 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/tsdb/tsdbutil" +) + +type histogramSTSample struct { + st, t int64 + h *histogram.Histogram +} + +func BenchmarkHistogramSTWrite(b *testing.B) { + const n = 120 + hs := tsdbutil.GenerateTestHistograms(n) + + b.ReportAllocs() + + for b.Loop() { + c := NewHistogramSTChunk() + app, _ := c.Appender() + for i, h := range hs { + _, _, app, _ = app.AppendHistogram(nil, 500, int64(i)*15000, h, false) + } + } +} + +func BenchmarkHistogramSTRead(b *testing.B) { + const n = 120 + hs := tsdbutil.GenerateTestHistograms(n) + + c := NewHistogramSTChunk() + app, err := c.Appender() + require.NoError(b, err) + for i, h := range hs { + _, _, app, err = app.AppendHistogram(nil, 500, int64(i)*15000, h, false) + require.NoError(b, err) + } + + b.ReportAllocs() + + var it Iterator + for b.Loop() { + it = c.Iterator(it) + for it.Next() != ValNone { + } + } +} + +// requireHistogramSTSamples appends the given histogram samples to a new +// HistogramSTChunk, then verifies all samples round-trip correctly through +// the iterator. +func requireHistogramSTSamples(t *testing.T, samples []histogramSTSample) { + t.Helper() + + c := NewHistogramSTChunk() + app, err := c.Appender() + require.NoError(t, err) + + for _, s := range samples { + _, _, app, err = app.AppendHistogram(nil, s.st, s.t, s.h, false) + require.NoError(t, err) + } + + require.Equal(t, len(samples), c.NumSamples()) + + it := c.Iterator(nil) + for _, s := range samples { + require.Equal(t, ValHistogram, it.Next()) + require.Equal(t, s.t, it.AtT()) + require.Equal(t, s.st, it.AtST()) + } + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) +} + +func TestHistogramSTChunkST(t *testing.T) { + testChunkSTHandling(t, ValHistogram, func() Chunk { return NewHistogramSTChunk() }) +} + +func TestHistogramSTBasic(t *testing.T) { + hs := tsdbutil.GenerateTestHistograms(5) + requireHistogramSTSamples(t, []histogramSTSample{ + {st: 0, t: 1000, h: hs[0]}, + {st: 0, t: 2000, h: hs[1]}, + {st: 0, t: 3000, h: hs[2]}, + {st: 0, t: 4000, h: hs[3]}, + {st: 0, t: 5000, h: hs[4]}, + }) +} + +func TestHistogramSTChunkAppendAndIterate(t *testing.T) { + hs := tsdbutil.GenerateTestHistograms(5) + requireHistogramSTSamples(t, []histogramSTSample{ + {st: 100, t: 1000, h: hs[0]}, + {st: 100, t: 2000, h: hs[1]}, + {st: 200, t: 3000, h: hs[2]}, + {st: 200, t: 4000, h: hs[3]}, + {st: 300, t: 5000, h: hs[4]}, + }) +} + +func TestHistogramSTChunkCounterReset(t *testing.T) { + c := NewHistogramSTChunk() + app, err := c.Appender() + require.NoError(t, err) + + h1 := &histogram.Histogram{ + Count: 10, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + }, + PositiveBuckets: []int64{6, -3}, + } + + _, _, app, err = app.AppendHistogram(nil, 100, 1000, h1, false) + require.NoError(t, err) + + h2 := h1.Copy() + h2.Count = 3 + h2.Sum = 5.0 + h2.PositiveBuckets = []int64{2, -1} + h2.CounterResetHint = histogram.CounterReset + + newChunk, recoded, newApp, err := app.AppendHistogram(nil, 200, 2000, h2, false) + require.NoError(t, err) + require.NotNil(t, newChunk) + require.False(t, recoded) + + stChunk, ok := newChunk.(*HistogramSTChunk) + require.True(t, ok) + require.Equal(t, CounterReset, stChunk.GetCounterResetHeader()) + require.Equal(t, 1, stChunk.NumSamples()) + + // Verify ST is preserved in the new chunk. + it := stChunk.Iterator(nil) + require.Equal(t, ValHistogram, it.Next()) + require.Equal(t, int64(2000), it.AtT()) + require.Equal(t, int64(200), it.AtST()) + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + + h3 := h2.Copy() + h3.CounterResetHint = histogram.UnknownCounterReset + h3.Count = 8 + h3.Sum = 10.0 + h3.PositiveBuckets = []int64{4, -2} + _, _, _, err = newApp.AppendHistogram(nil, 300, 3000, h3, false) + require.NoError(t, err) + require.Equal(t, 2, stChunk.NumSamples()) +} + +func TestHistogramSTChunkRecode(t *testing.T) { + c := NewHistogramSTChunk() + app, err := c.Appender() + require.NoError(t, err) + + h1 := &histogram.Histogram{ + Count: 27, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, + NegativeSpans: []histogram.Span{{Offset: 1, Length: 1}}, + NegativeBuckets: []int64{1}, + } + + _, _, app, err = app.AppendHistogram(nil, 100, 1000, h1, false) + require.NoError(t, err) + + // Force recode with expanded bucket layout. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.NegativeSpans = []histogram.Span{{Offset: 0, Length: 2}} + h2.Count = 35 + h2.ZeroCount++ + h2.Sum = 30 + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} + h2.NegativeBuckets = []int64{2, -1} + + // Ensure recode produced a new chunk + newChunk, recoded, newApp, err := app.AppendHistogram(nil, 200, 2000, h2, false) + require.NoError(t, err) + require.NotNil(t, newChunk) + require.True(t, recoded) + + stChunk, ok := newChunk.(*HistogramSTChunk) + require.True(t, ok) + require.Equal(t, 2, stChunk.NumSamples()) + + it := stChunk.Iterator(nil) + require.Equal(t, ValHistogram, it.Next()) + require.Equal(t, int64(1000), it.AtT()) + require.Equal(t, int64(100), it.AtST()) + + require.Equal(t, ValHistogram, it.Next()) + require.Equal(t, int64(2000), it.AtT()) + require.Equal(t, int64(200), it.AtST()) + + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + + h3 := h2.Copy() + h3.Count = 40 + h3.Sum = 35 + for i := range h3.PositiveBuckets { + h3.PositiveBuckets[i]++ + } + _, _, _, err = newApp.AppendHistogram(nil, 300, 3000, h3, false) + require.NoError(t, err) + require.Equal(t, 3, stChunk.NumSamples()) +} + +func TestHistogramST_MoreThan127Samples(t *testing.T) { + c := NewHistogramSTChunk() + app, err := c.Appender() + require.NoError(t, err) + + h := &histogram.Histogram{ + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + }, + PositiveBuckets: []int64{6, -3}, + } + + const numSamples = maxFirstSTChangeOn + 3 // 130 + + for i := range int(numSamples) { + hi := h.Copy() + hi.Count = uint64(5 + i) + hi.Sum = float64(18 + i) + _, _, app, err = app.AppendHistogram(nil, 500, int64(1000+i*1000), hi, false) + require.NoError(t, err) + } + require.Equal(t, int(numSamples), c.NumSamples()) + + it := c.Iterator(nil) + for i := range int(numSamples) { + require.Equal(t, ValHistogram, it.Next()) + require.Equal(t, int64(1000+i*1000), it.AtT()) + require.Equal(t, int64(500), it.AtST()) + } + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + + // Test ST changing after the boundary. + c2 := NewHistogramSTChunk() + app2, err := c2.Appender() + require.NoError(t, err) + + for i := range int(maxFirstSTChangeOn + 1) { + hi := h.Copy() + hi.Count = uint64(5 + i) + hi.Sum = float64(18 + i) + _, _, app2, err = app2.AppendHistogram(nil, 0, int64(1000+i*1000), hi, false) + require.NoError(t, err) + } + + for i := range 3 { + hi := h.Copy() + hi.Count = uint64(200 + i) + hi.Sum = float64(200 + i) + _, _, app2, err = app2.AppendHistogram(nil, 100, int64(200000+i*1000), hi, false) + require.NoError(t, err) + } + + it2 := c2.Iterator(nil) + for i := range int(maxFirstSTChangeOn + 1) { + require.Equal(t, ValHistogram, it2.Next()) + require.Equal(t, int64(1000+i*1000), it2.AtT()) + require.Equal(t, int64(0), it2.AtST()) + } + for i := range 3 { + require.Equal(t, ValHistogram, it2.Next()) + require.Equal(t, int64(200000+i*1000), it2.AtT()) + require.Equal(t, int64(100), it2.AtST()) + } + require.Equal(t, ValNone, it2.Next()) + require.NoError(t, it2.Err()) +} + +func TestHistogramSTChunkMixedST(t *testing.T) { + hs := tsdbutil.GenerateTestHistograms(5) + requireHistogramSTSamples(t, []histogramSTSample{ + {st: 0, t: 1000, h: hs[0]}, + {st: 0, t: 2000, h: hs[1]}, + {st: 100, t: 3000, h: hs[2]}, + {st: 0, t: 4000, h: hs[3]}, + {st: 200, t: 5000, h: hs[4]}, + }) +} diff --git a/tsdb/compact.go b/tsdb/compact.go index 7091d34d50..37398a8f4b 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -917,9 +917,9 @@ func (DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compact samples := uint64(chk.Chunk.NumSamples()) meta.Stats.NumSamples += samples switch chk.Chunk.Encoding() { - case chunkenc.EncHistogram, chunkenc.EncFloatHistogram: + case chunkenc.EncHistogram, chunkenc.EncFloatHistogram, chunkenc.EncHistogramST, chunkenc.EncFloatHistogramST: meta.Stats.NumHistogramSamples += samples - case chunkenc.EncXOR: + case chunkenc.EncXOR, chunkenc.EncXOR2: meta.Stats.NumFloatSamples += samples } } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 8aaad13c0e..1000b292ea 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -7735,67 +7735,117 @@ func TestHeadAppender_STStorage_WBLReplay(t *testing.T) { // TestHeadAppender_STStorage_ChunkEncoding verifies that the correct chunk encoding // is used based on EnableSTStorage setting. func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) { - samples := []struct { + testHistogram := tsdbutil.GenerateTestHistogram(1) + testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1) + + type appendableSample struct { st int64 ts int64 fSample float64 - }{ - {st: 10, ts: 100, fSample: 1.0}, - {st: 20, ts: 200, fSample: 2.0}, + h *histogram.Histogram + fh *histogram.FloatHistogram } - for _, enableST := range []bool{false, true} { - t.Run(fmt.Sprintf("EnableSTStorage=%t", enableST), func(t *testing.T) { - opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) - opts.EnableSTStorage.Store(enableST) - opts.EnableXOR2Encoding.Store(enableST) // ST storage implies XOR2 encoding. - h, _ := newTestHeadWithOptions(t, compression.None, opts) + for _, tc := range []struct { + name string + samples []appendableSample + stEncoding chunkenc.Encoding + regularEncoding chunkenc.Encoding + }{ + { + name: "float samples", + samples: []appendableSample{ + {st: 10, ts: 100, fSample: 1.0}, + {st: 20, ts: 200, fSample: 2.0}, + }, + stEncoding: chunkenc.EncXOR2, + regularEncoding: chunkenc.EncXOR, + }, + { + name: "histogram samples", + samples: []appendableSample{ + {st: 10, ts: 100, h: testHistogram}, + {st: 20, ts: 200, h: testHistogram}, + }, + stEncoding: chunkenc.EncHistogramST, + regularEncoding: chunkenc.EncHistogram, + }, + { + name: "float histogram samples", + samples: []appendableSample{ + {st: 10, ts: 100, fh: testFloatHistogram}, + {st: 20, ts: 200, fh: testFloatHistogram}, + }, + stEncoding: chunkenc.EncFloatHistogramST, + regularEncoding: chunkenc.EncFloatHistogram, + }, + } { + for _, enableST := range []bool{false, true} { + t.Run(fmt.Sprintf("%s/EnableSTStorage=%t", tc.name, enableST), func(t *testing.T) { + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(enableST) + opts.EnableXOR2Encoding.Store(enableST) // ST storage implies XOR2 encoding. + h, _ := newTestHeadWithOptions(t, compression.None, opts) - lbls := labels.FromStrings("foo", "bar") - a := h.Appender(context.Background()) - for _, s := range samples { - _, err := a.AppendSTZeroSample(0, lbls, s.ts, s.st) - require.NoError(t, err) - _, err = a.Append(0, lbls, s.ts, s.fSample) - require.NoError(t, err) - } - require.NoError(t, a.Commit()) - - ctx := context.Background() - idxReader, err := h.Index() - require.NoError(t, err) - defer idxReader.Close() - - chkReader, err := h.Chunks() - require.NoError(t, err) - defer chkReader.Close() - - p, err := idxReader.Postings(ctx, "foo", "bar") - require.NoError(t, err) - - var lblBuilder labels.ScratchBuilder - require.True(t, p.Next()) - sRef := p.At() - - var chkMetas []chunks.Meta - require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas)) - require.NotEmpty(t, chkMetas) - - for _, meta := range chkMetas { - chk, iterable, err := chkReader.ChunkOrIterable(meta) - require.NoError(t, err) - require.Nil(t, iterable) - - encoding := chk.Encoding() - if enableST { - require.Equal(t, chunkenc.EncXOR2, encoding, - "Expected ST-capable encoding when EnableSTStorage is true") - } else { - require.Equal(t, chunkenc.EncXOR, encoding, - "Expected regular XOR encoding when EnableSTStorage is false") + lbls := labels.FromStrings("foo", "bar") + a := h.Appender(context.Background()) + for _, s := range tc.samples { + switch { + case s.h != nil: + _, err := a.AppendHistogramSTZeroSample(0, lbls, s.ts, s.st, s.h, nil) + require.NoError(t, err) + _, err = a.AppendHistogram(0, lbls, s.ts, s.h, nil) + require.NoError(t, err) + case s.fh != nil: + _, err := a.AppendHistogramSTZeroSample(0, lbls, s.ts, s.st, nil, s.fh) + require.NoError(t, err) + _, err = a.AppendHistogram(0, lbls, s.ts, nil, s.fh) + require.NoError(t, err) + default: + _, err := a.AppendSTZeroSample(0, lbls, s.ts, s.st) + require.NoError(t, err) + _, err = a.Append(0, lbls, s.ts, s.fSample) + require.NoError(t, err) + } } - } - }) + require.NoError(t, a.Commit()) + + ctx := context.Background() + idxReader, err := h.Index() + require.NoError(t, err) + defer idxReader.Close() + + chkReader, err := h.Chunks() + require.NoError(t, err) + defer chkReader.Close() + + p, err := idxReader.Postings(ctx, "foo", "bar") + require.NoError(t, err) + + var lblBuilder labels.ScratchBuilder + require.True(t, p.Next()) + sRef := p.At() + + var chkMetas []chunks.Meta + require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas)) + require.NotEmpty(t, chkMetas) + + for _, meta := range chkMetas { + chk, iterable, err := chkReader.ChunkOrIterable(meta) + require.NoError(t, err) + require.Nil(t, iterable) + + encoding := chk.Encoding() + if enableST { + require.Equal(t, tc.stEncoding, encoding, + "Expected ST-capable encoding when EnableSTStorage is true") + } else { + require.Equal(t, tc.regularEncoding, encoding, + "Expected regular encoding when EnableSTStorage is false") + } + } + }) + } } } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index de8b19b6c1..0918c47ace 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -1212,7 +1212,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { buf.PutUvarintBytes(s.headChunks.chunk.Bytes()) switch enc { - case chunkenc.EncXOR: + case chunkenc.EncXOR, chunkenc.EncXOR2: // Backwards compatibility for old sampleBuf which had last 4 samples. for range 3 { buf.PutBE64int64(0) @@ -1220,9 +1220,9 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { } buf.PutBE64int64(0) buf.PutBEFloat64(s.lastValue) - case chunkenc.EncHistogram: + case chunkenc.EncHistogram, chunkenc.EncHistogramST: record.EncodeHistogram(&buf, s.lastHistogramValue) - default: // chunkenc.FloatHistogram. + default: // chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogramST. record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue) } } @@ -1273,10 +1273,10 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh } _ = dec.Be64int64() csr.lastValue = dec.Be64Float64() - case chunkenc.EncHistogram: + case chunkenc.EncHistogram, chunkenc.EncHistogramST: csr.lastHistogramValue = &histogram.Histogram{} record.DecodeHistogram(&dec, csr.lastHistogramValue) - default: // chunkenc.FloatHistogram. + default: // chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogramST. csr.lastFloatHistogramValue = &histogram.FloatHistogram{} record.DecodeFloatHistogram(&dec, csr.lastFloatHistogramValue) } diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 60cee8d005..c3959d0204 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -96,11 +96,9 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, useXOR2 bool) (chks []memCh encoding := chunkenc.ValFloat.ChunkEncoding(useXOR2) switch { case s.h != nil: - // TODO(krajorama): use ST capable histogram chunk. - encoding = chunkenc.EncHistogram + encoding = chunkenc.ValHistogram.ChunkEncoding(useXOR2) case s.fh != nil: - // TODO(krajorama): use ST capable float histogram chunk. - encoding = chunkenc.EncFloatHistogram + encoding = chunkenc.ValFloatHistogram.ChunkEncoding(useXOR2) } // prevApp is the appender for the previous sample. @@ -125,10 +123,15 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, useXOR2 bool) (chks []memCh switch encoding { case chunkenc.EncXOR, chunkenc.EncXOR2: app.Append(s.st, s.t, s.f) - case chunkenc.EncHistogram: - // TODO(krajorama): handle ST capable histogram chunk. + case chunkenc.EncHistogram, chunkenc.EncHistogramST: // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. - prevHApp, _ := prevApp.(*chunkenc.HistogramAppender) + var prevHApp *chunkenc.HistogramAppender + switch p := prevApp.(type) { + case *chunkenc.HistogramAppender: + prevHApp = p + case *chunkenc.HistogramSTAppender: + prevHApp = &p.HistogramAppender + } var ( newChunk chunkenc.Chunk recoded bool @@ -141,10 +144,15 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, useXOR2 bool) (chks []memCh } chunk = newChunk } - case chunkenc.EncFloatHistogram: - // TODO(krajorama): handle ST capable float histogram chunk. + case chunkenc.EncFloatHistogram, chunkenc.EncFloatHistogramST: // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. - prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender) + var prevHApp *chunkenc.FloatHistogramAppender + switch p := prevApp.(type) { + case *chunkenc.FloatHistogramAppender: + prevHApp = p + case *chunkenc.FloatHistogramSTAppender: + prevHApp = &p.FloatHistogramAppender + } var ( newChunk chunkenc.Chunk recoded bool diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index d410835571..e564abc5e8 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -367,30 +367,44 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { } } -// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with useXOR2=true and useXOR2=false for float samples. +// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with useXOR2=true and useXOR2=false. // When useXOR2=true, st values are preserved; when useXOR2=false, AtST() returns 0. -// TODO(@krajorama): Add histogram test cases once ST storage is implemented for histograms. func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) { + h1 := tsdbutil.GenerateTestHistogram(1) + h2 := tsdbutil.GenerateTestHistogram(2) + h3 := tsdbutil.GenerateTestHistogram(3) + fh1 := tsdbutil.GenerateTestFloatHistogram(1) + fh2 := tsdbutil.GenerateTestFloatHistogram(2) + fh3 := tsdbutil.GenerateTestFloatHistogram(3) + testCases := map[string]struct { - samples []sample + samples []sample + xor2Encoding chunkenc.Encoding // Expected encoding when useXOR2=true. + regularEncoding chunkenc.Encoding // Expected encoding when useXOR2=false. }{ "floats with st=0": { samples: []sample{ {st: 0, t: 1000, f: 43.0}, {st: 0, t: 1100, f: 42.0}, }, + xor2Encoding: chunkenc.EncXOR2, + regularEncoding: chunkenc.EncXOR, }, "floats with st=t": { samples: []sample{ {st: 1000, t: 1000, f: 43.0}, {st: 1100, t: 1100, f: 42.0}, }, + xor2Encoding: chunkenc.EncXOR2, + regularEncoding: chunkenc.EncXOR, }, "floats with st=t-100": { samples: []sample{ {st: 900, t: 1000, f: 43.0}, {st: 1000, t: 1100, f: 42.0}, }, + xor2Encoding: chunkenc.EncXOR2, + regularEncoding: chunkenc.EncXOR, }, "floats with varying st": { samples: []sample{ @@ -398,16 +412,67 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) { {st: 1100, t: 1100, f: 42.0}, // st == t {st: 0, t: 1200, f: 41.0}, // st == 0 }, + xor2Encoding: chunkenc.EncXOR2, + regularEncoding: chunkenc.EncXOR, + }, + "histograms with st=0": { + samples: []sample{ + {st: 0, t: 1000, h: h1}, + {st: 0, t: 1100, h: h2}, + }, + xor2Encoding: chunkenc.EncHistogramST, + regularEncoding: chunkenc.EncHistogram, + }, + "histograms with st=t-100": { + samples: []sample{ + {st: 900, t: 1000, h: h1}, + {st: 1000, t: 1100, h: h2}, + }, + xor2Encoding: chunkenc.EncHistogramST, + regularEncoding: chunkenc.EncHistogram, + }, + "histograms with varying st": { + samples: []sample{ + {st: 500, t: 1000, h: h1}, + {st: 1100, t: 1100, h: h2}, + {st: 0, t: 1200, h: h3}, + }, + xor2Encoding: chunkenc.EncHistogramST, + regularEncoding: chunkenc.EncHistogram, + }, + "float histograms with st=0": { + samples: []sample{ + {st: 0, t: 1000, fh: fh1}, + {st: 0, t: 1100, fh: fh2}, + }, + xor2Encoding: chunkenc.EncFloatHistogramST, + regularEncoding: chunkenc.EncFloatHistogram, + }, + "float histograms with st=t-100": { + samples: []sample{ + {st: 900, t: 1000, fh: fh1}, + {st: 1000, t: 1100, fh: fh2}, + }, + xor2Encoding: chunkenc.EncFloatHistogramST, + regularEncoding: chunkenc.EncFloatHistogram, + }, + "float histograms with varying st": { + samples: []sample{ + {st: 500, t: 1000, fh: fh1}, + {st: 1100, t: 1100, fh: fh2}, + {st: 0, t: 1200, fh: fh3}, + }, + xor2Encoding: chunkenc.EncFloatHistogramST, + regularEncoding: chunkenc.EncFloatHistogram, }, } storageScenarios := []struct { - name string - useXOR2 bool - expectedEncoding chunkenc.Encoding + name string + useXOR2 bool }{ - {"useXOR2=true", true, chunkenc.EncXOR2}, - {"useXOR2=false", false, chunkenc.EncXOR}, + {"useXOR2=true", true}, + {"useXOR2=false", false}, } for name, tc := range testCases { @@ -415,7 +480,7 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) { t.Run(name+"/"+ss.name, func(t *testing.T) { oooChunk := OOOChunk{} for _, s := range tc.samples { - oooChunk.Insert(s.st, s.t, s.f, nil, nil) + oooChunk.Insert(s.st, s.t, s.f, s.h, s.fh) } chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, ss.useXOR2) @@ -423,28 +488,47 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) { require.Len(t, chunks, 1, "number of chunks") c := chunks[0] - require.Equal(t, ss.expectedEncoding, c.chunk.Encoding(), "chunk encoding") + expectedEnc := tc.regularEncoding + if ss.useXOR2 { + expectedEnc = tc.xor2Encoding + } + require.Equal(t, expectedEnc, c.chunk.Encoding(), "chunk encoding") require.Equal(t, tc.samples[0].t, c.minTime, "chunk minTime") require.Equal(t, tc.samples[len(tc.samples)-1].t, c.maxTime, "chunk maxTime") - // Verify samples can be read back with correct st and t values. + // Verify samples can be read back. it := c.chunk.Iterator(nil) sampleIndex := 0 - for it.Next() == chunkenc.ValFloat { - gotT, gotF := it.At() - gotST := it.AtST() + for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { + expSample := tc.samples[sampleIndex] + switch vt { + case chunkenc.ValFloat: + gotT, gotF := it.At() + require.Equal(t, expSample.t, gotT, "sample %d t", sampleIndex) + require.Equal(t, expSample.f, gotF, "sample %d f", sampleIndex) + case chunkenc.ValHistogram: + gotT, gotH := it.AtHistogram(nil) + require.Equal(t, expSample.t, gotT, "sample %d t", sampleIndex) + expH := expSample.h.Copy() + expH.CounterResetHint = gotH.CounterResetHint + require.Equal(t, expH, gotH.Compact(0), "sample %d h", sampleIndex) + case chunkenc.ValFloatHistogram: + gotT, gotFH := it.AtFloatHistogram(nil) + require.Equal(t, expSample.t, gotT, "sample %d t", sampleIndex) + expFH := expSample.fh.Copy() + expFH.CounterResetHint = gotFH.CounterResetHint + require.Equal(t, expFH, gotFH.Compact(0), "sample %d fh", sampleIndex) + } + gotST := it.AtST() if ss.useXOR2 { - // When useXOR2=true, st values should be preserved. - require.Equal(t, tc.samples[sampleIndex].st, gotST, "sample %d st", sampleIndex) + require.Equal(t, expSample.st, gotST, "sample %d st", sampleIndex) } else { - // When useXOR2=false, AtST() should return 0. require.Equal(t, int64(0), gotST, "sample %d st should be 0 when useXOR2=false", sampleIndex) } - require.Equal(t, tc.samples[sampleIndex].t, gotT, "sample %d t", sampleIndex) - require.Equal(t, tc.samples[sampleIndex].f, gotF, "sample %d f", sampleIndex) sampleIndex++ } + require.NoError(t, it.Err()) require.Equal(t, len(tc.samples), sampleIndex, "number of samples") }) }