diff --git a/storage/merge.go b/storage/merge.go
index 3581f61d0c..e3026cf5ee 100644
--- a/storage/merge.go
+++ b/storage/merge.go
@@ -16,6 +16,7 @@ package storage
 import (
 	"bytes"
 	"container/heap"
+	"math"
 	"sort"
 	"strings"
 	"sync"
@@ -418,8 +419,7 @@ func (h *genericSeriesSetHeap) Pop() interface{} {
 // with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
 // this never happens.
 //
-// NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead
-// to handle overlaps between series.
+// It's optimized for non-overlap cases as well.
 func ChainedSeriesMerge(series ...Series) Series {
 	if len(series) == 0 {
 		return nil
@@ -438,16 +438,23 @@ func ChainedSeriesMerge(series ...Series) Series {
 
 // chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
 // order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
-// timestamp are dropped.
+// timestamp are dropped. It's optimized for non-overlap cases as well.
 type chainSampleIterator struct {
 	iterators []chunkenc.Iterator
 	h         samplesIteratorHeap
+
+	// curr is the iterator At() reads from; it is nil before the first
+	// Next()/Seek() and after exhaustion. lastt is the timestamp of the last
+	// returned sample and is used to skip samples with a duplicate timestamp.
+	curr  chunkenc.Iterator
+	lastt int64
 }
 
 func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
 	return &chainSampleIterator{
 		iterators: iterators,
 		h:         nil,
+		lastt:     math.MinInt64,
 	}
 }
 
@@ -458,47 +465,81 @@ func (c *chainSampleIterator) Seek(t int64) bool {
 			heap.Push(&c.h, iter)
 		}
 	}
-	return len(c.h) > 0
+	if len(c.h) > 0 {
+		c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
+		// Remember the timestamp we landed on so that the following Next()
+		// drops overlapping samples with this timestamp from other iterators.
+		c.lastt, _ = c.curr.At()
+		return true
+	}
+	c.curr = nil
+	return false
 }
 
 func (c *chainSampleIterator) At() (t int64, v float64) {
-	if len(c.h) == 0 {
-		panic("chainSampleIterator.At() called after .Next() returned false.")
+	if c.curr == nil {
+		panic("chainSampleIterator.At() called before first .Next() or after .Next() returned false.")
 	}
-
-	return c.h[0].At()
+	return c.curr.At()
 }
 
 func (c *chainSampleIterator) Next() bool {
 	if c.h == nil {
-		for _, iter := range c.iterators {
+		c.h = samplesIteratorHeap{}
+		if len(c.iterators) == 0 {
+			// Nothing to iterate over; c.curr stays nil.
+			return false
+		}
+		// We call c.curr.Next() as the first thing below.
+		// So, we don't call Next() on it here.
+		c.curr = c.iterators[0]
+		for _, iter := range c.iterators[1:] {
 			if iter.Next() {
 				heap.Push(&c.h, iter)
 			}
 		}
-
-		return len(c.h) > 0
 	}
 
-	if len(c.h) == 0 {
+	if c.curr == nil {
 		return false
 	}
 
-	currt, _ := c.At()
-	for len(c.h) > 0 {
-		nextt, _ := c.h[0].At()
-		// All but one of the overlapping samples will be dropped.
-		if nextt != currt {
-			break
+	var currt int64
+	for {
+		if c.curr.Next() {
+			currt, _ = c.curr.At()
+			if currt == c.lastt {
+				// Ignoring sample for the same timestamp.
+				continue
+			}
+			if len(c.h) == 0 {
+				// curr is the only iterator remaining,
+				// no need to check with the heap.
+				break
+			}
+
+			// Check current iterator with the top of the heap.
+			if nextt, _ := c.h[0].At(); currt < nextt {
+				// Current iterator has smaller timestamp than the heap.
+				break
+			}
+			// Current iterator does not hold the smallest timestamp.
+			heap.Push(&c.h, c.curr)
+		} else if len(c.h) == 0 {
+			// No iterator left to iterate.
+			c.curr = nil
+			return false
 		}
 
-		iter := heap.Pop(&c.h).(chunkenc.Iterator)
-		if iter.Next() {
-			heap.Push(&c.h, iter)
+		c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
+		currt, _ = c.curr.At()
+		if currt != c.lastt {
+			break
 		}
 	}
 
-	return len(c.h) > 0
+	c.lastt = currt
+	return true
 }
 
 func (c *chainSampleIterator) Err() error {
diff --git a/storage/merge_test.go b/storage/merge_test.go
index 984c4d5f20..2cad88781f 100644
--- a/storage/merge_test.go
+++ b/storage/merge_test.go
@@ -448,10 +448,10 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 			name: "three in chained overlap",
 			input: []ChunkSeries{
 				NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
-				NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 6}}),
+				NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}),
 				NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}),
 			},
-			expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}, sample{6, 6}, sample{10, 10}}),
+			expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}, sample{6, 66}, sample{10, 10}}),
 		},
 		{
 			name: "three in chained overlap complex",