diff --git a/storage/merge.go b/storage/merge.go index 3581f61d0c..9ef8690a7b 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -16,6 +16,7 @@ package storage import ( "bytes" "container/heap" + "fmt" "sort" "strings" "sync" @@ -418,8 +419,7 @@ func (h *genericSeriesSetHeap) Pop() interface{} { // with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective // this never happens. // -// NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead -// to handle overlaps between series. +// It is also optimized for the case where the series do not overlap. func ChainedSeriesMerge(series ...Series) Series { if len(series) == 0 { return nil @@ -438,10 +438,12 @@ func ChainedSeriesMerge(series ...Series) Series { // chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps // order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same -// timestamp are dropped. +// timestamp are dropped. It is also optimized for the case where the iterators do not overlap. 
type chainSampleIterator struct { iterators []chunkenc.Iterator h samplesIteratorHeap + + curr chunkenc.Iterator } func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator { @@ -462,43 +464,42 @@ func (c *chainSampleIterator) Seek(t int64) bool { } func (c *chainSampleIterator) At() (t int64, v float64) { - if len(c.h) == 0 { + if c.h == nil { panic("chainSampleIterator.At() called after .Next() returned false.") } - - return c.h[0].At() + return c.curr.At() } func (c *chainSampleIterator) Next() bool { if c.h == nil { - for _, iter := range c.iterators { + c.curr = c.iterators[0] + for _, iter := range c.iterators[1:] { if iter.Next() { heap.Push(&c.h, iter) } } - - return len(c.h) > 0 + } else if len(c.h) == 0 { + return c.curr != nil && c.curr.Next() } - if len(c.h) == 0 { - return false - } - - currt, _ := c.At() - for len(c.h) > 0 { - nextt, _ := c.h[0].At() - // All but one of the overlapping samples will be dropped. - if nextt != currt { - break + for { + if c.curr.Next() { + currt, _ := c.curr.At() + nextt, _ := c.h[0].At() + if currt < nextt { + return true + } + if currt == nextt { + fmt.Println("same ts", currt, nextt) + // Duplicate timestamp: drop curr's sample and advance curr again. TODO(review): the Println above looks like leftover debug output; confirm and remove. + continue + } + heap.Push(&c.h, c.curr) } - iter := heap.Pop(&c.h).(chunkenc.Iterator) - if iter.Next() { - heap.Push(&c.h, iter) - } + c.curr = heap.Pop(&c.h).(chunkenc.Iterator) + return true } - - return len(c.h) > 0 } func (c *chainSampleIterator) Err() error {