diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 09d50326ba..db83c53e3a 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -77,7 +77,7 @@ type HistoChunk struct { // NewHistoChunk returns a new chunk with Histo encoding of the given size. func NewHistoChunk() *HistoChunk { - b := make([]byte, 2, 128) + b := make([]byte, 3, 128) return &HistoChunk{b: bstream{stream: b, count: 0}} } @@ -106,6 +106,32 @@ func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, return readHistoChunkMeta(&b) } +// CounterResetHeader defines the first 2 bits of the chunk header. +type CounterResetHeader byte + +const ( + CounterReset CounterResetHeader = 0b10000000 + NotCounterReset CounterResetHeader = 0b01000000 + GaugeType CounterResetHeader = 0b11000000 + UnknownCounterReset CounterResetHeader = 0b00000000 +) + +// SetCounterResetHeader sets the counter reset header. +func (c *HistoChunk) SetCounterResetHeader(h CounterResetHeader) { + switch h { + case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset: + bytes := c.Bytes() + bytes[2] = (bytes[2] & 0b00111111) | byte(h) + default: + panic("invalid CounterResetHeader type") + } +} + +// GetCounterResetHeader returns the info about the first 2 bits of the chunk header. +func (c *HistoChunk) GetCounterResetHeader() CounterResetHeader { + return CounterResetHeader(c.Bytes()[2] & 0b11000000) +} + // Compact implements the Chunk interface. func (c *HistoChunk) Compact() { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { @@ -249,42 +275,56 @@ func (a *HistoAppender) Append(int64, float64) {} // * any buckets disappeared // * there was a counter reset in the count of observations or in any bucket, including the zero bucket // * the last sample in the chunk was stale while the current sample is not stale +// It returns an additional boolean set to true if it is not appendable because of a counter reset. // If the given sample is stale, it will always return true. -func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) { +// If counterReset is true, okToAppend MUST be false. +func (a *HistoAppender) Appendable(h histogram.SparseHistogram) (posInterjections []Interjection, negInterjections []Interjection, okToAppend bool, counterReset bool) { if value.IsStaleNaN(h.Sum) { // This is a stale sample whose buckets and spans don't matter. - return nil, nil, true + okToAppend = true + return } if value.IsStaleNaN(a.sum) { // If the last sample was stale, then we can only accept stale samples in this chunk. - return nil, nil, false + return + } + + if h.Count < a.cnt { + // There has been a counter reset. + counterReset = true + return } if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold { - return nil, nil, false + return } - posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans) + + if h.ZeroCount < a.zcnt { + // There has been a counter reset since ZeroThreshold didn't change. + counterReset = true + return + } + + var ok bool + posInterjections, ok = compareSpans(a.posSpans, h.PositiveSpans) if !ok { - return nil, nil, false + counterReset = true + return } - negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans) + negInterjections, ok = compareSpans(a.negSpans, h.NegativeSpans) if !ok { - return nil, nil, false + counterReset = true + return } - if h.Count < a.cnt || h.ZeroCount < a.zcnt { - // There has been a counter reset. - return nil, nil, false + if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) || + counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) { + counterReset, posInterjections, negInterjections = true, nil, nil + return } - if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) { - return nil, nil, false - } - if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) { - return nil, nil, false - } - - return posInterjections, negInterjections, ok + okToAppend = true + return } // counterResetInAnyBucket returns true if there was a counter reset for any bucket. @@ -476,7 +516,8 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection // it again with the new span layout. This can probably be done in-place // by editing the chunk. But let's first see how expensive it is in the // big picture. - it := newHistoIterator(a.b.bytes()) + byts := a.b.bytes() + it := newHistoIterator(byts) hc := NewHistoChunk() app, err := hc.Appender() if err != nil { @@ -504,6 +545,9 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection } app.AppendHistogram(tOld, hOld) } + + // Set the flags. + hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000)) return hc, app } @@ -643,6 +687,7 @@ func (it *histoIterator) Next() bool { if it.numRead == 0 { // first read is responsible for reading chunk metadata and initializing fields that depend on it + // We give counter reset info at chunk level, hence we discard it here. schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br) if err != nil { it.err = err diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go index e470fd8469..b685035fb8 100644 --- a/tsdb/chunkenc/histo_meta.go +++ b/tsdb/chunkenc/histo_meta.go @@ -32,29 +32,34 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) { } } -func readHistoChunkMeta(b *bstreamReader) (int32, float64, []histogram.Span, []histogram.Span, error) { +func readHistoChunkMeta(b *bstreamReader) (schema int32, zeroThreshold float64, posSpans []histogram.Span, negSpans []histogram.Span, err error) { + _, err = b.ReadByte() // The header. + if err != nil { + return + } + v, err := readInt64VBBucket(b) if err != nil { - return 0, 0, nil, nil, err + return } - schema := int32(v) + schema = int32(v) - zeroThreshold, err := readFloat64VBBucket(b) + zeroThreshold, err = readFloat64VBBucket(b) if err != nil { - return 0, 0, nil, nil, err + return } - posSpans, err := readHistoChunkMetaSpans(b) + posSpans, err = readHistoChunkMetaSpans(b) if err != nil { - return 0, 0, nil, nil, err + return } - negSpans, err := readHistoChunkMetaSpans(b) + negSpans, err = readHistoChunkMetaSpans(b) if err != nil { - return 0, 0, nil, nil, err + return } - return schema, zeroThreshold, posSpans, negSpans, nil + return } func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) { diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index ccbafe4472..71210e57a7 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -163,10 +163,11 @@ func TestHistoChunkBucketChanges(t *testing.T) { // This is how span changes will be handled. histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Greater(t, len(posInterjections), 0) require.Equal(t, 0, len(negInterjections)) require.True(t, ok) // Only new buckets came in. + require.False(t, cr) c, app = histoApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans) app.AppendHistogram(ts2, h2) @@ -234,10 +235,11 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Greater(t, len(posInterjections), 0) require.Equal(t, 0, len(negInterjections)) require.True(t, ok) // Only new buckets came in. + require.False(t, cr) } { // New histogram that has a bucket missing. @@ -252,10 +254,11 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) } { // New histogram that has a counter reset while buckets are same. @@ -264,10 +267,11 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) } { // New histogram that has a counter reset while new buckets were added. @@ -284,10 +288,11 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) } { // New histogram that has a counter reset while new buckets were added before the first bucket and reset on first bucket. @@ -307,9 +312,10 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) } } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index a73231db71..1c22c86c0e 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -605,20 +605,29 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the sparse histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { + // Head controls the execution of recoding, so that we own the proper chunk reference afterwards. + // We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk, + // we need to know if there was also a counter reset or not to set the meta properly. + app, _ := s.app.(*chunkenc.HistoAppender) + var ( + posInterjections, negInterjections []chunkenc.Interjection + okToAppend, counterReset bool + ) + if app != nil { + posInterjections, negInterjections, okToAppend, counterReset = app.Appendable(sh) + } + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper) if !sampleInOrder { return sampleInOrder, chunkCreated } if !chunkCreated { - // Head controls the execution of recoding, so that we own the proper chunk reference afterwards - app, _ := s.app.(*chunkenc.HistoAppender) - posInterjections, negInterjections, ok := app.Appendable(sh) - // we have 3 cases here - // !ok -> we need to cut a new chunk - // ok but we have interjections -> existing chunk needs recoding before we can append our histogram - // ok and no interjections -> chunk is ready to support our histogram - if !ok { + // We have 3 cases here + // !okToAppend -> we need to cut a new chunk + // okToAppend but we have interjections -> existing chunk needs recoding before we can append our histogram + // okToAppend and no interjections -> chunk is ready to support our histogram + if !okToAppend || counterReset { c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper) chunkCreated = true } else if len(posInterjections) > 0 || len(negInterjections) > 0 { @@ -633,6 +642,17 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen } } + if chunkCreated { + hc := s.headChunk.chunk.(*chunkenc.HistoChunk) + header := chunkenc.UnknownCounterReset + if counterReset { + header = chunkenc.CounterReset + } else if okToAppend { + header = chunkenc.NotCounterReset + } + hc.SetCounterResetHeader(header) + } + s.app.AppendHistogram(t, sh) s.sparseHistogramSeries = true diff --git a/tsdb/querier.go b/tsdb/querier.go index c5d5f2bae4..7e4c3269b5 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -717,6 +717,9 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { h histogram.SparseHistogram ) if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS { + if hc, ok := p.currChkMeta.Chunk.(*chunkenc.HistoChunk); ok { + newChunk.(*chunkenc.HistoChunk).SetCounterResetHeader(hc.GetCounterResetHeader()) + } t, h = p.currDelIter.AtHistogram() p.curr.MinTime = t app.AppendHistogram(t, h.Copy())