diff --git a/storage/buffer_test.go b/storage/buffer_test.go index a03081ab6a..2a1b872489 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -201,6 +201,17 @@ type mockSeries struct { iterator func() SeriesIterator } +func newMockSeries(lset labels.Labels, samples []sample) Series { + return &mockSeries{ + labels: func() labels.Labels { + return lset + }, + iterator: func() SeriesIterator { + return newListSeriesIterator(samples) + }, + } +} + func (m *mockSeries) Labels() labels.Labels { return m.labels() } func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } diff --git a/storage/fanout.go b/storage/fanout.go index ae74ad2ca2..40be2536d6 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -303,6 +303,10 @@ type mergeSeriesSet struct { // NewMergeSeriesSet returns a new series set that merges (deduplicates) // series returned by the input series sets when iterating. func NewMergeSeriesSet(sets []SeriesSet) SeriesSet { + if len(sets) == 1 { + return sets[0] + } + // Sets need to be pre-advanced, so we can introspect the label of the // series under the cursor. var h seriesSetHeap @@ -340,6 +344,9 @@ func (c *mergeSeriesSet) Next() bool { } func (c *mergeSeriesSet) At() Series { + if len(c.currentSets) == 1 { + return c.currentSets[0].At() + } series := []Series{} for _, seriesSet := range c.currentSets { series = append(series, seriesSet.At()) diff --git a/storage/fanout_test.go b/storage/fanout_test.go index c8094ea333..66d86fc938 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -14,6 +14,7 @@ package storage import ( + "fmt" "testing" "github.com/stretchr/testify/require" @@ -218,13 +219,53 @@ func (m *mockSeriesSet) Err() error { return nil } -func newMockSeries(lset labels.Labels, samples []sample) Series { - return &mockSeries{ - labels: func() labels.Labels { - return lset - }, - iterator: func() SeriesIterator { - return newListSeriesIterator(samples) - }, +var result []sample + +func makeSeriesSet(numSeries, numSamples int) SeriesSet { + series := []Series{} + for j := 0; j < numSeries; j++ { + labels := labels.Labels{{Name: "foo", Value: fmt.Sprintf("bar%d", j)}} + samples := []sample{} + for k := 0; k < numSamples; k++ { + samples = append(samples, sample{t: int64(k), v: float64(k)}) + } + series = append(series, newMockSeries(labels, samples)) + } + return newMockSeriesSet(series...) +} + +func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet { + seriesSets := []SeriesSet{} + for i := 0; i < numSeriesSets; i++ { + seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples)) + } + return NewMergeSeriesSet(seriesSets) +} + +func benchmarkDrain(seriesSet SeriesSet, b *testing.B) { + for n := 0; n < b.N; n++ { + for seriesSet.Next() { + result = drainSamples(seriesSet.At().Iterator()) + } + } +} + +func BenchmarkNoMergeSeriesSet_100_100(b *testing.B) { + seriesSet := makeSeriesSet(100, 100) + benchmarkDrain(seriesSet, b) +} + +func BenchmarkMergeSeriesSet(b *testing.B) { + for _, bm := range []struct { + numSeriesSets, numSeries, numSamples int + }{ + {1, 100, 100}, + {10, 100, 100}, + {100, 100, 100}, + } { + seriesSet := makeMergeSeriesSet(bm.numSeriesSets, bm.numSeries, bm.numSamples) + b.Run(fmt.Sprintf("%d_%d_%d", bm.numSeriesSets, bm.numSeries, bm.numSamples), func(b *testing.B) { + benchmarkDrain(seriesSet, b) + }) } }