From eb9931e9610e53ff7b0cc56d86d2b6ce4748888a Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 4 Oct 2021 18:44:12 +0530 Subject: [PATCH 1/6] Add info about counter resets in chunk meta Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/chunk.go | 2 +- tsdb/chunkenc/histo.go | 51 +++++++++++++++++++++++++------------ tsdb/chunkenc/histo_meta.go | 31 +++++++++++++++++----- tsdb/chunkenc/histo_test.go | 30 +++++++++++++--------- tsdb/chunkenc/xor.go | 2 +- tsdb/head_append.go | 11 +++++--- 6 files changed, 86 insertions(+), 41 deletions(-) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index a356ea019e..6d03cc2cca 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -82,7 +82,7 @@ type Chunk interface { // Appender adds sample pairs to a chunk. type Appender interface { Append(int64, float64) - AppendHistogram(t int64, h histogram.SparseHistogram) + AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) } // Iterator is a simple iterator that can only get the next value. diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 09d50326ba..972bb8a665 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -77,7 +77,7 @@ type HistoChunk struct { // NewHistoChunk returns a new chunk with Histo encoding of the given size. func NewHistoChunk() *HistoChunk { - b := make([]byte, 2, 128) + b := make([]byte, 3, 128) return &HistoChunk{b: bstream{stream: b, count: 0}} } @@ -98,7 +98,7 @@ func (c *HistoChunk) NumSamples() int { // Meta returns the histogram metadata. // callers may only call this on chunks that have at least one sample -func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, error) { +func (c *HistoChunk) Meta() (bool, int32, float64, []histogram.Span, []histogram.Span, error) { if c.NumSamples() == 0 { panic("HistoChunk.Meta() called on an empty chunk") } @@ -106,6 +106,14 @@ func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, return readHistoChunkMeta(&b) } +// CounterReset returns true if this new chunk was created because of a counter reset. +func (c *HistoChunk) CounterReset() bool { + if c.NumSamples() == 0 { + panic("HistoChunk.CounterReset() called on an empty chunk") + } + return (c.Bytes()[2] & counterResetMask) != 0 +} + // Compact implements the Chunk interface. func (c *HistoChunk) Compact() { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { @@ -200,6 +208,7 @@ type HistoAppender struct { b *bstream // Metadata: + counterReset bool schema int32 zeroThreshold float64 posSpans, negSpans []histogram.Span @@ -249,42 +258,43 @@ func (a *HistoAppender) Append(int64, float64) {} // * any buckets disappeared // * there was a counter reset in the count of observations or in any bucket, including the zero bucket // * the last sample in the chunk was stale while the current sample is not stale +// It returns an additional boolean set to true if it is not appendable because of a counter reset. // If the given sample is stale, it will always return true. -func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) { +func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool, bool) { if value.IsStaleNaN(h.Sum) { // This is a stale sample whose buckets and spans don't matter. - return nil, nil, true + return nil, nil, true, false } if value.IsStaleNaN(a.sum) { // If the last sample was stale, then we can only accept stale samples in this chunk. - return nil, nil, false + return nil, nil, false, false } if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold { - return nil, nil, false + return nil, nil, false, false } posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans) if !ok { - return nil, nil, false + return nil, nil, false, false } negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans) if !ok { - return nil, nil, false + return nil, nil, false, false } if h.Count < a.cnt || h.ZeroCount < a.zcnt { // There has been a counter reset. - return nil, nil, false + return nil, nil, false, false } if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) { - return nil, nil, false + return nil, nil, false, true } if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) { - return nil, nil, false + return nil, nil, false, true } - return posInterjections, negInterjections, ok + return posInterjections, negInterjections, true, false } // counterResetInAnyBucket returns true if there was a counter reset for any bucket. @@ -358,7 +368,7 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans // histogram is properly structured. E.g. that the number of pos/neg buckets // used corresponds to the number conveyed by the pos/neg span structures. // callers must call Appendable() first and act accordingly! -func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { +func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) { var tDelta, cntDelta, zcntDelta int64 num := binary.BigEndian.Uint16(a.b.bytes()) @@ -368,12 +378,17 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { h = histogram.SparseHistogram{Sum: h.Sum} } + if num != 0 && counterReset { + panic("got counterReset=true for partially filled chunk in HistoAppender.AppendHistogram") + } + switch num { case 0: // the first append gets the privilege to dictate the metadata // but it's also responsible for encoding it into the chunk! - writeHistoChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) + writeHistoChunkMeta(a.b, counterReset, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) + a.counterReset = true a.schema = h.Schema a.zeroThreshold = h.ZeroThreshold a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans @@ -484,6 +499,7 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection } numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans) + counterReset := a.counterReset for it.Next() { tOld, hOld := it.AtHistogram() @@ -502,7 +518,9 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection if len(negInterjections) > 0 { hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negBuckets, negInterjections) } - app.AppendHistogram(tOld, hOld) + app.AppendHistogram(tOld, hOld, counterReset) + + counterReset = false // We need it only for the first sample. } return hc, app } @@ -643,7 +661,8 @@ func (it *histoIterator) Next() bool { if it.numRead == 0 { // first read is responsible for reading chunk metadata and initializing fields that depend on it - schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br) + // We give counter reset info at chunk level, hence we discard it here. + _, schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br) if err != nil { it.err = err return false diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go index e470fd8469..47d7a78b70 100644 --- a/tsdb/chunkenc/histo_meta.go +++ b/tsdb/chunkenc/histo_meta.go @@ -17,7 +17,17 @@ import ( "github.com/prometheus/prometheus/pkg/histogram" ) -func writeHistoChunkMeta(b *bstream, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) { +const ( + counterResetMask = 0b10000000 +) + +func writeHistoChunkMeta(b *bstream, counterReset bool, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) { + header := byte(0) + if counterReset { + header |= counterResetMask + } + b.bytes()[2] = header + putInt64VBBucket(b, int64(schema)) putFloat64VBBucket(b, zeroThreshold) putHistoChunkMetaSpans(b, posSpans) @@ -32,29 +42,36 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) { } } -func readHistoChunkMeta(b *bstreamReader) (int32, float64, []histogram.Span, []histogram.Span, error) { +func readHistoChunkMeta(b *bstreamReader) (bool, int32, float64, []histogram.Span, []histogram.Span, error) { + header, err := b.ReadByte() + if err != nil { + return false, 0, 0, nil, nil, err + } + + counterReset := (header & counterResetMask) != 0 + v, err := readInt64VBBucket(b) if err != nil { - return 0, 0, nil, nil, err + return false, 0, 0, nil, nil, err } schema := int32(v) zeroThreshold, err := readFloat64VBBucket(b) if err != nil { - return 0, 0, nil, nil, err + return false, 0, 0, nil, nil, err } posSpans, err := readHistoChunkMetaSpans(b) if err != nil { - return 0, 0, nil, nil, err + return false, 0, 0, nil, nil, err } negSpans, err := readHistoChunkMetaSpans(b) if err != nil { - return 0, 0, nil, nil, err + return false, 0, 0, nil, nil, err } - return schema, zeroThreshold, posSpans, negSpans, nil + return counterReset, schema, zeroThreshold, posSpans, negSpans, nil } func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) { diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index ccbafe4472..71ca36baaa 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -42,7 +42,7 @@ func TestHistoChunkSameBuckets(t *testing.T) { }, PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) } - app.AppendHistogram(ts, h) + app.AppendHistogram(ts, h, false) exp = append(exp, res{t: ts, h: h}) require.Equal(t, 1, c.NumSamples()) @@ -52,7 +52,7 @@ func TestHistoChunkSameBuckets(t *testing.T) { h.ZeroCount++ h.Sum = 24.4 h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) - app.AppendHistogram(ts, h) + app.AppendHistogram(ts, h, false) exp = append(exp, res{t: ts, h: h}) require.Equal(t, 2, c.NumSamples()) @@ -65,7 +65,7 @@ func TestHistoChunkSameBuckets(t *testing.T) { h.ZeroCount += 2 h.Sum = 24.4 h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) - app.AppendHistogram(ts, h) + app.AppendHistogram(ts, h, false) exp = append(exp, res{t: ts, h: h}) require.Equal(t, 3, c.NumSamples()) @@ -142,7 +142,7 @@ func TestHistoChunkBucketChanges(t *testing.T) { PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) } - app.AppendHistogram(ts1, h1) + app.AppendHistogram(ts1, h1, false) require.Equal(t, 1, c.NumSamples()) // Add a new histogram that has expanded buckets. @@ -163,12 +163,13 @@ func TestHistoChunkBucketChanges(t *testing.T) { // This is how span changes will be handled. histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Greater(t, len(posInterjections), 0) require.Equal(t, 0, len(negInterjections)) require.True(t, ok) // Only new buckets came in. + require.False(t, cr) c, app = histoApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans) - app.AppendHistogram(ts2, h2) + app.AppendHistogram(ts2, h2, false) require.Equal(t, 2, c.NumSamples()) @@ -215,7 +216,7 @@ func TestHistoChunkAppendable(t *testing.T) { PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) } - app.AppendHistogram(ts, h1) + app.AppendHistogram(ts, h1, false) require.Equal(t, 1, c.NumSamples()) { // New histogram that has more buckets. @@ -234,10 +235,11 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Greater(t, len(posInterjections), 0) require.Equal(t, 0, len(negInterjections)) require.True(t, ok) // Only new buckets came in. + require.False(t, cr) } { // New histogram that has a bucket missing. @@ -252,10 +254,11 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. + require.False(t, cr) } { // New histogram that has a counter reset while buckets are same. @@ -264,10 +267,11 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) } { // New histogram that has a counter reset while new buckets were added. @@ -284,10 +288,11 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) } { // New histogram that has a counter reset while new buckets were added before the first bucket and reset on first bucket. @@ -307,9 +312,10 @@ func TestHistoChunkAppendable(t *testing.T) { h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26) histoApp, _ := app.(*HistoAppender) - posInterjections, negInterjections, ok := histoApp.Appendable(h2) + posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2) require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) } } diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 24cec61cbd..03ca08e8f8 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -150,7 +150,7 @@ type xorAppender struct { trailing uint8 } -func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { +func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) { //panic("cannot call xorAppender.AppendHistogram().") } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index a73231db71..dbec3e49e0 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -605,15 +605,18 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the sparse histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { + // Head controls the execution of recoding, so that we own the proper chunk reference afterwards. + // We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk, + // we need to know if there was also a counter reset or not to set the meta properly. + app, _ := s.app.(*chunkenc.HistoAppender) + posInterjections, negInterjections, ok, counterReset := app.Appendable(sh) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper) if !sampleInOrder { return sampleInOrder, chunkCreated } if !chunkCreated { - // Head controls the execution of recoding, so that we own the proper chunk reference afterwards - app, _ := s.app.(*chunkenc.HistoAppender) - posInterjections, negInterjections, ok := app.Appendable(sh) // we have 3 cases here // !ok -> we need to cut a new chunk // ok but we have interjections -> existing chunk needs recoding before we can append our histogram @@ -633,7 +636,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen } } - s.app.AppendHistogram(t, sh) + s.app.AppendHistogram(t, sh, counterReset) s.sparseHistogramSeries = true c.maxTime = t From 59d77ff16d212d0165cac671affe254586a18f11 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 5 Oct 2021 12:57:49 +0530 Subject: [PATCH 2/6] Fix build Signed-off-by: Ganesh Vernekar --- tsdb/querier.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tsdb/querier.go b/tsdb/querier.go index c5d5f2bae4..fd46e591f5 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -717,12 +717,16 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { h histogram.SparseHistogram ) if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS { + counterReset := false + if hc, ok := p.currChkMeta.Chunk.(*chunkenc.HistoChunk); ok { + counterReset = hc.CounterReset() + } t, h = p.currDelIter.AtHistogram() p.curr.MinTime = t - app.AppendHistogram(t, h.Copy()) + app.AppendHistogram(t, h.Copy(), counterReset) for p.currDelIter.Next() { t, h = p.currDelIter.AtHistogram() - app.AppendHistogram(t, h.Copy()) + app.AppendHistogram(t, h.Copy(), false) } } else { t, v = p.currDelIter.At() From 9d81b2d610bfe8ec9c988e87b6f0f06e6b6004f8 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 5 Oct 2021 14:21:24 +0530 Subject: [PATCH 3/6] Fix tests Signed-off-by: Ganesh Vernekar --- tsdb/head_append.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index dbec3e49e0..cc200e9e28 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -609,7 +609,13 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen // We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk, // we need to know if there was also a counter reset or not to set the meta properly. app, _ := s.app.(*chunkenc.HistoAppender) - posInterjections, negInterjections, ok, counterReset := app.Appendable(sh) + var ( + posInterjections, negInterjections []chunkenc.Interjection + ok, counterReset bool + ) + if app != nil { + posInterjections, negInterjections, ok, counterReset = app.Appendable(sh) + } c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper) if !sampleInOrder { From a280b6c2daaa9ad1239a45996fc72eaff502f83b Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 6 Oct 2021 15:28:10 +0530 Subject: [PATCH 4/6] Fix review comments Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/histo.go | 50 +++++++++++++++++++++++-------------- tsdb/chunkenc/histo_meta.go | 27 ++++++++++---------- tsdb/chunkenc/histo_test.go | 2 +- 3 files changed, 46 insertions(+), 33 deletions(-) diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 972bb8a665..3ab6948d0c 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -260,41 +260,53 @@ func (a *HistoAppender) Append(int64, float64) {} // * the last sample in the chunk was stale while the current sample is not stale // It returns an additional boolean set to true if it is not appendable because of a counter reset. // If the given sample is stale, it will always return true. -func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool, bool) { +func (a *HistoAppender) Appendable(h histogram.SparseHistogram) (posInterjections []Interjection, negInterjections []Interjection, okToAppend bool, counterReset bool) { if value.IsStaleNaN(h.Sum) { // This is a stale sample whose buckets and spans don't matter. - return nil, nil, true, false + okToAppend = true + return } if value.IsStaleNaN(a.sum) { // If the last sample was stale, then we can only accept stale samples in this chunk. - return nil, nil, false, false + return + } + + if h.Count < a.cnt { + // There has been a counter reset. + counterReset = true + return } if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold { - return nil, nil, false, false + return } - posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans) + + if h.ZeroCount < a.zcnt { + // There has been a counter reset since ZeroThreshold didn't change. + counterReset = true + return + } + + var ok bool + posInterjections, ok = compareSpans(a.posSpans, h.PositiveSpans) if !ok { - return nil, nil, false, false + counterReset = true + return } - negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans) + negInterjections, ok = compareSpans(a.negSpans, h.NegativeSpans) if !ok { - return nil, nil, false, false + counterReset = true + return } - if h.Count < a.cnt || h.ZeroCount < a.zcnt { - // There has been a counter reset. - return nil, nil, false, false + if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) || + counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) { + counterReset, posInterjections, negInterjections = true, nil, nil + return } - if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) { - return nil, nil, false, true - } - if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) { - return nil, nil, false, true - } - - return posInterjections, negInterjections, true, false + okToAppend = true + return } // counterResetInAnyBucket returns true if there was a counter reset for any bucket. diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go index 47d7a78b70..9308993ce5 100644 --- a/tsdb/chunkenc/histo_meta.go +++ b/tsdb/chunkenc/histo_meta.go @@ -42,36 +42,37 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) { } } -func readHistoChunkMeta(b *bstreamReader) (bool, int32, float64, []histogram.Span, []histogram.Span, error) { - header, err := b.ReadByte() +func readHistoChunkMeta(b *bstreamReader) (counterReset bool, schema int32, zeroThreshold float64, posSpans []histogram.Span, negSpans []histogram.Span, err error) { + var header byte + header, err = b.ReadByte() if err != nil { - return false, 0, 0, nil, nil, err + return } - counterReset := (header & counterResetMask) != 0 + counterReset = (header & counterResetMask) != 0 v, err := readInt64VBBucket(b) if err != nil { - return false, 0, 0, nil, nil, err + return } - schema := int32(v) + schema = int32(v) - zeroThreshold, err := readFloat64VBBucket(b) + zeroThreshold, err = readFloat64VBBucket(b) if err != nil { - return false, 0, 0, nil, nil, err + return } - posSpans, err := readHistoChunkMetaSpans(b) + posSpans, err = readHistoChunkMetaSpans(b) if err != nil { - return false, 0, 0, nil, nil, err + return } - negSpans, err := readHistoChunkMetaSpans(b) + negSpans, err = readHistoChunkMetaSpans(b) if err != nil { - return false, 0, 0, nil, nil, err + return } - return counterReset, schema, zeroThreshold, posSpans, negSpans, nil + return } func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) { diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index 71ca36baaa..cb5a3734eb 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -258,7 +258,7 @@ func TestHistoChunkAppendable(t *testing.T) { require.Equal(t, 0, len(posInterjections)) require.Equal(t, 0, len(negInterjections)) require.False(t, ok) // Need to cut a new chunk. - require.False(t, cr) + require.True(t, cr) } { // New histogram that has a counter reset while buckets are same. From 175ef4ebcf33fd5292dea5d363a0a85659362ddb Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 6 Oct 2021 16:02:19 +0530 Subject: [PATCH 5/6] Add a NotCounterReset flag Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/chunk.go | 2 +- tsdb/chunkenc/histo.go | 60 +++++++++++++++++++++++++++---------- tsdb/chunkenc/histo_meta.go | 18 ++++------- tsdb/chunkenc/histo_test.go | 12 ++++---- tsdb/chunkenc/xor.go | 2 +- tsdb/head_append.go | 25 +++++++++++----- tsdb/querier.go | 8 ++--- 7 files changed, 79 insertions(+), 48 deletions(-) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 6d03cc2cca..a356ea019e 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -82,7 +82,7 @@ type Chunk interface { // Appender adds sample pairs to a chunk. type Appender interface { Append(int64, float64) - AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) + AppendHistogram(t int64, h histogram.SparseHistogram) } // Iterator is a simple iterator that can only get the next value. diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 3ab6948d0c..03de2797c6 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -98,7 +98,7 @@ func (c *HistoChunk) NumSamples() int { // Meta returns the histogram metadata. // callers may only call this on chunks that have at least one sample -func (c *HistoChunk) Meta() (bool, int32, float64, []histogram.Span, []histogram.Span, error) { +func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, error) { if c.NumSamples() == 0 { panic("HistoChunk.Meta() called on an empty chunk") } @@ -106,6 +106,18 @@ func (c *HistoChunk) Meta() (bool, int32, float64, []histogram.Span, []histogram return readHistoChunkMeta(&b) } +// SetCounterReset sets the counter reset flag to 1 if the passed argument is true, 0 otherwise. +func (c *HistoChunk) SetCounterReset(counterReset bool) { + bytes := c.Bytes() + header := bytes[2] + if counterReset { + header |= counterResetMask + } else if (header & counterResetMask) != 0 { + header ^= counterResetMask + } + bytes[2] = header +} + // CounterReset returns true if this new chunk was created because of a counter reset. func (c *HistoChunk) CounterReset() bool { if c.NumSamples() == 0 { @@ -114,6 +126,27 @@ func (c *HistoChunk) CounterReset() bool { return (c.Bytes()[2] & counterResetMask) != 0 } +// SetNotCounterReset sets the "not counter reset" flag to 1 if the passed argument is true, 0 otherwise. +func (c *HistoChunk) SetNotCounterReset(notCounterReset bool) { + bytes := c.Bytes() + header := bytes[2] + if notCounterReset { + header |= notCounterResetMask + } else if (header & notCounterResetMask) != 0 { + header ^= notCounterResetMask + } + bytes[2] = header +} + +// NotCounterReset returns true if this new chunk definitely did not have counter reset +// from the earlier chunk. +func (c *HistoChunk) NotCounterReset() bool { + if c.NumSamples() == 0 { + panic("HistoChunk.NotCounterReset() called on an empty chunk") + } + return (c.Bytes()[2] & notCounterResetMask) != 0 +} + // Compact implements the Chunk interface. func (c *HistoChunk) Compact() { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { @@ -208,7 +241,6 @@ type HistoAppender struct { b *bstream // Metadata: - counterReset bool schema int32 zeroThreshold float64 posSpans, negSpans []histogram.Span @@ -260,6 +292,7 @@ func (a *HistoAppender) Append(int64, float64) {} // * the last sample in the chunk was stale while the current sample is not stale // It returns an additional boolean set to true if it is not appendable because of a counter reset. // If the given sample is stale, it will always return true. +// If counterReset is true, okToAppend MUST be false. func (a *HistoAppender) Appendable(h histogram.SparseHistogram) (posInterjections []Interjection, negInterjections []Interjection, okToAppend bool, counterReset bool) { if value.IsStaleNaN(h.Sum) { // This is a stale sample whose buckets and spans don't matter. @@ -380,7 +413,7 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans // histogram is properly structured. E.g. that the number of pos/neg buckets // used corresponds to the number conveyed by the pos/neg span structures. // callers must call Appendable() first and act accordingly! -func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) { +func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { var tDelta, cntDelta, zcntDelta int64 num := binary.BigEndian.Uint16(a.b.bytes()) @@ -390,17 +423,12 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram, co h = histogram.SparseHistogram{Sum: h.Sum} } - if num != 0 && counterReset { - panic("got counterReset=true for partially filled chunk in HistoAppender.AppendHistogram") - } - switch num { case 0: // the first append gets the privilege to dictate the metadata // but it's also responsible for encoding it into the chunk! - writeHistoChunkMeta(a.b, counterReset, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) - a.counterReset = true + writeHistoChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) a.schema = h.Schema a.zeroThreshold = h.ZeroThreshold a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans @@ -503,7 +531,8 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection // it again with the new span layout. This can probably be done in-place // by editing the chunk. But let's first see how expensive it is in the // big picture. - it := newHistoIterator(a.b.bytes()) + byts := a.b.bytes() + it := newHistoIterator(byts) hc := NewHistoChunk() app, err := hc.Appender() if err != nil { @@ -511,7 +540,6 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection } numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans) - counterReset := a.counterReset for it.Next() { tOld, hOld := it.AtHistogram() @@ -530,10 +558,12 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection if len(negInterjections) > 0 { hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negBuckets, negInterjections) } - app.AppendHistogram(tOld, hOld, counterReset) - - counterReset = false // We need it only for the first sample. + app.AppendHistogram(tOld, hOld) } + + // Set the flags. + hc.SetCounterReset(byts[2]&counterResetMask != 0) + hc.SetNotCounterReset(byts[2]¬CounterResetMask != 0) return hc, app } @@ -674,7 +704,7 @@ func (it *histoIterator) Next() bool { // first read is responsible for reading chunk metadata and initializing fields that depend on it // We give counter reset info at chunk level, hence we discard it here. - _, schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br) + schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br) if err != nil { it.err = err return false diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go index 9308993ce5..57aaf7bdf2 100644 --- a/tsdb/chunkenc/histo_meta.go +++ b/tsdb/chunkenc/histo_meta.go @@ -18,16 +18,11 @@ import ( ) const ( - counterResetMask = 0b10000000 + counterResetMask = 0b10000000 + notCounterResetMask = 0b01000000 ) -func writeHistoChunkMeta(b *bstream, counterReset bool, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) { - header := byte(0) - if counterReset { - header |= counterResetMask - } - b.bytes()[2] = header - +func writeHistoChunkMeta(b *bstream, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) { putInt64VBBucket(b, int64(schema)) putFloat64VBBucket(b, zeroThreshold) putHistoChunkMetaSpans(b, posSpans) @@ -42,15 +37,12 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) { } } -func readHistoChunkMeta(b *bstreamReader) (counterReset bool, schema int32, zeroThreshold float64, posSpans []histogram.Span, negSpans []histogram.Span, err error) { - var header byte - header, err = b.ReadByte() +func readHistoChunkMeta(b *bstreamReader) (schema int32, zeroThreshold float64, posSpans []histogram.Span, negSpans []histogram.Span, err error) { + _, err = b.ReadByte() // The header. if err != nil { return } - counterReset = (header & counterResetMask) != 0 - v, err := readInt64VBBucket(b) if err != nil { return diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index cb5a3734eb..71210e57a7 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -42,7 +42,7 @@ func TestHistoChunkSameBuckets(t *testing.T) { }, PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) } - app.AppendHistogram(ts, h, false) + app.AppendHistogram(ts, h) exp = append(exp, res{t: ts, h: h}) require.Equal(t, 1, c.NumSamples()) @@ -52,7 +52,7 @@ func TestHistoChunkSameBuckets(t *testing.T) { h.ZeroCount++ h.Sum = 24.4 h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) - app.AppendHistogram(ts, h, false) + app.AppendHistogram(ts, h) exp = append(exp, res{t: ts, h: h}) require.Equal(t, 2, c.NumSamples()) @@ -65,7 +65,7 @@ func TestHistoChunkSameBuckets(t *testing.T) { h.ZeroCount += 2 h.Sum = 24.4 h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) - app.AppendHistogram(ts, h, false) + app.AppendHistogram(ts, h) exp = append(exp, res{t: ts, h: h}) require.Equal(t, 3, c.NumSamples()) @@ -142,7 +142,7 @@ func TestHistoChunkBucketChanges(t *testing.T) { PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) } - app.AppendHistogram(ts1, h1, false) + app.AppendHistogram(ts1, h1) require.Equal(t, 1, c.NumSamples()) // Add a new histogram that has expanded buckets. @@ -169,7 +169,7 @@ func TestHistoChunkBucketChanges(t *testing.T) { require.True(t, ok) // Only new buckets came in. require.False(t, cr) c, app = histoApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans) - app.AppendHistogram(ts2, h2, false) + app.AppendHistogram(ts2, h2) require.Equal(t, 2, c.NumSamples()) @@ -216,7 +216,7 @@ func TestHistoChunkAppendable(t *testing.T) { PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) } - app.AppendHistogram(ts, h1, false) + app.AppendHistogram(ts, h1) require.Equal(t, 1, c.NumSamples()) { // New histogram that has more buckets. diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 03ca08e8f8..24cec61cbd 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -150,7 +150,7 @@ type xorAppender struct { trailing uint8 } -func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) { +func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { //panic("cannot call xorAppender.AppendHistogram().") } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index cc200e9e28..7378cef1a9 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -611,10 +611,10 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen app, _ := s.app.(*chunkenc.HistoAppender) var ( posInterjections, negInterjections []chunkenc.Interjection - ok, counterReset bool + okToAppend, counterReset bool ) if app != nil { - posInterjections, negInterjections, ok, counterReset = app.Appendable(sh) + posInterjections, negInterjections, okToAppend, counterReset = app.Appendable(sh) } c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper) @@ -623,11 +623,11 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen } if !chunkCreated { - // we have 3 cases here - // !ok -> we need to cut a new chunk - // ok but we have interjections -> existing chunk needs recoding before we can append our histogram - // ok and no interjections -> chunk is ready to support our histogram - if !ok { + // We have 3 cases here + // !okToAppend -> we need to cut a new chunk + // okToAppend but we have interjections -> existing chunk needs recoding before we can append our histogram + // okToAppend and no interjections -> chunk is ready to support our histogram + if !okToAppend || counterReset { c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper) chunkCreated = true } else if len(posInterjections) > 0 || len(negInterjections) > 0 { @@ -642,7 +642,16 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen } } - s.app.AppendHistogram(t, sh, counterReset) + if chunkCreated { + hc := s.headChunk.chunk.(*chunkenc.HistoChunk) + if counterReset { + hc.SetCounterReset(true) + } else if okToAppend { + hc.SetNotCounterReset(true) + } + } + + s.app.AppendHistogram(t, sh) s.sparseHistogramSeries = true c.maxTime = t diff --git a/tsdb/querier.go b/tsdb/querier.go index fd46e591f5..f6ccc2df3c 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -717,16 +717,16 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { h histogram.SparseHistogram ) if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS { - counterReset := false if hc, ok := p.currChkMeta.Chunk.(*chunkenc.HistoChunk); ok { - counterReset = hc.CounterReset() + newChunk.(*chunkenc.HistoChunk).SetCounterReset(hc.CounterReset()) + newChunk.(*chunkenc.HistoChunk).SetNotCounterReset(hc.NotCounterReset()) } t, h = p.currDelIter.AtHistogram() p.curr.MinTime = t - app.AppendHistogram(t, h.Copy(), counterReset) + app.AppendHistogram(t, h.Copy()) for p.currDelIter.Next() { t, h = p.currDelIter.AtHistogram() - app.AppendHistogram(t, h.Copy(), false) + app.AppendHistogram(t, h.Copy()) } } else { t, v = p.currDelIter.At() From 5d4dc7e413731d31fa5e24f20c5107bb6e18aecc Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 7 Oct 2021 19:53:24 +0530 Subject: [PATCH 6/6] Convert the header into an enum Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/histo.go | 60 ++++++++++++++----------------------- tsdb/chunkenc/histo_meta.go | 5 ---- tsdb/head_append.go | 6 ++-- tsdb/querier.go | 3 +- 4 files changed, 27 insertions(+), 47 deletions(-) diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 03de2797c6..db83c53e3a 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -106,45 +106,30 @@ func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, return readHistoChunkMeta(&b) } -// SetCounterReset sets the counter reset flag to 1 if the passed argument is true, 0 otherwise. -func (c *HistoChunk) SetCounterReset(counterReset bool) { - bytes := c.Bytes() - header := bytes[2] - if counterReset { - header |= counterResetMask - } else if (header & counterResetMask) != 0 { - header ^= counterResetMask +// CounterResetHeader defines the first 2 bits of the chunk header. +type CounterResetHeader byte + +const ( + CounterReset CounterResetHeader = 0b10000000 + NotCounterReset CounterResetHeader = 0b01000000 + GaugeType CounterResetHeader = 0b11000000 + UnknownCounterReset CounterResetHeader = 0b00000000 +) + +// SetCounterResetHeader sets the counter reset header. +func (c *HistoChunk) SetCounterResetHeader(h CounterResetHeader) { + switch h { + case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset: + bytes := c.Bytes() + bytes[2] = (bytes[2] & 0b00111111) | byte(h) + default: + panic("invalid CounterResetHeader type") } - bytes[2] = header } -// CounterReset returns true if this new chunk was created because of a counter reset. -func (c *HistoChunk) CounterReset() bool { - if c.NumSamples() == 0 { - panic("HistoChunk.CounterReset() called on an empty chunk") - } - return (c.Bytes()[2] & counterResetMask) != 0 -} - -// SetNotCounterReset sets the "not counter reset" flag to 1 if the passed argument is true, 0 otherwise. -func (c *HistoChunk) SetNotCounterReset(notCounterReset bool) { - bytes := c.Bytes() - header := bytes[2] - if notCounterReset { - header |= notCounterResetMask - } else if (header & notCounterResetMask) != 0 { - header ^= notCounterResetMask - } - bytes[2] = header -} - -// NotCounterReset returns true if this new chunk definitely did not have counter reset -// from the earlier chunk. -func (c *HistoChunk) NotCounterReset() bool { - if c.NumSamples() == 0 { - panic("HistoChunk.NotCounterReset() called on an empty chunk") - } - return (c.Bytes()[2] & notCounterResetMask) != 0 +// GetCounterResetHeader returns the info about the first 2 bits of the chunk header. +func (c *HistoChunk) GetCounterResetHeader() CounterResetHeader { + return CounterResetHeader(c.Bytes()[2] & 0b11000000) } // Compact implements the Chunk interface. @@ -562,8 +547,7 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection } // Set the flags. - hc.SetCounterReset(byts[2]&counterResetMask != 0) - hc.SetNotCounterReset(byts[2]¬CounterResetMask != 0) + hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000)) return hc, app } diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go index 57aaf7bdf2..b685035fb8 100644 --- a/tsdb/chunkenc/histo_meta.go +++ b/tsdb/chunkenc/histo_meta.go @@ -17,11 +17,6 @@ import ( "github.com/prometheus/prometheus/pkg/histogram" ) -const ( - counterResetMask = 0b10000000 - notCounterResetMask = 0b01000000 -) - func writeHistoChunkMeta(b *bstream, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) { putInt64VBBucket(b, int64(schema)) putFloat64VBBucket(b, zeroThreshold) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 7378cef1a9..1c22c86c0e 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -644,11 +644,13 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen if chunkCreated { hc := s.headChunk.chunk.(*chunkenc.HistoChunk) + header := chunkenc.UnknownCounterReset if counterReset { - hc.SetCounterReset(true) + header = chunkenc.CounterReset } else if okToAppend { - hc.SetNotCounterReset(true) + header = chunkenc.NotCounterReset } + hc.SetCounterResetHeader(header) } s.app.AppendHistogram(t, sh) diff --git a/tsdb/querier.go b/tsdb/querier.go index f6ccc2df3c..7e4c3269b5 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -718,8 +718,7 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { ) if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS { if hc, ok := p.currChkMeta.Chunk.(*chunkenc.HistoChunk); ok { - newChunk.(*chunkenc.HistoChunk).SetCounterReset(hc.CounterReset()) - newChunk.(*chunkenc.HistoChunk).SetNotCounterReset(hc.NotCounterReset()) + newChunk.(*chunkenc.HistoChunk).SetCounterResetHeader(hc.GetCounterResetHeader()) } t, h = p.currDelIter.AtHistogram() p.curr.MinTime = t