From 3c2ea91a8352e6cf9916222802dae4337beefb5e Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 4 Jan 2023 15:54:15 +0530 Subject: [PATCH] tsdb: Test gauge float histograms Signed-off-by: Ganesh Vernekar --- promql/engine_test.go | 8 +- tsdb/chunkenc/float_histogram.go | 18 +- tsdb/chunkenc/float_histogram_test.go | 1 + tsdb/head.go | 41 ++++- tsdb/head_test.go | 232 +++++++++++++++++++------- 5 files changed, 230 insertions(+), 70 deletions(-) diff --git a/promql/engine_test.go b/promql/engine_test.go index 28db8af191..0582fd5a4e 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -3158,10 +3158,12 @@ func TestSparseHistogramRate(t *testing.T) { Schema: 1, ZeroThreshold: 0.001, ZeroCount: 1. / 15., - Count: 4. / 15., + Count: 8. / 15., Sum: 1.226666666666667, PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, } require.Equal(t, expectedHistogram, actualHistogram) } @@ -3199,10 +3201,12 @@ func TestSparseFloatHistogramRate(t *testing.T) { Schema: 1, ZeroThreshold: 0.001, ZeroCount: 1. / 15., - Count: 4. / 15., + Count: 8. / 15., Sum: 1.226666666666667, PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, } require.Equal(t, expectedHistogram, actualHistogram) } diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 10b20a1ca9..538af364ae 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -223,13 +223,11 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { // If the sample is a gauge histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: -// -// • The schema has changed. -// • The threshold for the zero bucket has changed. -// • Any buckets have disappeared. -// • There was a counter reset in the count of observations or in any bucket, -// including the zero bucket. -// • The last sample in the chunk was stale while the current sample is not stale. +// - The schema has changed. +// - The threshold for the zero bucket has changed. +// - Any buckets have disappeared. +// - There was a counter reset in the count of observations or in any bucket, including the zero bucket. +// - The last sample in the chunk was stale while the current sample is not stale. // // The method returns an additional boolean set to true if it is not appendable // because of a counter reset. If the given sample is stale, it is always ok to @@ -300,9 +298,9 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( // This method must be only used for gauge histograms. // // The chunk is not appendable in the following cases: -// - The schema has changed. -// - The threshold for the zero bucket has changed. -// - The last sample in the chunk was stale while the current sample is not stale. +// - The schema has changed. +// - The threshold for the zero bucket has changed. +// - The last sample in the chunk was stale while the current sample is not stale. func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( positiveInterjections, negativeInterjections []Interjection, backwardPositiveInterjections, backwardNegativeInterjections []Interjection, diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index cd1a1d7f61..03097d931b 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -385,6 +385,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) { app.AppendFloatHistogram(ts, h1.Copy()) require.Equal(t, 1, c.NumSamples()) + c.(*FloatHistogramChunk).SetCounterResetHeader(GaugeType) { // Schema change. h2 := h1.Copy() diff --git a/tsdb/head.go b/tsdb/head.go index 8ec8a010d9..3362d74c36 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -17,6 +17,7 @@ import ( "fmt" "io" "math" + "math/rand" "path/filepath" "sync" "time" @@ -2026,7 +2027,7 @@ func (h *Head) updateWALReplayStatusRead(current int) { func GenerateTestHistograms(n int) (r []*histogram.Histogram) { for i := 0; i < n; i++ { r = append(r, &histogram.Histogram{ - Count: 5 + uint64(i*4), + Count: 10 + uint64(i*8), ZeroCount: 2 + uint64(i), ZeroThreshold: 0.001, Sum: 18.4 * float64(i+1), @@ -2036,6 +2037,11 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { {Offset: 1, Length: 2}, }, PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []int64{int64(i + 1), 1, -1, 0}, }) } @@ -2045,7 +2051,7 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { for i := 0; i < n; i++ { r = append(r, &histogram.FloatHistogram{ - Count: 5 + float64(i*4), + Count: 10 + float64(i*8), ZeroCount: 2 + float64(i), ZeroThreshold: 0.001, Sum: 18.4 * float64(i+1), @@ -2055,6 +2061,37 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { {Offset: 1, Length: 2}, }, PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, + }) + } + + return r +} + +func GenerateTestGaugeHistograms(n int) (r []*histogram.FloatHistogram) { + for x := 0; x < n; x++ { + i := rand.Intn(n) + r = append(r, &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Count: 10 + float64(i*8), + ZeroCount: 2 + float64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, }) } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6d7b82e1b7..545bf5046e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -110,7 +110,9 @@ func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) { func readTestWAL(t testing.TB, dir string) (recs []interface{}) { sr, err := wlog.NewSegmentsReader(dir) require.NoError(t, err) - defer sr.Close() + defer func() { + require.NoError(t, sr.Close()) + }() var dec record.Decoder r := wlog.NewReader(sr) @@ -127,6 +129,14 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { samples, err := dec.Samples(rec, nil) require.NoError(t, err) recs = append(recs, samples) + case record.HistogramSamples: + samples, err := dec.HistogramSamples(rec, nil) + require.NoError(t, err) + recs = append(recs, samples) + case record.FloatHistogramSamples: + samples, err := dec.FloatHistogramSamples(rec, nil) + require.NoError(t, err) + recs = append(recs, samples) case record.Tombstones: tstones, err := dec.Tombstones(rec, nil) require.NoError(t, err) @@ -2824,6 +2834,7 @@ func TestAppendHistogram(t *testing.T) { ingestTs := int64(0) app := head.Appender(context.Background()) + // Integer histograms. type timedHistogram struct { t int64 h *histogram.Histogram @@ -2844,6 +2855,7 @@ func TestAppendHistogram(t *testing.T) { t int64 h *histogram.FloatHistogram } + // Float counter histograms. expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms) for _, fh := range GenerateTestFloatHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) @@ -2855,6 +2867,18 @@ func TestAppendHistogram(t *testing.T) { app = head.Appender(context.Background()) } } + + // Float gauge histograms. + for _, fh := range GenerateTestGaugeHistograms(numHistograms) { + _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) + require.NoError(t, err) + expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) + ingestTs++ + if ingestTs%50 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } require.NoError(t, app.Commit()) q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) @@ -2898,7 +2922,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { // Series with only histograms. s1 := labels.FromStrings("a", "b1") k1 := s1.String() - numHistograms := 450 + numHistograms := 300 exp := map[string][]tsdbutil.Sample{} app := head.Appender(context.Background()) ts := int64(0) @@ -2916,26 +2940,34 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } } require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - for _, h := range GenerateTestFloatHistograms(numHistograms) { - h.Count = h.Count * 2 - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - _, err := app.AppendHistogram(0, s1, ts, nil, h) - require.NoError(t, err) - exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) - ts++ - if ts%5 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) + for _, gauge := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.FloatHistogram + if gauge { + hists = GenerateTestGaugeHistograms(numHistograms) + } else { + hists = GenerateTestFloatHistograms(numHistograms) } + for _, h := range hists { + h.Count = h.Count * 2 + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, s1, ts, nil, h) + require.NoError(t, err) + exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) + ts++ + if ts%5 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) } - require.NoError(t, app.Commit()) // There should be 7 mmap chunks in s1. ms := head.series.getByHash(s1.Hash(), s1) - require.Len(t, ms.mmappedChunks, 7) - expMmapChunks := make([]*mmappedChunk, 0, 7) + require.Len(t, ms.mmappedChunks, 8) + expMmapChunks := make([]*mmappedChunk, 0, 8) for _, mmap := range ms.mmappedChunks { require.Greater(t, mmap.numSamples, uint16(0)) cpy := *mmap @@ -2972,51 +3004,68 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } } require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - for _, h := range GenerateTestFloatHistograms(100) { - ts++ - h.Count = h.Count * 2 - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - _, err := app.AppendHistogram(0, s2, int64(ts), nil, h) - require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) - if ts%20 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - // Add some float. - for i := 0; i < 10; i++ { - ts++ - _, err := app.Append(0, s2, int64(ts), float64(ts)) - require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) - } - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) + for _, gauge := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.FloatHistogram + if gauge { + hists = GenerateTestGaugeHistograms(100) + } else { + hists = GenerateTestFloatHistograms(100) } + for _, h := range hists { + ts++ + h.Count = h.Count * 2 + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, s2, int64(ts), nil, h) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) + if ts%20 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + // Add some float. + for i := 0; i < 10; i++ { + ts++ + _, err := app.Append(0, s2, int64(ts), float64(ts)) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) + } + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) } - require.NoError(t, app.Commit()) // Restart head. require.NoError(t, head.Close()) - w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) - require.NoError(t, err) - head, err = NewHead(nil, nil, w, nil, head.opts, nil) - require.NoError(t, err) - require.NoError(t, head.Init(0)) + startHead := func() { + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, nil, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + } + startHead() // Checking contents of s1. ms = head.series.getByHash(s1.Hash(), s1) require.Equal(t, expMmapChunks, ms.mmappedChunks) - for _, mmap := range ms.mmappedChunks { - require.Greater(t, mmap.numSamples, uint16(0)) - } require.Equal(t, expHeadChunkSamples, ms.headChunk.chunk.NumSamples()) - q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) - require.NoError(t, err) - act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) - require.Equal(t, exp, act) + testQuery := func() { + q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) + require.Equal(t, exp, act) + } + testQuery() + + // Restart with no mmap chunks to test WAL replay. + require.NoError(t, head.Close()) + require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) + startHead() + testQuery() } func TestChunkSnapshot(t *testing.T) { @@ -3522,7 +3571,7 @@ func TestHistogramCounterResetHeader(t *testing.T) { if floatHisto { _, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat()) } else { - _, err = app.AppendHistogram(0, l, ts, h, nil) + _, err = app.AppendHistogram(0, l, ts, h.Copy(), nil) } require.NoError(t, err) require.NoError(t, app.Commit()) @@ -3553,10 +3602,6 @@ func TestHistogramCounterResetHeader(t *testing.T) { } h := GenerateTestHistograms(1)[0] - if len(h.NegativeBuckets) == 0 { - h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...) - h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...) - } h.PositiveBuckets = []int64{100, 1, 1, 1} h.NegativeBuckets = []int64{100, 1, 1, 1} h.Count = 1000 @@ -4517,3 +4562,78 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) { require.NoError(t, h.truncateOOO(0, 2)) require.Equal(t, 295*time.Minute.Milliseconds(), h.MinOOOTime()) } + +func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { + l := labels.FromStrings("a", "b") + head, _ := newTestHead(t, 1000, false, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + ts := int64(0) + appendHistogram := func(h *histogram.FloatHistogram) { + ts++ + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, ts, nil, h.Copy()) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + hists := GenerateTestGaugeHistograms(5) + hists[0].CounterResetHint = histogram.UnknownCounterReset + appendHistogram(hists[0]) + appendHistogram(hists[1]) + appendHistogram(hists[2]) + hists[3].CounterResetHint = histogram.UnknownCounterReset + appendHistogram(hists[3]) + appendHistogram(hists[3]) + appendHistogram(hists[4]) + + checkHeaders := func() { + ms, _, err := head.getOrCreate(l.Hash(), l) + require.NoError(t, err) + require.Len(t, ms.mmappedChunks, 3) + expHeaders := []chunkenc.CounterResetHeader{ + chunkenc.UnknownCounterReset, + chunkenc.GaugeType, + chunkenc.UnknownCounterReset, + chunkenc.GaugeType, + } + for i, mmapChunk := range ms.mmappedChunks { + chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) + require.NoError(t, err) + require.Equal(t, expHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + } + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + } + checkHeaders() + + recs := readTestWAL(t, head.wal.Dir()) + require.Equal(t, []interface{}{ + []record.RefSeries{ + { + Ref: 1, + Labels: labels.FromStrings("a", "b"), + }, + }, + []record.RefFloatHistogramSample{{Ref: 1, T: 1, FH: hists[0]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 2, FH: hists[1]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 3, FH: hists[2]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 4, FH: hists[3]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 5, FH: hists[3]}}, + []record.RefFloatHistogramSample{{Ref: 1, T: 6, FH: hists[4]}}, + }, recs) + + // Restart Head without mmap chunks to expect the WAL replay to recognize gauge histograms. + require.NoError(t, head.Close()) + require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) + + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, nil, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + + checkHeaders() +}