diff --git a/querier.go b/querier.go index 2602b72324..ee38f20291 100644 --- a/querier.go +++ b/querier.go @@ -2,6 +2,7 @@ package tsdb import ( "fmt" + "sort" "strings" "github.com/fabxc/tsdb/chunks" @@ -439,36 +440,73 @@ func (s *chainedSeries) Iterator() SeriesIterator { } // chainedSeriesIterator implements a series iterater over a list -// of time-sorted, non-overlapping chunks. +// of time-sorted, non-overlapping iterators. type chainedSeriesIterator struct { - series []SeriesIterator + mints []int64 // minimum timestamps for each iterator + series []SeriesIterator // iterators in time order + + i int + cur SeriesIterator } func (it *chainedSeriesIterator) Seek(t int64) bool { - return false -} + x := sort.Search(len(it.mints), func(i int) bool { return it.mints[i] >= t }) -func (it *chainedSeriesIterator) Values() (t int64, v float64) { - return 0, 0 + if x == len(it.mints) { + return false + } + if it.mints[x] == t { + if x == 0 { + return false + } + x-- + } + + it.i = x + it.cur = it.series[x] + + for it.cur.Next() { + t0, _ := it.cur.Values() + if t0 >= t { + break + } + } + return false } func (it *chainedSeriesIterator) Next() bool { - return false + if it.cur.Next() { + return true + } + if err := it.cur.Err(); err != nil { + return false + } + if it.i == len(it.series)-1 { + return false + } + + it.i++ + it.cur = it.series[it.i] + + return it.Next() +} + +func (it *chainedSeriesIterator) Values() (t int64, v float64) { + return it.cur.Values() } func (it *chainedSeriesIterator) Err() error { - return nil + return it.cur.Err() } // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct { - // minTimes []int64 + mints []int64 // minimum timestamps for each iterator chunks []chunks.Chunk i int cur chunks.Iterator - err error } func newChunkSeriesIterator(cs []chunks.Chunk) *chunkSeriesIterator { @@ -480,10 +518,25 @@ func newChunkSeriesIterator(cs []chunks.Chunk) *chunkSeriesIterator { } func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { - // TODO(fabxc): skip to relevant chunk. - for it.Next() { - if ts, _ := it.Values(); ts >= t { - return true + x := sort.Search(len(it.mints), func(i int) bool { return it.mints[i] >= t }) + + if x == len(it.mints) { + return false + } + if it.mints[x] == t { + if x == 0 { + return false + } + x-- + } + + it.i = x + it.cur = it.chunks[x].Iterator() + + for it.cur.Next() { + t0, _ := it.cur.Values() + if t0 >= t { + break } } return false