From 3c7de690598159b535ab1a4adf73aca44969a378 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 20 Sep 2022 18:16:45 +0100 Subject: [PATCH] storage: allow re-use of iterators Patterned after `Chunk.Iterator()`: pass the old iterator in so it can be re-used to avoid allocating a new object. (This commit does not do any re-use; it is just changing all the method signatures so re-use is possible in later commits.) Signed-off-by: Bryan Boreham --- cmd/promtool/backfill_test.go | 2 +- cmd/promtool/rules_test.go | 2 +- cmd/promtool/tsdb.go | 2 +- promql/engine.go | 16 ++++++++--- promql/test_test.go | 2 +- promql/value.go | 2 +- rules/manager.go | 2 +- rules/manager_test.go | 3 ++- scrape/scrape_test.go | 4 +-- storage/fanout_test.go | 6 +++-- storage/interface.go | 13 +++++---- storage/merge.go | 14 +++++----- storage/merge_test.go | 19 ++++++------- storage/remote/codec.go | 9 ++++--- storage/remote/codec_test.go | 2 +- storage/series.go | 26 +++++++++--------- tsdb/block_test.go | 8 +++--- tsdb/compact.go | 7 ++--- tsdb/db_test.go | 50 ++++++++++++++++++++++------------- tsdb/example_test.go | 2 +- tsdb/head_test.go | 24 ++++++++--------- tsdb/querier.go | 4 +-- tsdb/querier_test.go | 15 ++++++----- tsdb/tsdbblockutil.go | 3 ++- web/federate.go | 4 ++- 25 files changed, 140 insertions(+), 101 deletions(-) diff --git a/cmd/promtool/backfill_test.go b/cmd/promtool/backfill_test.go index 398f96766b..2c551abeb3 100644 --- a/cmd/promtool/backfill_test.go +++ b/cmd/promtool/backfill_test.go @@ -49,7 +49,7 @@ func queryAllSeries(t testing.TB, q storage.Querier, expectedMinTime, expectedMa samples := []backfillSample{} for ss.Next() { series := ss.At() - it := series.Iterator() + it := series.Iterator(nil) require.NoError(t, it.Err()) for it.Next() == chunkenc.ValFloat { ts, v := it.At() diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go index 0e60a20fb6..caa930616a 100644 --- a/cmd/promtool/rules_test.go +++ b/cmd/promtool/rules_test.go @@ -139,7 +139,7 @@ func TestBackfillRuleIntegration(t *testing.T) { } else { require.Equal(t, 3, len(series.Labels())) } - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { samplesCount++ ts, v := it.At() diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 6934fad49d..91b97f5c51 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -644,7 +644,7 @@ func dumpSamples(path string, mint, maxt int64) (err error) { for ss.Next() { series := ss.At() lbs := series.Labels() - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { ts, val := it.At() fmt.Printf("%s %g %d\n", lbs, val, ts) diff --git a/promql/engine.go b/promql/engine.go index b3ad14b3d7..0225f78d2a 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1393,10 +1393,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { enh := &EvalNodeHelper{Out: make(Vector, 0, 1)} // Process all the calls for one time series at a time. it := storage.NewBuffer(selRange) + var chkIter chunkenc.Iterator for i, s := range selVS.Series { ev.currentSamples -= len(points) points = points[:0] - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) metric := selVS.Series[i].Labels() // The last_over_time function acts like offset; thus, it // should keep the metric name. For all the other range @@ -1578,8 +1580,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { } mat := make(Matrix, 0, len(e.Series)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + var chkIter chunkenc.Iterator for i, s := range e.Series { - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) ss := Series{ Metric: e.Series[i].Labels(), Points: getPointSlice(numSteps), @@ -1723,8 +1727,10 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect } vec := make(Vector, 0, len(node.Series)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + var chkIter chunkenc.Iterator for i, s := range node.Series { - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) t, v, h, ok := ev.vectorSelectorSingle(it, node, ts) if ok { @@ -1812,12 +1818,14 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } + var chkIter chunkenc.Iterator series := vs.Series for i, s := range series { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) ss := Series{ Metric: series[i].Labels(), } diff --git a/promql/test_test.go b/promql/test_test.go index 5c16e57a29..c5cb41ed9d 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -143,7 +143,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { got := Series{ Metric: storageSeries.Labels(), } - it := storageSeries.Iterator() + it := storageSeries.Iterator(nil) for it.Next() == chunkenc.ValFloat { t, v := it.At() got.Points = append(got.Points, Point{T: t, V: v}) diff --git a/promql/value.go b/promql/value.go index 507a5e6f15..78342859e3 100644 --- a/promql/value.go +++ b/promql/value.go @@ -363,7 +363,7 @@ func (ss *StorageSeries) Labels() labels.Labels { } // Iterator returns a new iterator of the data of the series. -func (ss *StorageSeries) Iterator() chunkenc.Iterator { +func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return newStorageSeriesIterator(ss.series) } diff --git a/rules/manager.go b/rules/manager.go index 42f1b59ce0..4b9c8150a0 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -807,7 +807,7 @@ func (g *Group) RestoreForState(ts time.Time) { // Series found for the 'for' state. var t int64 var v float64 - it := s.Iterator() + it := s.Iterator(nil) for it.Next() == chunkenc.ValFloat { t, v = it.At() } diff --git a/rules/manager_test.go b/rules/manager_test.go index 984bb81b9e..5c580caf71 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -592,12 +592,13 @@ func TestStaleness(t *testing.T) { // Convert a SeriesSet into a form usable with require.Equal. func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) { result := map[string][]promql.Point{} + var it chunkenc.Iterator for ss.Next() { series := ss.At() points := []promql.Point{} - it := series.Iterator() + it := series.Iterator(it) for it.Next() == chunkenc.ValFloat { t, v := it.At() points = append(points, promql.Point{T: t, V: v}) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index b22f7f0953..bb851bd9ec 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -2959,7 +2959,7 @@ func TestScrapeReportSingleAppender(t *testing.T) { c := 0 for series.Next() { - i := series.At().Iterator() + i := series.At().Iterator(nil) for i.Next() != chunkenc.ValNone { c++ } @@ -3032,7 +3032,7 @@ func TestScrapeReportLimit(t *testing.T) { var found bool for series.Next() { - i := series.At().Iterator() + i := series.At().Iterator(nil) for i.Next() == chunkenc.ValFloat { _, v := i.At() require.Equal(t, 1.0, v) diff --git a/storage/fanout_test.go b/storage/fanout_test.go index ee6623397b..4996e8f64a 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -86,11 +86,12 @@ func TestFanout_SelectSorted(t *testing.T) { result := make(map[int64]float64) var labelsResult labels.Labels + var iterator chunkenc.Iterator for seriesSet.Next() { series := seriesSet.At() seriesLabels := series.Labels() labelsResult = seriesLabels - iterator := series.Iterator() + iterator := series.Iterator(iterator) for iterator.Next() == chunkenc.ValFloat { timestamp, value := iterator.At() result[timestamp] = value @@ -112,11 +113,12 @@ func TestFanout_SelectSorted(t *testing.T) { result := make(map[int64]float64) var labelsResult labels.Labels + var iterator chunkenc.Iterator for seriesSet.Next() { series := seriesSet.At() seriesLabels := series.Labels() labelsResult = seriesLabels - iterator := series.Iterator() + iterator := series.Iterator(iterator) for iterator.Next() == chunkenc.ValFloat { timestamp, value := iterator.At() result[timestamp] = value diff --git a/storage/interface.go b/storage/interface.go index 22d3b41860..5f0be9db97 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -382,7 +382,7 @@ func (s mockSeries) Labels() labels.Labels { return labels.FromStrings(s.labelSet...) } -func (s mockSeries) Iterator() chunkenc.Iterator { +func (s mockSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator { return chunkenc.MockSeriesIterator(s.timestamps, s.values) } @@ -421,14 +421,17 @@ type Labels interface { } type SampleIterable interface { - // Iterator returns a new, independent iterator of the data of the series. - Iterator() chunkenc.Iterator + // Iterator returns an iterator of the data of the series. + // The iterator passed as argument is for re-use. + // Depending on implementation, the iterator can + // be re-used or a new iterator can be allocated. + Iterator(chunkenc.Iterator) chunkenc.Iterator } type ChunkIterable interface { - // Iterator returns a new, independent iterator that iterates over potentially overlapping + // Iterator returns an iterator that iterates over potentially overlapping // chunks of the series, sorted by min time. - Iterator() chunks.Iterator + Iterator(chunks.Iterator) chunks.Iterator } type Warnings []error diff --git a/storage/merge.go b/storage/merge.go index 258e4e3120..336d82c6f8 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -425,10 +425,10 @@ func ChainedSeriesMerge(series ...Series) Series { } return &SeriesEntry{ Lset: series[0].Labels(), - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator { iterators := make([]chunkenc.Iterator, 0, len(series)) for _, s := range series { - iterators = append(iterators, s.Iterator()) + iterators = append(iterators, s.Iterator(nil)) } return NewChainSampleIterator(iterators) }, @@ -607,10 +607,10 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC } return &ChunkSeriesEntry{ Lset: series[0].Labels(), - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { iterators := make([]chunks.Iterator, 0, len(series)) for _, s := range series { - iterators = append(iterators, s.Iterator()) + iterators = append(iterators, s.Iterator(nil)) } return &compactChunkIterator{ mergeFunc: mergeFunc, @@ -693,7 +693,7 @@ func (c *compactChunkIterator) Next() bool { } // Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here. - iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator() + iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator(nil) if !iter.Next() { if c.err = iter.Err(); c.err != nil { return false @@ -751,10 +751,10 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc { } return &ChunkSeriesEntry{ Lset: series[0].Labels(), - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { iterators := make([]chunks.Iterator, 0, len(series)) for _, s := range series { - iterators = append(iterators, s.Iterator()) + iterators = append(iterators, s.Iterator(nil)) } return &concatenatingChunkIterator{ iterators: iterators, diff --git a/storage/merge_test.go b/storage/merge_test.go index a6576da137..407fc4ea55 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -202,8 +202,8 @@ func TestMergeQuerierWithChainMerger(t *testing.T) { expectedSeries := tc.expected.At() require.Equal(t, expectedSeries.Labels(), actualSeries.Labels()) - expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(), nil) - actSmpl, actErr := ExpandSamples(actualSeries.Iterator(), nil) + expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(nil), nil) + actSmpl, actErr := ExpandSamples(actualSeries.Iterator(nil), nil) require.Equal(t, expErr, actErr) require.Equal(t, expSmpl, actSmpl) } @@ -370,8 +370,8 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { expectedSeries := tc.expected.At() require.Equal(t, expectedSeries.Labels(), actualSeries.Labels()) - expChks, expErr := ExpandChunks(expectedSeries.Iterator()) - actChks, actErr := ExpandChunks(actualSeries.Iterator()) + expChks, expErr := ExpandChunks(expectedSeries.Iterator(nil)) + actChks, actErr := ExpandChunks(actualSeries.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -533,8 +533,8 @@ func TestCompactingChunkSeriesMerger(t *testing.T) { t.Run(tc.name, func(t *testing.T) { merged := m(tc.input...) require.Equal(t, tc.expected.Labels(), merged.Labels()) - actChks, actErr := ExpandChunks(merged.Iterator()) - expChks, expErr := ExpandChunks(tc.expected.Iterator()) + actChks, actErr := ExpandChunks(merged.Iterator(nil)) + expChks, expErr := ExpandChunks(tc.expected.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -667,8 +667,8 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) { t.Run(tc.name, func(t *testing.T) { merged := m(tc.input...) require.Equal(t, tc.expected.Labels(), merged.Labels()) - actChks, actErr := ExpandChunks(merged.Iterator()) - expChks, expErr := ExpandChunks(tc.expected.Iterator()) + actChks, actErr := ExpandChunks(merged.Iterator(nil)) + expChks, expErr := ExpandChunks(tc.expected.Iterator(nil)) require.Equal(t, expErr, actErr) require.Equal(t, expChks, actChks) @@ -893,10 +893,11 @@ func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) { var err error var t int64 var v float64 + var iter chunkenc.Iterator for n := 0; n < b.N; n++ { seriesSet := makeSeriesSet() for seriesSet.Next() { - iter := seriesSet.At().Iterator() + iter = seriesSet.At().Iterator(iter) for iter.Next() == chunkenc.ValFloat { t, v = iter.At() } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 48c2d8615f..9b7516b877 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" ) // decodeReadLimit is the maximum size of a read request body in bytes. @@ -115,9 +116,10 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHi func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, storage.Warnings, error) { numSamples := 0 resp := &prompb.QueryResult{} + var iter chunkenc.Iterator for ss.Next() { series := ss.At() - iter := series.Iterator() + iter = series.Iterator(iter) samples := []prompb.Sample{} for iter.Next() == chunkenc.ValFloat { @@ -199,11 +201,12 @@ func StreamChunkedReadResponses( var ( chks []prompb.Chunk lbls []prompb.Label + iter chunks.Iterator ) for ss.Next() { series := ss.At() - iter := series.Iterator() + iter = series.Iterator(iter) lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels) frameBytesLeft := maxBytesInFrame @@ -346,7 +349,7 @@ func (c *concreteSeries) Labels() labels.Labels { return labels.New(c.labels...) } -func (c *concreteSeries) Iterator() chunkenc.Iterator { +func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return newConcreteSeriersIterator(c) } diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index c806097c62..596eb0861c 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -215,7 +215,7 @@ func TestConcreteSeriesIterator(t *testing.T) { {Value: 4, Timestamp: 4}, }, } - it := series.Iterator() + it := series.Iterator(nil) // Seek to the first sample with ts=1. require.Equal(t, chunkenc.ValFloat, it.Seek(1)) diff --git a/storage/series.go b/storage/series.go index 3259dd4d06..87b1256f6b 100644 --- a/storage/series.go +++ b/storage/series.go @@ -27,25 +27,25 @@ import ( type SeriesEntry struct { Lset labels.Labels - SampleIteratorFn func() chunkenc.Iterator + SampleIteratorFn func(chunkenc.Iterator) chunkenc.Iterator } -func (s *SeriesEntry) Labels() labels.Labels { return s.Lset } -func (s *SeriesEntry) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() } +func (s *SeriesEntry) Labels() labels.Labels { return s.Lset } +func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return s.SampleIteratorFn(it) } type ChunkSeriesEntry struct { Lset labels.Labels - ChunkIteratorFn func() chunks.Iterator + ChunkIteratorFn func(chunks.Iterator) chunks.Iterator } -func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset } -func (s *ChunkSeriesEntry) Iterator() chunks.Iterator { return s.ChunkIteratorFn() } +func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset } +func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) } // NewListSeries returns series entry with iterator that allows to iterate over provided samples. func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { return &SeriesEntry{ Lset: lset, - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { return NewListSeriesIterator(samples(s)) }, } @@ -56,7 +56,7 @@ func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkSeriesEntry { return &ChunkSeriesEntry{ Lset: lset, - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator { chks := make([]chunks.Meta, 0, len(samples)) for _, s := range samples { chks = append(chks, tsdbutil.ChunkFromSamples(s)) @@ -178,7 +178,7 @@ func (c *chunkSetToSeriesSet) Next() bool { return false } - iter := c.ChunkSeriesSet.At().Iterator() + iter := c.ChunkSeriesSet.At().Iterator(nil) c.sameSeriesChunks = c.sameSeriesChunks[:0] for iter.Next() { @@ -210,9 +210,9 @@ func (c *chunkSetToSeriesSet) Err() error { func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series { return &SeriesEntry{ Lset: labels, - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { // TODO(bwplotka): Can we provide any chunkenc buffer? - return chk.Chunk.Iterator(nil) + return chk.Chunk.Iterator(it) }, } } @@ -252,7 +252,7 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries { return &seriesToChunkEncoder{series} } -func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { +func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { var ( chk chunkenc.Chunk app chunkenc.Appender @@ -263,7 +263,7 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { chks := []chunks.Meta{} i := 0 - seriesIter := s.Series.Iterator() + seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { if typ != lastType || i >= seriesToChunkEncoderSplit { diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 6cb00b348a..c3a6ff5769 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) { // Check chunk errors during iter time. require.True(t, set.Next()) - it := set.At().Iterator() + it := set.At().Iterator(nil) require.Equal(t, chunkenc.ValNone, it.Next()) require.Equal(t, tc.iterErr.Error(), it.Err().Error()) }) @@ -505,11 +505,12 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str head, err := NewHead(nil, nil, w, nil, opts, nil) require.NoError(tb, err) + var it chunkenc.Iterator ctx := context.Background() app := head.Appender(ctx) for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() typ := it.Next() lastTyp := typ @@ -550,11 +551,12 @@ func createHeadWithOOOSamples(tb testing.TB, w *wlog.WL, series []storage.Series oooSampleLabels := make([]labels.Labels, 0, len(series)) oooSamples := make([]tsdbutil.SampleSlice, 0, len(series)) + var it chunkenc.Iterator totalSamples := 0 app := head.Appender(context.Background()) for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() os := tsdbutil.SampleSlice{} count := 0 diff --git a/tsdb/compact.go b/tsdb/compact.go index 9fe50fda1d..f216ad46a4 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -746,8 +746,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } var ( - ref = storage.SeriesRef(0) - chks []chunks.Meta + ref = storage.SeriesRef(0) + chks []chunks.Meta + chksIter chunks.Iterator ) set := sets[0] @@ -765,7 +766,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } s := set.At() - chksIter := s.Iterator() + chksIter = s.Iterator(chksIter) chks = chks[:0] for chksIter.Next() { // We are not iterating in streaming way over chunk as diff --git a/tsdb/db_test.go b/tsdb/db_test.go index d4c2840c2a..cea4b6e362 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -93,12 +93,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str require.NoError(t, q.Close()) }() + var it chunkenc.Iterator result := map[string][]tsdbutil.Sample{} for ss.Next() { series := ss.At() samples := []tsdbutil.Sample{} - it := series.Iterator() + it = series.Iterator(it) for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { case chunkenc.ValFloat: @@ -133,12 +134,13 @@ func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Match require.NoError(t, q.Close()) }() + var it chunks.Iterator result := map[string][]chunks.Meta{} for ss.Next() { series := ss.At() chks := []chunks.Meta{} - it := series.Iterator() + it = series.Iterator(it) for it.Next() { chks = append(chks, it.At()) } @@ -454,8 +456,8 @@ Outer: require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -628,9 +630,10 @@ func TestDB_Snapshot(t *testing.T) { // sum values seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -676,9 +679,10 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { // Sum values. seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -770,8 +774,8 @@ Outer: require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -921,7 +925,7 @@ func TestDB_e2e(t *testing.T) { for ss.Next() { x := ss.At() - smpls, err := storage.ExpandSamples(x.Iterator(), newSample) + smpls, err := storage.ExpandSamples(x.Iterator(nil), newSample) require.NoError(t, err) if len(smpls) > 0 { @@ -1108,12 +1112,13 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+")) actualSeries := 0 + var chunksIt chunks.Iterator for set.Next() { actualSeries++ actualChunks := 0 - chunksIt := set.At().Iterator() + chunksIt = set.At().Iterator(chunksIt) for chunksIt.Next() { actualChunks++ } @@ -1205,8 +1210,8 @@ func TestTombstoneClean(t *testing.T) { require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -1479,11 +1484,12 @@ func TestSizeRetention(t *testing.T) { // Add some data to the WAL. headApp := db.Head().Appender(context.Background()) var aSeries labels.Labels + var it chunkenc.Iterator for _, m := range headBlocks { series := genSeries(100, 10, m.MinTime, m.MaxTime+1) for _, s := range series { aSeries = s.Labels() - it := s.Iterator() + it = s.Iterator(it) for it.Next() == chunkenc.ValFloat { tim, v := it.At() _, err := headApp.Append(0, s.Labels(), tim, v) @@ -1691,10 +1697,11 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, storage.Warnings, error) { resultLabels := []labels.Labels{} resultSamples := map[string][]sample{} + var it chunkenc.Iterator for ss.Next() { series := ss.At() samples := []sample{} - it := series.Iterator() + it = series.Iterator(it) for it.Next() == chunkenc.ValFloat { t, v := it.At() samples = append(samples, sample{t: t, v: v}) @@ -2500,10 +2507,11 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { // Sum the values. seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) + var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { _, v := series.At() sum += v @@ -2946,10 +2954,11 @@ func TestCompactHead(t *testing.T) { defer func() { require.NoError(t, querier.Close()) }() seriesSet := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"}) + var series chunkenc.Iterator var actSamples []sample for seriesSet.Next() { - series := seriesSet.At().Iterator() + series = seriesSet.At().Iterator(series) for series.Next() == chunkenc.ValFloat { time, val := series.At() actSamples = append(actSamples, sample{int64(time), val, nil, nil}) @@ -3347,7 +3356,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t actualSeries++ // Get the iterator and call Next() so that we're sure the chunk is loaded. - it := seriesSet.At().Iterator() + it := seriesSet.At().Iterator(nil) it.Next() it.At() @@ -3477,11 +3486,13 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+")) // Iterate all series and get their chunks. + var it chunks.Iterator var chunks []chunkenc.Chunk actualSeries := 0 for seriesSet.Next() { actualSeries++ - for it := seriesSet.At().Iterator(); it.Next(); { + it = seriesSet.At().Iterator(it) + for it.Next() { chunks = append(chunks, it.At().Chunk) } } @@ -6025,13 +6036,14 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { ctx := context.Background() + var it chunkenc.Iterator exp := make(map[string][]tsdbutil.Sample) for _, series := range blockSeries { createBlock(t, db.Dir(), series) for _, s := range series { key := s.Labels().String() - it := s.Iterator() + it = s.Iterator(it) slice := exp[key] for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { diff --git a/tsdb/example_test.go b/tsdb/example_test.go index c33bf6dc0c..da0e37923d 100644 --- a/tsdb/example_test.go +++ b/tsdb/example_test.go @@ -67,7 +67,7 @@ func Example() { series := ss.At() fmt.Println("series:", series.Labels().String()) - it := series.Iterator() + it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { _, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below) fmt.Println("sample", v) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 9b8eb0278c..59824ae087 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -924,8 +924,8 @@ func TestHeadDeleteSimple(t *testing.T) { require.Equal(t, expSeries.Labels(), actSeries.Labels()) - smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -959,7 +959,7 @@ func TestDeleteUntilCurMax(t *testing.T) { res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, res.Next(), "series is not present") s := res.At() - it := s.Iterator() + it := s.Iterator(nil) require.Equal(t, chunkenc.ValNone, it.Next(), "expected no samples") for res.Next() { } @@ -976,7 +976,7 @@ func TestDeleteUntilCurMax(t *testing.T) { res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, res.Next(), "series don't exist") exps := res.At() - it = exps.Iterator() + it = exps.Iterator(nil) resSamples, err := storage.ExpandSamples(it, newSample) require.NoError(t, err) require.Equal(t, []tsdbutil.Sample{sample{11, 1, nil, nil}}, resSamples) @@ -1163,7 +1163,7 @@ func TestDelete_e2e(t *testing.T) { eok, rok := expSs.Next(), ss.Next() // Skip a series if iterator is empty. if rok { - for ss.At().Iterator().Next() == chunkenc.ValNone { + for ss.At().Iterator(nil).Next() == chunkenc.ValNone { rok = ss.Next() if !rok { break @@ -1177,8 +1177,8 @@ func TestDelete_e2e(t *testing.T) { sexp := expSs.At() sres := ss.At() require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) } @@ -2635,7 +2635,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { <-time.After(3 * time.Second) // Now consume after compaction when it's gone. - it := s.Iterator() + it := s.Iterator(nil) for it.Next() == chunkenc.ValFloat { _, _ = it.At() } @@ -2643,7 +2643,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { require.NoError(t, it.Err()) for ss.Next() { s = ss.At() - it := s.Iterator() + it = s.Iterator(it) for it.Next() == chunkenc.ValFloat { _, _ = it.At() } @@ -2841,7 +2841,7 @@ func TestAppendHistogram(t *testing.T) { s := ss.At() require.False(t, ss.Next()) - it := s.Iterator() + it := s.Iterator(nil) actHistograms := make([]timedHistogram, 0, len(expHistograms)) for it.Next() == chunkenc.ValHistogram { t, h := it.AtHistogram() @@ -3304,7 +3304,7 @@ func TestHistogramStaleSample(t *testing.T) { s := ss.At() require.False(t, ss.Next()) - it := s.Iterator() + it := s.Iterator(nil) actHistograms := make([]timedHistogram, 0, len(expHistograms)) for it.Next() == chunkenc.ValHistogram { t, h := it.AtHistogram() @@ -3581,7 +3581,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, ss.Next()) s := ss.At() - it := s.Iterator() + it := s.Iterator(nil) expIdx := 0 loop: for { diff --git a/tsdb/querier.go b/tsdb/querier.go index cc765903c0..3ae1c4f1e2 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -838,7 +838,7 @@ func (b *blockSeriesSet) At() storage.Series { currIterFn := b.currIterFn return &storage.SeriesEntry{ Lset: b.currLabels, - SampleIteratorFn: func() chunkenc.Iterator { + SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator { return currIterFn().toSeriesIterator() }, } @@ -872,7 +872,7 @@ func (b *blockChunkSeriesSet) At() storage.ChunkSeries { currIterFn := b.currIterFn return &storage.ChunkSeriesEntry{ Lset: b.currLabels, - ChunkIteratorFn: func() chunks.Iterator { + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { return currIterFn().toChunkSeriesIterator() }, } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index ffb24b17bb..20e4c2f8fd 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -194,8 +194,8 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C sres := res.At() require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) require.Equal(t, errExp, errRes) require.Equal(t, smplExp, smplRes) @@ -230,9 +230,9 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C require.Equal(t, sexpChks.Labels(), sres.Labels()) - chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator()) + chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator(nil)) rmChunkRefs(chksExp) - chksRes, errRes := storage.ExpandChunks(sres.Iterator()) + chksRes, errRes := storage.ExpandChunks(sres.Iterator(nil)) rmChunkRefs(chksRes) require.Equal(t, errExp, errRes) require.Equal(t, chksExp, chksRes) @@ -1433,9 +1433,10 @@ func BenchmarkQuerySeek(b *testing.B) { b.ResetTimer() b.ReportAllocs() + var it chunkenc.Iterator ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) for ss.Next() { - it := ss.At().Iterator() + it = ss.At().Iterator(it) for t := mint; t <= maxt; t++ { it.Seek(t) } @@ -2042,11 +2043,13 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la for i := 0; i < b.N; i++ { ss := q.Select(false, nil, selectors...) var actualExpansions int + var it chunkenc.Iterator for ss.Next() { s := ss.At() s.Labels() - it := s.Iterator() + it = s.Iterator(it) for it.Next() != chunkenc.ValNone { + _, _ = it.At() } actualExpansions++ } diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 777db5e90e..8117f431c5 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -49,10 +49,11 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l const commitAfter = 10000 ctx := context.Background() app := w.Appender(ctx) + var it chunkenc.Iterator for _, s := range series { ref := storage.SeriesRef(0) - it := s.Iterator() + it = s.Iterator(it) lset := s.Labels() typ := it.Next() lastTyp := typ diff --git a/web/federate.go b/web/federate.go index 5ba68fa28f..baa3b58665 100644 --- a/web/federate.go +++ b/web/federate.go @@ -102,12 +102,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) + var chkIter chunkenc.Iterator for set.Next() { s := set.At() // TODO(fabxc): allow fast path for most recent sample either // in the storage itself or caching layer in Prometheus. - it.Reset(s.Iterator()) + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) var t int64 var v float64