diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index a9239d937c..4bba8421c2 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -667,7 +667,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. it := fhchk.Iterator(nil) bucketCount := 0 for it.Next() == chunkenc.ValFloatHistogram { - _, f := it.AtFloatHistogram() + _, f := it.AtFloatHistogram(nil) bucketCount += len(f.PositiveBuckets) bucketCount += len(f.NegativeBuckets) } @@ -682,7 +682,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. it := hchk.Iterator(nil) bucketCount := 0 for it.Next() == chunkenc.ValHistogram { - _, f := it.AtHistogram() + _, f := it.AtHistogram(nil) bucketCount += len(f.PositiveBuckets) bucketCount += len(f.NegativeBuckets) } @@ -745,11 +745,11 @@ func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []str fmt.Printf("%s %g %d\n", lbs, val, ts) } for it.Next() == chunkenc.ValFloatHistogram { - ts, fh := it.AtFloatHistogram() + ts, fh := it.AtFloatHistogram(nil) fmt.Printf("%s %s %d\n", lbs, fh.String(), ts) } for it.Next() == chunkenc.ValHistogram { - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) fmt.Printf("%s %s %d\n", lbs, h.String(), ts) } if it.Err() != nil { diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index bf89f2a471..19a92b3d5a 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -53,21 +53,28 @@ type FloatHistogram struct { // Copy returns a deep copy of the Histogram. func (h *FloatHistogram) Copy() *FloatHistogram { - c := *h + c := FloatHistogram{ + CounterResetHint: h.CounterResetHint, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.ZeroCount, + Count: h.Count, + Sum: h.Sum, + } - if h.PositiveSpans != nil { + if len(h.PositiveSpans) != 0 { c.PositiveSpans = make([]Span, len(h.PositiveSpans)) copy(c.PositiveSpans, h.PositiveSpans) } - if h.NegativeSpans != nil { + if len(h.NegativeSpans) != 0 { c.NegativeSpans = make([]Span, len(h.NegativeSpans)) copy(c.NegativeSpans, h.NegativeSpans) } - if h.PositiveBuckets != nil { + if len(h.PositiveBuckets) != 0 { c.PositiveBuckets = make([]float64, len(h.PositiveBuckets)) copy(c.PositiveBuckets, h.PositiveBuckets) } - if h.NegativeBuckets != nil { + if len(h.NegativeBuckets) != 0 { c.NegativeBuckets = make([]float64, len(h.NegativeBuckets)) copy(c.NegativeBuckets, h.NegativeBuckets) } @@ -75,6 +82,29 @@ func (h *FloatHistogram) Copy() *FloatHistogram { return &c } +// CopyTo makes a deep copy into the given FloatHistogram. +// The destination object has to be a non-nil pointer. +func (h *FloatHistogram) CopyTo(to *FloatHistogram) { + to.CounterResetHint = h.CounterResetHint + to.Schema = h.Schema + to.ZeroThreshold = h.ZeroThreshold + to.ZeroCount = h.ZeroCount + to.Count = h.Count + to.Sum = h.Sum + + to.PositiveSpans = resize(to.PositiveSpans, len(h.PositiveSpans)) + copy(to.PositiveSpans, h.PositiveSpans) + + to.NegativeSpans = resize(to.NegativeSpans, len(h.NegativeSpans)) + copy(to.NegativeSpans, h.NegativeSpans) + + to.PositiveBuckets = resize(to.PositiveBuckets, len(h.PositiveBuckets)) + copy(to.PositiveBuckets, h.PositiveBuckets) + + to.NegativeBuckets = resize(to.NegativeBuckets, len(h.NegativeBuckets)) + copy(to.NegativeBuckets, h.NegativeBuckets) +} + // CopyToSchema works like Copy, but the returned deep copy has the provided // target schema, which must be ≤ the original schema (i.e. it must have a lower // resolution). diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go index 3d20960f6c..49fb77ab0b 100644 --- a/model/histogram/float_histogram_test.go +++ b/model/histogram/float_histogram_test.go @@ -142,6 +142,118 @@ func TestFloatHistogramMul(t *testing.T) { } } +func TestFloatHistogramCopy(t *testing.T) { + cases := []struct { + name string + orig *FloatHistogram + expected *FloatHistogram + }{ + { + name: "without buckets", + orig: &FloatHistogram{}, + expected: &FloatHistogram{}, + }, + { + name: "with buckets", + orig: &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []float64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []float64{5, 3, 1.234e5, 1000}, + }, + expected: &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []float64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []float64{5, 3, 1.234e5, 1000}, + }, + }, + { + name: "with empty buckets and non empty capacity", + orig: &FloatHistogram{ + PositiveSpans: make([]Span, 0, 1), + PositiveBuckets: make([]float64, 0, 1), + NegativeSpans: make([]Span, 0, 1), + NegativeBuckets: make([]float64, 0, 1), + }, + expected: &FloatHistogram{}, + }, + } + + for _, tcase := range cases { + t.Run(tcase.name, func(t *testing.T) { + hCopy := tcase.orig.Copy() + + // Modify a primitive value in the original histogram. + tcase.orig.Sum++ + require.Equal(t, tcase.expected, hCopy) + assertDeepCopyFHSpans(t, tcase.orig, hCopy, tcase.expected) + }) + } +} + +func TestFloatHistogramCopyTo(t *testing.T) { + cases := []struct { + name string + orig *FloatHistogram + expected *FloatHistogram + }{ + { + name: "without buckets", + orig: &FloatHistogram{}, + expected: &FloatHistogram{}, + }, + { + name: "with buckets", + orig: &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []float64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []float64{5, 3, 1.234e5, 1000}, + }, + expected: &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []float64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []float64{5, 3, 1.234e5, 1000}, + }, + }, + { + name: "with empty buckets and non empty capacity", + orig: &FloatHistogram{ + PositiveSpans: make([]Span, 0, 1), + PositiveBuckets: make([]float64, 0, 1), + NegativeSpans: make([]Span, 0, 1), + NegativeBuckets: make([]float64, 0, 1), + }, + expected: &FloatHistogram{}, + }, + } + + for _, tcase := range cases { + t.Run(tcase.name, func(t *testing.T) { + hCopy := &FloatHistogram{} + tcase.orig.CopyTo(hCopy) + + // Modify a primitive value in the original histogram. + tcase.orig.Sum++ + require.Equal(t, tcase.expected, hCopy) + assertDeepCopyFHSpans(t, tcase.orig, hCopy, tcase.expected) + }) + } +} + +func assertDeepCopyFHSpans(t *testing.T, orig, hCopy, expected *FloatHistogram) { + // Do an in-place expansion of an original spans slice. + orig.PositiveSpans = expandSpans(orig.PositiveSpans) + orig.PositiveSpans[len(orig.PositiveSpans)-1] = Span{1, 2} + + hCopy.PositiveSpans = expandSpans(hCopy.PositiveSpans) + expected.PositiveSpans = expandSpans(expected.PositiveSpans) + // Expand the copy spans and assert that modifying the original has not affected the copy. + require.Equal(t, expected, hCopy) +} + func TestFloatHistogramDiv(t *testing.T) { cases := []struct { name string diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index f4d292b344..d40adeb620 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -83,7 +83,14 @@ type Span struct { // Copy returns a deep copy of the Histogram. func (h *Histogram) Copy() *Histogram { - c := *h + c := Histogram{ + CounterResetHint: h.CounterResetHint, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.ZeroCount, + Count: h.Count, + Sum: h.Sum, + } if len(h.PositiveSpans) != 0 { c.PositiveSpans = make([]Span, len(h.PositiveSpans)) @@ -105,6 +112,29 @@ func (h *Histogram) Copy() *Histogram { return &c } +// CopyTo makes a deep copy into the given Histogram object. +// The destination object has to be a non-nil pointer. +func (h *Histogram) CopyTo(to *Histogram) { + to.CounterResetHint = h.CounterResetHint + to.Schema = h.Schema + to.ZeroThreshold = h.ZeroThreshold + to.ZeroCount = h.ZeroCount + to.Count = h.Count + to.Sum = h.Sum + + to.PositiveSpans = resize(to.PositiveSpans, len(h.PositiveSpans)) + copy(to.PositiveSpans, h.PositiveSpans) + + to.NegativeSpans = resize(to.NegativeSpans, len(h.NegativeSpans)) + copy(to.NegativeSpans, h.NegativeSpans) + + to.PositiveBuckets = resize(to.PositiveBuckets, len(h.PositiveBuckets)) + copy(to.PositiveBuckets, h.PositiveBuckets) + + to.NegativeBuckets = resize(to.NegativeBuckets, len(h.NegativeBuckets)) + copy(to.NegativeBuckets, h.NegativeBuckets) +} + // String returns a string representation of the Histogram. func (h *Histogram) String() string { var sb strings.Builder diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index 9a64faaaae..14a948e644 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -604,6 +604,128 @@ func TestHistogramEquals(t *testing.T) { notEquals(*hStale, *hNaN) } +func TestHistogramCopy(t *testing.T) { + cases := []struct { + name string + orig *Histogram + expected *Histogram + }{ + { + name: "without buckets", + orig: &Histogram{}, + expected: &Histogram{}, + }, + { + name: "with buckets", + orig: &Histogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []int64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []int64{5, 3, 1.234e5, 1000}, + }, + expected: &Histogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []int64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []int64{5, 3, 1.234e5, 1000}, + }, + }, + { + name: "with empty buckets and non empty capacity", + orig: &Histogram{ + PositiveSpans: make([]Span, 0, 1), + PositiveBuckets: make([]int64, 0, 1), + NegativeSpans: make([]Span, 0, 1), + NegativeBuckets: make([]int64, 0, 1), + }, + expected: &Histogram{}, + }, + } + + for _, tcase := range cases { + t.Run(tcase.name, func(t *testing.T) { + hCopy := tcase.orig.Copy() + + // Modify a primitive value in the original histogram. + tcase.orig.Sum++ + require.Equal(t, tcase.expected, hCopy) + assertDeepCopyHSpans(t, tcase.orig, hCopy, tcase.expected) + }) + } +} + +func TestHistogramCopyTo(t *testing.T) { + cases := []struct { + name string + orig *Histogram + expected *Histogram + }{ + { + name: "without buckets", + orig: &Histogram{}, + expected: &Histogram{}, + }, + { + name: "with buckets", + orig: &Histogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []int64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []int64{5, 3, 1.234e5, 1000}, + }, + expected: &Histogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []int64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []int64{5, 3, 1.234e5, 1000}, + }, + }, + { + name: "with empty buckets and non empty capacity", + orig: &Histogram{ + PositiveSpans: make([]Span, 0, 1), + PositiveBuckets: make([]int64, 0, 1), + NegativeSpans: make([]Span, 0, 1), + NegativeBuckets: make([]int64, 0, 1), + }, + expected: &Histogram{}, + }, + } + + for _, tcase := range cases { + t.Run(tcase.name, func(t *testing.T) { + hCopy := &Histogram{} + tcase.orig.CopyTo(hCopy) + + // Modify a primitive value in the original histogram. + tcase.orig.Sum++ + require.Equal(t, tcase.expected, hCopy) + assertDeepCopyHSpans(t, tcase.orig, hCopy, tcase.expected) + }) + } +} + +func assertDeepCopyHSpans(t *testing.T, orig, hCopy, expected *Histogram) { + // Do an in-place expansion of an original spans slice. + orig.PositiveSpans = expandSpans(orig.PositiveSpans) + orig.PositiveSpans[len(orig.PositiveSpans)-1] = Span{1, 2} + + hCopy.PositiveSpans = expandSpans(hCopy.PositiveSpans) + expected.PositiveSpans = expandSpans(expected.PositiveSpans) + // Expand the copy spans and assert that modifying the original has not affected the copy. + require.Equal(t, expected, hCopy) +} + +func expandSpans(spans []Span) []Span { + n := len(spans) + if cap(spans) > n { + spans = spans[:n+1] + } else { + spans = append(spans, Span{}) + } + return spans +} + func TestHistogramCompact(t *testing.T) { cases := []struct { name string diff --git a/promql/engine.go b/promql/engine.go index 7165631e09..8c8afd181b 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -66,6 +66,9 @@ const ( // The getHPointSlice and getFPointSlice functions are called with an estimated size which often can be // over-estimated. maxPointsSliceSize = 5000 + + // The default buffer size for points used by the matrix selector. + matrixSelectorSliceSize = 16 ) type engineMetrics struct { @@ -1564,7 +1567,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio ev.currentSamples -= len(floats) + totalHPointSize(histograms) putFPointSlice(floats) - putHPointSlice(histograms) + putMatrixSelectorHPointSlice(histograms) // The absent_over_time function returns 0 or 1 series. So far, the matrix // contains multiple series. The following code will create a new series @@ -1940,6 +1943,13 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, no var ( fPointPool zeropool.Pool[[]FPoint] hPointPool zeropool.Pool[[]HPoint] + + // matrixSelectorHPool holds reusable histogram slices used by the matrix + // selector. The key difference between this pool and the hPointPool is that + // slices returned by this pool should never hold multiple copies of the same + // histogram pointer since histogram objects are reused across query evaluation + // steps. + matrixSelectorHPool zeropool.Pool[[]HPoint] ) func getFPointSlice(sz int) []FPoint { @@ -1982,6 +1992,20 @@ func putHPointSlice(p []HPoint) { } } +func getMatrixSelectorHPoints() []HPoint { + if p := matrixSelectorHPool.Get(); p != nil { + return p + } + + return make([]HPoint, 0, matrixSelectorSliceSize) +} + +func putMatrixSelectorHPointSlice(p []HPoint) { + if p != nil { + matrixSelectorHPool.Put(p[:0]) + } +} + // matrixSelector evaluates a *parser.MatrixSelector expression. func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annotations.Annotations) { var ( @@ -2106,13 +2130,13 @@ loop: // Values in the buffer are guaranteed to be smaller than maxt. if t >= mintHistograms { if histograms == nil { - histograms = getHPointSlice(16) + histograms = getMatrixSelectorHPoints() } n := len(histograms) if n < cap(histograms) { histograms = histograms[:n+1] } else { - histograms = append(histograms, HPoint{}) + histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}}) } histograms[n].T, histograms[n].H = buf.AtFloatHistogram(histograms[n].H) if value.IsStaleNaN(histograms[n].H.Sum) { @@ -2145,23 +2169,28 @@ loop: // The sought sample might also be in the range. switch soughtValueType { case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: - t := it.AtT() - if t == maxt { - _, h := it.AtFloatHistogram() - if !value.IsStaleNaN(h.Sum) { - if ev.currentSamples >= ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } - if histograms == nil { - histograms = getHPointSlice(16) - } - // The last sample comes directly from the iterator, so we need to copy it to - // avoid having the same reference twice in the buffer. - point := HPoint{T: t, H: h.Copy()} - histograms = append(histograms, point) - ev.currentSamples += point.size() - } + if it.AtT() != maxt { + break } + if histograms == nil { + histograms = getMatrixSelectorHPoints() + } + n := len(histograms) + if n < cap(histograms) { + histograms = histograms[:n+1] + } else { + histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}}) + } + histograms[n].T, histograms[n].H = it.AtFloatHistogram(histograms[n].H) + if value.IsStaleNaN(histograms[n].H.Sum) { + histograms = histograms[:n] + break + } + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + ev.currentSamples += histograms[n].size() + case chunkenc.ValFloat: t, f := it.At() if t == maxt && !value.IsStaleNaN(f) { diff --git a/promql/value.go b/promql/value.go index 28cf3fe31c..f129137d80 100644 --- a/promql/value.go +++ b/promql/value.go @@ -464,11 +464,11 @@ func (ssi *storageSeriesIterator) At() (t int64, v float64) { return ssi.currT, ssi.currF } -func (ssi *storageSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (ssi *storageSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { panic(errors.New("storageSeriesIterator: AtHistogram not supported")) } -func (ssi *storageSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (ssi *storageSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return ssi.currT, ssi.currH } diff --git a/rules/manager_test.go b/rules/manager_test.go index 6418c5a370..3feae51de6 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1397,7 +1397,7 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { it := s.Iterator(nil) require.Equal(t, chunkenc.ValFloatHistogram, it.Next()) - tsp, fh := it.AtFloatHistogram() + tsp, fh := it.AtFloatHistogram(nil) require.Equal(t, ts.Add(10*time.Second).UnixMilli(), tsp) require.Equal(t, expHist, fh) require.Equal(t, chunkenc.ValNone, it.Next()) diff --git a/storage/buffer.go b/storage/buffer.go index d19f841d43..b3c789e97d 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -24,6 +24,9 @@ import ( // BufferedSeriesIterator wraps an iterator with a look-back buffer. type BufferedSeriesIterator struct { + hReader histogram.Histogram + fhReader histogram.FloatHistogram + it chunkenc.Iterator buf *sampleRing delta int64 @@ -118,10 +121,10 @@ func (b *BufferedSeriesIterator) Next() chunkenc.ValueType { t, f := b.it.At() b.buf.addF(fSample{t: t, f: f}) case chunkenc.ValHistogram: - t, h := b.it.AtHistogram() + t, h := b.it.AtHistogram(&b.hReader) b.buf.addH(hSample{t: t, h: h}) case chunkenc.ValFloatHistogram: - t, fh := b.it.AtFloatHistogram() + t, fh := b.it.AtFloatHistogram(&b.fhReader) b.buf.addFH(fhSample{t: t, fh: fh}) default: panic(fmt.Errorf("BufferedSeriesIterator: unknown value type %v", b.valueType)) @@ -140,13 +143,13 @@ func (b *BufferedSeriesIterator) At() (int64, float64) { } // AtHistogram returns the current histogram element of the iterator. -func (b *BufferedSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { - return b.it.AtHistogram() +func (b *BufferedSeriesIterator) AtHistogram(fh *histogram.Histogram) (int64, *histogram.Histogram) { + return b.it.AtHistogram(fh) } // AtFloatHistogram returns the current float-histogram element of the iterator. -func (b *BufferedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - return b.it.AtFloatHistogram() +func (b *BufferedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return b.it.AtFloatHistogram(fh) } // AtT returns the current timestamp of the iterator. @@ -378,7 +381,11 @@ func (it *SampleRingIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (in if it.fh == nil { return it.t, it.h.ToFloat(fh) } - return it.t, it.fh + if fh != nil { + it.fh.CopyTo(fh) + return it.t, fh + } + return it.t, it.fh.Copy() } func (it *SampleRingIterator) AtT() int64 { @@ -672,7 +679,12 @@ func addH(s hSample, buf []hSample, r *sampleRing) []hSample { } } - buf[r.i] = s + buf[r.i].t = s.t + if buf[r.i].h == nil { + buf[r.i].h = s.h.Copy() + } else { + s.h.CopyTo(buf[r.i].h) + } r.l++ // Free head of the buffer of samples that just fell out of the range. @@ -711,7 +723,12 @@ func addFH(s fhSample, buf []fhSample, r *sampleRing) []fhSample { } } - buf[r.i] = s + buf[r.i].t = s.t + if buf[r.i].fh == nil { + buf[r.i].fh = s.fh.Copy() + } else { + s.fh.CopyTo(buf[r.i].fh) + } r.l++ // Free head of the buffer of samples that just fell out of the range. diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 12e6ff0f05..61074c2122 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -277,11 +277,11 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } func (m *mockSeriesIterator) Next() chunkenc.ValueType { return m.next() } func (m *mockSeriesIterator) Err() error { return m.err() } -func (m *mockSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (m *mockSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { return 0, nil // Not really mocked. } -func (m *mockSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (m *mockSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return 0, nil // Not really mocked. } @@ -303,11 +303,11 @@ func (it *fakeSeriesIterator) At() (int64, float64) { return it.idx * it.step, 123 // Value doesn't matter. } -func (it *fakeSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *fakeSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { return it.idx * it.step, &histogram.Histogram{} // Value doesn't matter. } -func (it *fakeSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *fakeSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return it.idx * it.step, &histogram.FloatHistogram{} // Value doesn't matter. } diff --git a/storage/memoized_iterator.go b/storage/memoized_iterator.go index cb9fdeef46..4ab2aa5d78 100644 --- a/storage/memoized_iterator.go +++ b/storage/memoized_iterator.go @@ -113,7 +113,7 @@ func (b *MemoizedSeriesIterator) Next() chunkenc.ValueType { b.prevFloatHistogram = nil case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: b.prevValue = 0 - b.prevTime, b.prevFloatHistogram = b.it.AtFloatHistogram() + b.prevTime, b.prevFloatHistogram = b.it.AtFloatHistogram(nil) } b.valueType = b.it.Next() @@ -133,7 +133,7 @@ func (b *MemoizedSeriesIterator) At() (int64, float64) { // AtFloatHistogram returns the current float-histogram element of the iterator. func (b *MemoizedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - return b.it.AtFloatHistogram() + return b.it.AtFloatHistogram(nil) } // Err returns the last encountered error. diff --git a/storage/merge.go b/storage/merge.go index bcb0f66fbc..38897449b5 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -525,11 +525,11 @@ func (c *chainSampleIterator) At() (t int64, v float64) { return c.curr.At() } -func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) { +func (c *chainSampleIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { if c.curr == nil { panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.") } - t, h := c.curr.AtHistogram() + t, h := c.curr.AtHistogram(h) // If the current sample is not consecutive with the previous one, we // cannot be sure anymore about counter resets for counter histograms. // TODO(beorn7): If a `NotCounterReset` sample is followed by a @@ -542,11 +542,11 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) { return t, h } -func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (c *chainSampleIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { if c.curr == nil { panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.") } - t, fh := c.curr.AtFloatHistogram() + t, fh := c.curr.AtFloatHistogram(fh) // If the current sample is not consecutive with the previous one, we // cannot be sure anymore about counter resets for counter histograms. // TODO(beorn7): If a `NotCounterReset` sample is followed by a diff --git a/storage/merge_test.go b/storage/merge_test.go index 02c2a34094..05e1c75278 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1173,10 +1173,10 @@ func TestChainSampleIteratorSeek(t *testing.T) { t, f := merged.At() actual = append(actual, fSample{t, f}) case chunkenc.ValHistogram: - t, h := merged.AtHistogram() + t, h := merged.AtHistogram(nil) actual = append(actual, hSample{t, h}) case chunkenc.ValFloatHistogram: - t, fh := merged.AtFloatHistogram() + t, fh := merged.AtFloatHistogram(nil) actual = append(actual, fhSample{t, fh}) } s, err := ExpandSamples(merged, nil) @@ -1259,10 +1259,10 @@ func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) { t, f := merged.At() actual = append(actual, fSample{t, f}) case chunkenc.ValHistogram: - t, h := merged.AtHistogram() + t, h := merged.AtHistogram(nil) actual = append(actual, hSample{t, h}) case chunkenc.ValFloatHistogram: - t, fh := merged.AtFloatHistogram() + t, fh := merged.AtFloatHistogram(nil) actual = append(actual, fhSample{t, fh}) } s, err := ExpandSamples(merged, nil) @@ -1629,11 +1629,11 @@ func (e errIterator) At() (int64, float64) { return 0, 0 } -func (e errIterator) AtHistogram() (int64, *histogram.Histogram) { +func (e errIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { return 0, nil } -func (e errIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (e errIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return 0, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index ffab821a5f..9cf1ed8712 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -152,10 +152,10 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, Value: val, }) case chunkenc.ValHistogram: - ts, h := iter.AtHistogram() + ts, h := iter.AtHistogram(nil) histograms = append(histograms, HistogramToHistogramProto(ts, h)) case chunkenc.ValFloatHistogram: - ts, fh := iter.AtFloatHistogram() + ts, fh := iter.AtFloatHistogram(nil) histograms = append(histograms, FloatHistogramToHistogramProto(ts, fh)) default: return nil, ss.Warnings(), fmt.Errorf("unrecognized value type: %s", valType) @@ -475,7 +475,7 @@ func (c *concreteSeriesIterator) At() (t int64, v float64) { } // AtHistogram implements chunkenc.Iterator. -func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { if c.curValType != chunkenc.ValHistogram { panic("iterator is not on an integer histogram sample") } @@ -484,7 +484,7 @@ func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { } // AtFloatHistogram implements chunkenc.Iterator. -func (c *concreteSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { switch c.curValType { case chunkenc.ValHistogram: fh := c.series.histograms[c.histogramsCur] diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index dbc22a3771..0451953cb1 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -278,31 +278,31 @@ func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) { // Seek to the first sample with ts=1. require.Equal(t, chunkenc.ValHistogram, it.Seek(1)) - ts, v := it.AtHistogram() + ts, v := it.AtHistogram(nil) require.Equal(t, int64(1), ts) require.Equal(t, histograms[0], v) // Seek one further, next sample still has ts=1. require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, v = it.AtHistogram() + ts, v = it.AtHistogram(nil) require.Equal(t, int64(1), ts) require.Equal(t, histograms[1], v) // Seek again to 1 and make sure we stay where we are. require.Equal(t, chunkenc.ValHistogram, it.Seek(1)) - ts, v = it.AtHistogram() + ts, v = it.AtHistogram(nil) require.Equal(t, int64(1), ts) require.Equal(t, histograms[1], v) // Another seek. require.Equal(t, chunkenc.ValHistogram, it.Seek(3)) - ts, v = it.AtHistogram() + ts, v = it.AtHistogram(nil) require.Equal(t, int64(3), ts) require.Equal(t, histograms[3], v) // And we don't go back. require.Equal(t, chunkenc.ValHistogram, it.Seek(2)) - ts, v = it.AtHistogram() + ts, v = it.AtHistogram(nil) require.Equal(t, int64(3), ts) require.Equal(t, histograms[3], v) @@ -347,12 +347,12 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { fh *histogram.FloatHistogram ) require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, h = it.AtHistogram() + ts, h = it.AtHistogram(nil) require.Equal(t, int64(1), ts) require.Equal(t, histograms[0], h) require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, h = it.AtHistogram() + ts, h = it.AtHistogram(nil) require.Equal(t, int64(2), ts) require.Equal(t, histograms[1], h) @@ -393,13 +393,13 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { require.Equal(t, 8., v) require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, h = it.AtHistogram() + ts, h = it.AtHistogram(nil) require.Equal(t, int64(16), ts) require.Equal(t, histograms[10], h) // Getting a float histogram from an int histogram works. require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, fh = it.AtFloatHistogram() + ts, fh = it.AtFloatHistogram(nil) require.Equal(t, int64(17), ts) expected := HistogramProtoToFloatHistogram(HistogramToHistogramProto(int64(17), histograms[11])) require.Equal(t, expected, fh) diff --git a/storage/series.go b/storage/series.go index b111505aa3..eba11b4d9b 100644 --- a/storage/series.go +++ b/storage/series.go @@ -123,12 +123,12 @@ func (it *listSeriesIterator) At() (int64, float64) { return s.T(), s.F() } -func (it *listSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *listSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { s := it.samples.Get(it.idx) return s.T(), s.H() } -func (it *listSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *listSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { s := it.samples.Get(it.idx) return s.T(), s.FH() } @@ -337,7 +337,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { t, v = seriesIter.At() app.Append(t, v) case chunkenc.ValHistogram: - t, h = seriesIter.AtHistogram() + t, h = seriesIter.AtHistogram(nil) newChk, recoded, app, err = app.AppendHistogram(nil, t, h, false) if err != nil { return errChunksIterator{err: err} @@ -352,7 +352,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { chk = newChk } case chunkenc.ValFloatHistogram: - t, fh = seriesIter.AtFloatHistogram() + t, fh = seriesIter.AtFloatHistogram(nil) newChk, recoded, app, err = app.AppendFloatHistogram(nil, t, fh, false) if err != nil { return errChunksIterator{err: err} @@ -438,10 +438,10 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, } result = append(result, newSampleFn(t, f, nil, nil)) case chunkenc.ValHistogram: - t, h := iter.AtHistogram() + t, h := iter.AtHistogram(nil) result = append(result, newSampleFn(t, 0, h, nil)) case chunkenc.ValFloatHistogram: - t, fh := iter.AtFloatHistogram() + t, fh := iter.AtFloatHistogram(nil) result = append(result, newSampleFn(t, 0, nil, fh)) } } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index c155b4451d..d48418b579 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -555,10 +555,10 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str t, v := it.At() ref, err = app.Append(ref, lset, t, v) case chunkenc.ValHistogram: - t, h := it.AtHistogram() + t, h := it.AtHistogram(nil) ref, err = app.AppendHistogram(ref, lset, t, h, nil) case chunkenc.ValFloatHistogram: - t, fh := it.AtFloatHistogram() + t, fh := it.AtFloatHistogram(nil) ref, err = app.AppendHistogram(ref, lset, t, nil, fh) default: err = fmt.Errorf("unknown sample type %s", typ.String()) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 0126f1fbdb..21c41257b5 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -131,16 +131,20 @@ type Iterator interface { // At returns the current timestamp/value pair if the value is a float. // Before the iterator has advanced, the behaviour is unspecified. At() (int64, float64) - // AtHistogram returns the current timestamp/value pair if the value is - // a histogram with integer counts. Before the iterator has advanced, - // the behaviour is unspecified. - AtHistogram() (int64, *histogram.Histogram) + // AtHistogram returns the current timestamp/value pair if the value is a + // histogram with integer counts. Before the iterator has advanced, the behaviour + // is unspecified. + // The method accepts an optional Histogram object which will be + // reused when not nil. Otherwise, a new Histogram object will be allocated. + AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) // AtFloatHistogram returns the current timestamp/value pair if the // value is a histogram with floating-point counts. It also works if the // value is a histogram with integer counts, in which case a // FloatHistogram copy of the histogram is returned. Before the iterator // has advanced, the behaviour is unspecified. - AtFloatHistogram() (int64, *histogram.FloatHistogram) + // The method accepts an optional FloatHistogram object which will be + // reused when not nil. Otherwise, a new FloatHistogram object will be allocated. + AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) // AtT returns the current timestamp. // Before the iterator has advanced, the behaviour is unspecified. AtT() int64 @@ -222,9 +226,11 @@ func (it *mockSeriesIterator) At() (int64, float64) { return it.timeStamps[it.currIndex], it.values[it.currIndex] } -func (it *mockSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { return math.MinInt64, nil } +func (it *mockSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { + return math.MinInt64, nil +} -func (it *mockSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *mockSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return math.MinInt64, nil } @@ -249,13 +255,18 @@ func NewNopIterator() Iterator { type nopIterator struct{} -func (nopIterator) Next() ValueType { return ValNone } -func (nopIterator) Seek(int64) ValueType { return ValNone } -func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 } -func (nopIterator) AtHistogram() (int64, *histogram.Histogram) { return math.MinInt64, nil } -func (nopIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { return math.MinInt64, nil } -func (nopIterator) AtT() int64 { return math.MinInt64 } -func (nopIterator) Err() error { return nil } +func (nopIterator) Next() ValueType { return ValNone } +func (nopIterator) Seek(int64) ValueType { return ValNone } +func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 } +func (nopIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { + return math.MinInt64, nil +} + +func (nopIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return math.MinInt64, nil +} +func (nopIterator) AtT() int64 { return math.MinInt64 } +func (nopIterator) Err() error { return nil } // Pool is used to create and reuse chunk references to avoid allocations. type Pool interface { diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 3d76cdf65f..88d189254f 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -527,7 +527,7 @@ func (a *FloatHistogramAppender) recode( numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) for it.Next() == ValFloatHistogram { - tOld, hOld := it.AtFloatHistogram() + tOld, hOld := it.AtFloatHistogram(nil) // We have to newly allocate slices for the modified buckets // here because they are kept by the appender until the next @@ -728,27 +728,50 @@ func (it *floatHistogramIterator) At() (int64, float64) { panic("cannot call floatHistogramIterator.At") } -func (it *floatHistogramIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *floatHistogramIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { panic("cannot call floatHistogramIterator.AtHistogram") } -func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { if value.IsStaleNaN(it.sum.value) { return it.t, &histogram.FloatHistogram{Sum: it.sum.value} } - it.atFloatHistogramCalled = true - return it.t, &histogram.FloatHistogram{ - CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), - Count: it.cnt.value, - ZeroCount: it.zCnt.value, - Sum: it.sum.value, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pBuckets, - NegativeBuckets: it.nBuckets, + if fh == nil { + it.atFloatHistogramCalled = true + return it.t, &histogram.FloatHistogram{ + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), + Count: it.cnt.value, + ZeroCount: it.zCnt.value, + Sum: it.sum.value, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pBuckets, + NegativeBuckets: it.nBuckets, + } } + + fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead) + fh.Schema = it.schema + fh.ZeroThreshold = it.zThreshold + fh.ZeroCount = it.zCnt.value + fh.Count = it.cnt.value + fh.Sum = it.sum.value + + fh.PositiveSpans = resize(fh.PositiveSpans, len(it.pSpans)) + copy(fh.PositiveSpans, it.pSpans) + + fh.NegativeSpans = resize(fh.NegativeSpans, len(it.nSpans)) + copy(fh.NegativeSpans, it.nSpans) + + fh.PositiveBuckets = resize(fh.PositiveBuckets, len(it.pBuckets)) + copy(fh.PositiveBuckets, it.pBuckets) + + fh.NegativeBuckets = resize(fh.NegativeBuckets, len(it.nBuckets)) + copy(fh.NegativeBuckets, it.nBuckets) + + return it.t, fh } func (it *floatHistogramIterator) AtT() int64 { diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index 6f5a95fb1c..054c17f7d9 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -140,7 +140,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { require.NoError(t, it.Err()) var act []floatResult for it.Next() == ValFloatHistogram { - fts, fh := it.AtFloatHistogram() + fts, fh := it.AtFloatHistogram(nil) act = append(act, floatResult{t: fts, h: fh}) } require.NoError(t, it.Err()) @@ -150,7 +150,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { it2 := c.Iterator(it) var act2 []floatResult for it2.Next() == ValFloatHistogram { - fts, fh := it2.AtFloatHistogram() + fts, fh := it2.AtFloatHistogram(nil) act2 = append(act2, floatResult{t: fts, h: fh}) } require.NoError(t, it2.Err()) @@ -164,7 +164,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { it3 := c.iterator(itX) var act3 []floatResult for it3.Next() == ValFloatHistogram { - fts, fh := it3.AtFloatHistogram() + fts, fh := it3.AtFloatHistogram(nil) act3 = append(act3, floatResult{t: fts, h: fh}) } require.NoError(t, it3.Err()) @@ -178,10 +178,10 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { // Below ones should not matter. require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t)) require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t)) - fts, fh := it4.AtFloatHistogram() + fts, fh := it4.AtFloatHistogram(nil) act4 = append(act4, floatResult{t: fts, h: fh}) for it4.Next() == ValFloatHistogram { - fts, fh := it4.AtFloatHistogram() + fts, fh := it4.AtFloatHistogram(nil) act4 = append(act4, floatResult{t: fts, h: fh}) } require.NoError(t, it4.Err()) @@ -272,7 +272,7 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) { it := c.Iterator(nil) var act []floatResult for it.Next() == ValFloatHistogram { - fts, fh := it.AtFloatHistogram() + fts, fh := it.AtFloatHistogram(nil) act = append(act, floatResult{t: fts, h: fh}) } require.NoError(t, it.Err()) diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index ac84e7a1ef..cb09eda26d 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -558,7 +558,7 @@ func (a *HistogramAppender) recode( numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) for it.Next() == ValHistogram { - tOld, hOld := it.AtHistogram() + tOld, hOld := it.AtHistogram(nil) // We have to newly allocate slices for the modified buckets // here because they are kept by the appender until the next @@ -776,42 +776,96 @@ func (it *histogramIterator) At() (int64, float64) { panic("cannot call histogramIterator.At") } -func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { if value.IsStaleNaN(it.sum) { return it.t, &histogram.Histogram{Sum: it.sum} } - it.atHistogramCalled = true - return it.t, &histogram.Histogram{ - CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), - Count: it.cnt, - ZeroCount: it.zCnt, - Sum: it.sum, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pBuckets, - NegativeBuckets: it.nBuckets, + if h == nil { + it.atHistogramCalled = true + return it.t, &histogram.Histogram{ + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), + Count: it.cnt, + ZeroCount: it.zCnt, + Sum: it.sum, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pBuckets, + NegativeBuckets: it.nBuckets, + } } + + h.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead) + h.Schema = it.schema + h.ZeroThreshold = it.zThreshold + h.ZeroCount = it.zCnt + h.Count = it.cnt + h.Sum = it.sum + + h.PositiveSpans = resize(h.PositiveSpans, len(it.pSpans)) + copy(h.PositiveSpans, it.pSpans) + + h.NegativeSpans = resize(h.NegativeSpans, len(it.nSpans)) + copy(h.NegativeSpans, it.nSpans) + + h.PositiveBuckets = resize(h.PositiveBuckets, len(it.pBuckets)) + copy(h.PositiveBuckets, it.pBuckets) + + h.NegativeBuckets = resize(h.NegativeBuckets, len(it.nBuckets)) + copy(h.NegativeBuckets, it.nBuckets) + + return it.t, h } -func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { if value.IsStaleNaN(it.sum) { return it.t, &histogram.FloatHistogram{Sum: it.sum} } - it.atFloatHistogramCalled = true - return it.t, &histogram.FloatHistogram{ - CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), - Count: float64(it.cnt), - ZeroCount: float64(it.zCnt), - Sum: it.sum, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pFloatBuckets, - NegativeBuckets: it.nFloatBuckets, + if fh == nil { + it.atFloatHistogramCalled = true + return it.t, &histogram.FloatHistogram{ + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), + Count: float64(it.cnt), + ZeroCount: float64(it.zCnt), + Sum: it.sum, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pFloatBuckets, + NegativeBuckets: it.nFloatBuckets, + } } + + fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead) + fh.Schema = it.schema + fh.ZeroThreshold = it.zThreshold + fh.ZeroCount = float64(it.zCnt) + fh.Count = float64(it.cnt) + fh.Sum = it.sum + + fh.PositiveSpans = resize(fh.PositiveSpans, len(it.pSpans)) + copy(fh.PositiveSpans, it.pSpans) + + fh.NegativeSpans = resize(fh.NegativeSpans, len(it.nSpans)) + copy(fh.NegativeSpans, it.nSpans) + + fh.PositiveBuckets = resize(fh.PositiveBuckets, len(it.pBuckets)) + var currentPositive float64 + for i, b := range it.pBuckets { + currentPositive += float64(b) + fh.PositiveBuckets[i] = currentPositive + } + + fh.NegativeBuckets = resize(fh.NegativeBuckets, len(it.nBuckets)) + var currentNegative float64 + for i, b := range it.nBuckets { + currentNegative += float64(b) + fh.NegativeBuckets[i] = currentNegative + } + + return it.t, fh } func (it *histogramIterator) AtT() int64 { @@ -1056,3 +1110,10 @@ func (it *histogramIterator) readSum() bool { } return true } + +func resize[T any](items []T, n int) []T { + if cap(items) < n { + return make([]T, n) + } + return items[:n] +} diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 53aee89db5..f7609c1936 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -141,8 +141,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) { require.NoError(t, it.Err()) var act []result for it.Next() == ValHistogram { - ts, h := it.AtHistogram() - fts, fh := it.AtFloatHistogram() + ts, h := it.AtHistogram(nil) + fts, fh := it.AtFloatHistogram(nil) require.Equal(t, ts, fts) act = append(act, result{t: ts, h: h, fh: fh}) } @@ -153,8 +153,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) { it2 := c.Iterator(it) var act2 []result for it2.Next() == ValHistogram { - ts, h := it2.AtHistogram() - fts, fh := it2.AtFloatHistogram() + ts, h := it2.AtHistogram(nil) + fts, fh := it2.AtFloatHistogram(nil) require.Equal(t, ts, fts) act2 = append(act2, result{t: ts, h: h, fh: fh}) } @@ -169,8 +169,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) { it3 := c.iterator(itX) var act3 []result for it3.Next() == ValHistogram { - ts, h := it3.AtHistogram() - fts, fh := it3.AtFloatHistogram() + ts, h := it3.AtHistogram(nil) + fts, fh := it3.AtFloatHistogram(nil) require.Equal(t, ts, fts) act3 = append(act3, result{t: ts, h: h, fh: fh}) } @@ -185,13 +185,13 @@ func TestHistogramChunkSameBuckets(t *testing.T) { // Below ones should not matter. require.Equal(t, ValHistogram, it4.Seek(exp[mid].t)) require.Equal(t, ValHistogram, it4.Seek(exp[mid].t)) - ts, h = it4.AtHistogram() - fts, fh := it4.AtFloatHistogram() + ts, h = it4.AtHistogram(nil) + fts, fh := it4.AtFloatHistogram(nil) require.Equal(t, ts, fts) act4 = append(act4, result{t: ts, h: h, fh: fh}) for it4.Next() == ValHistogram { - ts, h := it4.AtHistogram() - fts, fh := it4.AtFloatHistogram() + ts, h := it4.AtHistogram(nil) + fts, fh := it4.AtFloatHistogram(nil) require.Equal(t, ts, fts) act4 = append(act4, result{t: ts, h: h, fh: fh}) } @@ -284,8 +284,8 @@ func TestHistogramChunkBucketChanges(t *testing.T) { it := c.Iterator(nil) var act []result for it.Next() == ValHistogram { - ts, h := it.AtHistogram() - fts, fh := it.AtFloatHistogram() + ts, h := it.AtHistogram(nil) + fts, fh := it.AtFloatHistogram(nil) require.Equal(t, ts, fts) act = append(act, result{t: ts, h: h, fh: fh}) } @@ -897,7 +897,7 @@ func TestAtFloatHistogram(t *testing.T) { it := chk.Iterator(nil) i := int64(0) for it.Next() != ValNone { - ts, h := it.AtFloatHistogram() + ts, h := it.AtFloatHistogram(nil) require.Equal(t, i, ts) require.Equal(t, expOutput[i], h, "histogram %d unequal", i) i++ diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index d54e5dbab1..07b9238315 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -260,11 +260,11 @@ func (it *xorIterator) At() (int64, float64) { return it.t, it.val } -func (it *xorIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *xorIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { panic("cannot call xorIterator.AtHistogram") } -func (it *xorIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *xorIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { panic("cannot call xorIterator.AtFloatHistogram") } diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 2c6db3637a..543b98c289 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -226,10 +226,10 @@ func ChunkMetasToSamples(chunks []Meta) (result []Sample) { t, v := it.At() result = append(result, sample{t: t, f: v}) case chunkenc.ValHistogram: - t, h := it.AtHistogram() + t, h := it.AtHistogram(nil) result = append(result, sample{t: t, h: h}) case chunkenc.ValFloatHistogram: - t, fh := it.AtFloatHistogram() + t, fh := it.AtFloatHistogram(nil) result = append(result, sample{t: t, fh: fh}) default: panic("unexpected value type") diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index f33bb73c19..b2d2ea6e7f 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1007,10 +1007,10 @@ func TestCompaction_populateBlock(t *testing.T) { s.t, s.f = iter.At() samples = append(samples, s) case chunkenc.ValHistogram: - s.t, s.h = iter.AtHistogram() + s.t, s.h = iter.AtHistogram(nil) samples = append(samples, s) case chunkenc.ValFloatHistogram: - s.t, s.fh = iter.AtFloatHistogram() + s.t, s.fh = iter.AtFloatHistogram(nil) samples = append(samples, s) default: require.Fail(t, "unexpected value type") diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 3bc094a3da..9e543ed500 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -107,10 +107,10 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str ts, v := it.At() samples = append(samples, sample{t: ts, f: v}) case chunkenc.ValHistogram: - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) samples = append(samples, sample{t: ts, h: h}) case chunkenc.ValFloatHistogram: - ts, fh := it.AtFloatHistogram() + ts, fh := it.AtFloatHistogram(nil) samples = append(samples, sample{t: ts, fh: fh}) default: t.Fatalf("unknown sample type in query %s", typ.String()) @@ -6664,10 +6664,10 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { ts, v := it.At() slice = append(slice, sample{t: ts, f: v}) case chunkenc.ValHistogram: - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) slice = append(slice, sample{t: ts, h: h}) case chunkenc.ValFloatHistogram: - ts, h := it.AtFloatHistogram() + ts, h := it.AtFloatHistogram(nil) slice = append(slice, sample{t: ts, fh: h}) default: t.Fatalf("unexpected sample value type %d", typ) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 5c2749bed2..653c53a740 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3377,10 +3377,10 @@ func TestAppendHistogram(t *testing.T) { for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { case chunkenc.ValHistogram: - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) actHistograms = append(actHistograms, sample{t: ts, h: h}) case chunkenc.ValFloatHistogram: - ts, fh := it.AtFloatHistogram() + ts, fh := it.AtFloatHistogram(nil) actFloatHistograms = append(actFloatHistograms, sample{t: ts, fh: fh}) } } @@ -4025,10 +4025,10 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { case chunkenc.ValHistogram: - t, h := it.AtHistogram() + t, h := it.AtHistogram(nil) actHistograms = append(actHistograms, timedHistogram{t: t, h: h}) case chunkenc.ValFloatHistogram: - t, h := it.AtFloatHistogram() + t, h := it.AtFloatHistogram(nil) actHistograms = append(actHistograms, timedHistogram{t: t, fh: h}) } } diff --git a/tsdb/querier.go b/tsdb/querier.go index 68aa957463..a692c98f1a 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -820,12 +820,12 @@ func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() } -func (p *populateWithDelSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { - return p.curr.AtHistogram() +func (p *populateWithDelSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + return p.curr.AtHistogram(h) } -func (p *populateWithDelSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - return p.curr.AtFloatHistogram() +func (p *populateWithDelSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return p.curr.AtFloatHistogram(fh) } func (p *populateWithDelSeriesIterator) AtT() int64 { @@ -937,7 +937,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { break } var h *histogram.Histogram - t, h = p.currDelIter.AtHistogram() + t, h = p.currDelIter.AtHistogram(nil) _, _, app, err = app.AppendHistogram(nil, t, h, true) if err != nil { break @@ -968,7 +968,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { break } var h *histogram.FloatHistogram - t, h = p.currDelIter.AtFloatHistogram() + t, h = p.currDelIter.AtFloatHistogram(nil) _, _, app, err = app.AppendFloatHistogram(nil, t, h, true) if err != nil { break @@ -1054,7 +1054,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { case chunkenc.ValHistogram: { var v *histogram.Histogram - t, v = p.currDelIter.AtHistogram() + t, v = p.currDelIter.AtHistogram(nil) // No need to set prevApp as AppendHistogram will set the // counter reset header for the appender that's returned. newChunk, recoded, app, err = app.AppendHistogram(nil, t, v, false) @@ -1062,7 +1062,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { case chunkenc.ValFloatHistogram: { var v *histogram.FloatHistogram - t, v = p.currDelIter.AtFloatHistogram() + t, v = p.currDelIter.AtFloatHistogram(nil) // No need to set prevApp as AppendHistogram will set the // counter reset header for the appender that's returned. newChunk, recoded, app, err = app.AppendFloatHistogram(nil, t, v, false) @@ -1233,13 +1233,13 @@ func (it *DeletedIterator) At() (int64, float64) { return it.Iter.At() } -func (it *DeletedIterator) AtHistogram() (int64, *histogram.Histogram) { - t, h := it.Iter.AtHistogram() +func (it *DeletedIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + t, h := it.Iter.AtHistogram(h) return t, h } -func (it *DeletedIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - t, h := it.Iter.AtFloatHistogram() +func (it *DeletedIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + t, h := it.Iter.AtFloatHistogram(fh) return t, h } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 18d81b85b4..1ed59ef1ae 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -775,11 +775,11 @@ func (it *mockSampleIterator) At() (int64, float64) { return it.s[it.idx].T(), it.s[it.idx].F() } -func (it *mockSampleIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *mockSampleIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { return it.s[it.idx].T(), it.s[it.idx].H() } -func (it *mockSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *mockSampleIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return it.s[it.idx].T(), it.s[it.idx].FH() } @@ -1822,12 +1822,12 @@ func checkCurrVal(t *testing.T, valType chunkenc.ValueType, it *populateWithDelS require.Equal(t, int64(expectedTs), ts) require.Equal(t, float64(expectedValue), v) case chunkenc.ValHistogram: - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) require.Equal(t, int64(expectedTs), ts) h.CounterResetHint = histogram.UnknownCounterReset require.Equal(t, tsdbutil.GenerateTestHistogram(expectedValue), h) case chunkenc.ValFloatHistogram: - ts, h := it.AtFloatHistogram() + ts, h := it.AtFloatHistogram(nil) require.Equal(t, int64(expectedTs), ts) h.CounterResetHint = histogram.UnknownCounterReset require.Equal(t, tsdbutil.GenerateTestFloatHistogram(expectedValue), h) diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index d4e43b73cc..f7b27c2e08 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -73,10 +73,10 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l t, v := it.At() ref, err = app.Append(ref, lset, t, v) case chunkenc.ValHistogram: - t, h := it.AtHistogram() + t, h := it.AtHistogram(nil) ref, err = app.AppendHistogram(ref, lset, t, h, nil) case chunkenc.ValFloatHistogram: - t, fh := it.AtFloatHistogram() + t, fh := it.AtFloatHistogram(nil) ref, err = app.AppendHistogram(ref, lset, t, nil, fh) default: return "", fmt.Errorf("unknown sample type %s", typ.String()) diff --git a/web/federate.go b/web/federate.go index 22384a696c..62e8c97b74 100644 --- a/web/federate.go +++ b/web/federate.go @@ -123,7 +123,7 @@ Loop: case chunkenc.ValFloat: t, f = it.At() case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: - t, fh = it.AtFloatHistogram() + t, fh = it.AtFloatHistogram(nil) default: sample, ok := it.PeekBack(1) if !ok {