From 1dd22ed655587d17752d55af44e674e04e666c2a Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Fri, 1 Oct 2021 13:41:51 +0530 Subject: [PATCH] Support stale samples for sparse histograms (#9352) * Support stale samples for sparse histograms Signed-off-by: Ganesh Vernekar * Don't cut a new chunk for every stale sample Signed-off-by: Ganesh Vernekar * Update comments for HistoAppender.Appendable Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/histo.go | 55 ++++++++++++++++++--- tsdb/chunkenc/histo_test.go | 6 --- tsdb/head_append.go | 7 ++- tsdb/head_test.go | 96 ++++++++++++++++++++++++++++++++++++- 4 files changed, 149 insertions(+), 15 deletions(-) diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 82700b8b4b..09d50326ba 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -49,6 +49,7 @@ import ( "math/bits" "github.com/prometheus/prometheus/pkg/histogram" + "github.com/prometheus/prometheus/pkg/value" ) const () @@ -247,7 +248,18 @@ func (a *HistoAppender) Append(int64, float64) {} // * the zerobucket threshold has changed // * any buckets disappeared // * there was a counter reset in the count of observations or in any bucket, including the zero bucket +// * the last sample in the chunk was stale while the current sample is not stale +// If the given sample is stale, it will always return true. func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) { + if value.IsStaleNaN(h.Sum) { + // This is a stale sample whose buckets and spans don't matter. + return nil, nil, true + } + if value.IsStaleNaN(a.sum) { + // If the last sample was stale, then we can only accept stale samples in this chunk. + return nil, nil, false + } + if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold { return nil, nil, false } @@ -350,6 +362,12 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { var tDelta, cntDelta, zcntDelta int64 num := binary.BigEndian.Uint16(a.b.bytes()) + if value.IsStaleNaN(h.Sum) { + // Emptying out other fields to write no buckets, and an empty meta in case of + // first histogram in the chunk. + h = histogram.SparseHistogram{Sum: h.Sum} + } + switch num { case 0: // the first append gets the privilege to dictate the metadata @@ -377,11 +395,14 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { putVarint(a.b, a.buf64, buck) } case 1: - tDelta = t - a.t cntDelta = int64(h.Count) - int64(a.cnt) zcntDelta = int64(h.ZeroCount) - int64(a.zcnt) + if value.IsStaleNaN(h.Sum) { + cntDelta, zcntDelta = 0, 0 + } + putVarint(a.b, a.buf64, tDelta) putVarint(a.b, a.buf64, cntDelta) putVarint(a.b, a.buf64, zcntDelta) @@ -398,6 +419,7 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { putVarint(a.b, a.buf64, delta) a.negbucketsDelta[i] = delta } + default: tDelta = t - a.t cntDelta = int64(h.Count) - int64(a.cnt) @@ -407,6 +429,10 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { cntDod := cntDelta - a.cntDelta zcntDod := zcntDelta - a.zcntDelta + if value.IsStaleNaN(h.Sum) { + cntDod, zcntDod = 0, 0 + } + putInt64VBBucket(a.b, tDod) putInt64VBBucket(a.b, cntDod) putInt64VBBucket(a.b, zcntDod) @@ -438,9 +464,7 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { a.posbuckets, a.negbuckets = h.PositiveBuckets, h.NegativeBuckets // note that the bucket deltas were already updated above - a.sum = h.Sum - } // Recode converts the current chunk to accommodate an expansion of the set of @@ -566,6 +590,9 @@ func (it *histoIterator) ChunkEncoding() Encoding { } func (it *histoIterator) AtHistogram() (int64, histogram.SparseHistogram) { + if value.IsStaleNaN(it.sum) { + return it.t, histogram.SparseHistogram{Sum: it.sum} + } return it.t, histogram.SparseHistogram{ Count: it.cnt, ZeroCount: it.zcnt, @@ -625,10 +652,14 @@ func (it *histoIterator) Next() bool { it.zeroThreshold = zeroThreshold it.posSpans, it.negSpans = posSpans, negSpans numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans) - it.posbuckets = make([]int64, numPosBuckets) - it.negbuckets = make([]int64, numNegBuckets) - it.posbucketsDelta = make([]int64, numPosBuckets) - it.negbucketsDelta = make([]int64, numNegBuckets) + if numPosBuckets > 0 { + it.posbuckets = make([]int64, numPosBuckets) + it.posbucketsDelta = make([]int64, numPosBuckets) + } + if numNegBuckets > 0 { + it.negbuckets = make([]int64, numNegBuckets) + it.negbucketsDelta = make([]int64, numNegBuckets) + } // now read actual data @@ -711,6 +742,11 @@ func (it *histoIterator) Next() bool { return false } + if value.IsStaleNaN(it.sum) { + it.numRead++ + return true + } + for i := range it.posbuckets { delta, err := binary.ReadVarint(&it.br) if err != nil { @@ -764,6 +800,11 @@ func (it *histoIterator) Next() bool { return false } + if value.IsStaleNaN(it.sum) { + it.numRead++ + return true + } + for i := range it.posbuckets { dod, err := readInt64VBBucket(&it.br) if err != nil { diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index 7b9ba7bded..ccbafe4472 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -41,8 +41,6 @@ func TestHistoChunkSameBuckets(t *testing.T) { {Offset: 1, Length: 2}, }, PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) - NegativeSpans: nil, - NegativeBuckets: []int64{}, } app.AppendHistogram(ts, h) exp = append(exp, res{t: ts, h: h}) @@ -142,8 +140,6 @@ func TestHistoChunkBucketChanges(t *testing.T) { {Offset: 1, Length: 1}, }, PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) - NegativeSpans: nil, - NegativeBuckets: []int64{}, } app.AppendHistogram(ts1, h1) @@ -217,8 +213,6 @@ func TestHistoChunkAppendable(t *testing.T) { {Offset: 1, Length: 1}, }, PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) - NegativeSpans: nil, - NegativeBuckets: []int64{}, } app.AppendHistogram(ts, h1) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 52b53e8ba9..a73231db71 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -266,6 +267,10 @@ func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64 } } + if value.IsStaleNaN(v) && s.sparseHistogramSeries { + return a.AppendHistogram(ref, lset, t, histogram.SparseHistogram{Sum: v}) + } + s.Lock() if err := s.appendable(t, v); err != nil { s.Unlock() @@ -389,9 +394,9 @@ func (a *headAppender) AppendHistogram(ref uint64, lset labels.Labels, t int64, if err != nil { return 0, err } + s.sparseHistogramSeries = true if created { a.head.metrics.sparseHistogramSeries.Inc() - s.sparseHistogramSeries = true a.series = append(a.series, record.RefSeries{ Ref: s.ref, Labels: lset, diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 9b9dbc749e..7325102299 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -2605,7 +2606,6 @@ func generateHistograms(n int) (r []histogram.SparseHistogram) { {Offset: 1, Length: 2}, }, PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, - NegativeBuckets: []int64{}, }) } @@ -2805,3 +2805,97 @@ func TestSparseHistogramMetrics(t *testing.T) { require.Equal(t, float64(expHistSeries), prom_testutil.ToFloat64(head.metrics.sparseHistogramSeries)) require.Equal(t, float64(0), prom_testutil.ToFloat64(head.metrics.sparseHistogramSamplesTotal)) // Counter reset. } + +func TestSparseHistogramStaleSample(t *testing.T) { + l := labels.Labels{{Name: "a", Value: "b"}} + numHistograms := 20 + head, _ := newTestHead(t, 100000, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + type timedHist struct { + t int64 + h histogram.SparseHistogram + } + expHists := make([]timedHist, 0, numHistograms) + + testQuery := func(numStale int) { + q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, q.Close()) + }) + + ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) + + it := s.Iterator() + actHists := make([]timedHist, 0, len(expHists)) + for it.Next() { + t, h := it.AtHistogram() + actHists = append(actHists, timedHist{t, h.Copy()}) + } + + // We cannot compare StaleNAN with require.Equal, hence checking each histogram manually. + require.Equal(t, len(expHists), len(actHists)) + actNumStale := 0 + for i, eh := range expHists { + ah := actHists[i] + if value.IsStaleNaN(eh.h.Sum) { + actNumStale++ + require.True(t, value.IsStaleNaN(ah.h.Sum)) + // To make require.Equal work. + ah.h.Sum = 0 + eh.h.Sum = 0 + } + require.Equal(t, eh, ah) + } + require.Equal(t, numStale, actNumStale) + } + + // Adding stale in the same appender. + app := head.Appender(context.Background()) + for _, h := range generateHistograms(numHistograms) { + _, err := app.AppendHistogram(0, l, 100*int64(len(expHists)), h) + require.NoError(t, err) + expHists = append(expHists, timedHist{100 * int64(len(expHists)), h}) + } + // +1 so that delta-of-delta is not 0. + _, err := app.Append(0, l, 100*int64(len(expHists))+1, math.Float64frombits(value.StaleNaN)) + require.NoError(t, err) + expHists = append(expHists, timedHist{100*int64(len(expHists)) + 1, histogram.SparseHistogram{Sum: math.Float64frombits(value.StaleNaN)}}) + require.NoError(t, app.Commit()) + + // Only 1 chunk in the memory, no m-mapped chunk. + s := head.series.getByHash(l.Hash(), l) + require.NotNil(t, s) + require.Equal(t, 0, len(s.mmappedChunks)) + testQuery(1) + + // Adding stale in different appender and continuing series after a stale sample. + app = head.Appender(context.Background()) + for _, h := range generateHistograms(2 * numHistograms)[numHistograms:] { + _, err := app.AppendHistogram(0, l, 100*int64(len(expHists)), h) + require.NoError(t, err) + expHists = append(expHists, timedHist{100 * int64(len(expHists)), h}) + } + require.NoError(t, app.Commit()) + + app = head.Appender(context.Background()) + // +1 so that delta-of-delta is not 0. + _, err = app.Append(0, l, 100*int64(len(expHists))+1, math.Float64frombits(value.StaleNaN)) + require.NoError(t, err) + expHists = append(expHists, timedHist{100*int64(len(expHists)) + 1, histogram.SparseHistogram{Sum: math.Float64frombits(value.StaleNaN)}}) + require.NoError(t, app.Commit()) + + // Total 2 chunks, 1 m-mapped. + s = head.series.getByHash(l.Hash(), l) + require.NotNil(t, s) + require.Equal(t, 1, len(s.mmappedChunks)) + testQuery(2) +}