diff --git a/promql/engine_test.go b/promql/engine_test.go index e6794f455b..ad3bb0513d 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -253,13 +253,23 @@ load 10s { Query: "metric", Result: Matrix{Series{ - Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 2, T: 2000}}, + Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, Metric: labels.FromStrings("__name__", "metric")}, }, Start: time.Unix(0, 0), End: time.Unix(2, 0), Interval: time.Second, }, + { + Query: "metric", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + Start: time.Unix(0, 0), + End: time.Unix(10, 0), + Interval: 5 * time.Second, + }, } for _, c := range cases { diff --git a/storage/buffer.go b/storage/buffer.go index d3687d94fd..0159cfa3fb 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -13,7 +13,9 @@ package storage -import "math" +import ( + "math" +) // BufferedSeriesIterator wraps an iterator with a look-back buffer. type BufferedSeriesIterator struct { @@ -21,6 +23,7 @@ type BufferedSeriesIterator struct { buf *sampleRing lastTime int64 + ok bool } // NewBuffer returns a new iterator that buffers the values within the time range @@ -30,6 +33,7 @@ func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { it: it, buf: newSampleRing(delta, 16), lastTime: math.MinInt64, + ok: true, } it.Next() @@ -56,8 +60,8 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool { if t0 > b.lastTime { b.buf.reset() - ok := b.it.Seek(t0) - if !ok { + b.ok = b.it.Seek(t0) + if !b.ok { return false } b.lastTime, _ = b.Values() @@ -77,14 +81,19 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool { // Next advances the iterator to the next element. func (b *BufferedSeriesIterator) Next() bool { + if !b.ok { + return false + } + // Add current element to buffer before advancing. b.buf.add(b.it.At()) - ok := b.it.Next() - if ok { + b.ok = b.it.Next() + if b.ok { b.lastTime, _ = b.Values() } - return ok + + return b.ok } // Values returns the current element of the iterator. diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 6855a1daea..a03081ab6a 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -137,6 +137,26 @@ func TestBufferedSeriesIterator(t *testing.T) { require.False(t, it.Next(), "next succeeded unexpectedly") } +// At() should not be called once Next() returns false. +func TestBufferedSeriesIteratorNoBadAt(t *testing.T) { + done := false + + m := &mockSeriesIterator{ + seek: func(int64) bool { return false }, + at: func() (int64, float64) { + require.False(t, done) + done = true + return 0, 0 + }, + next: func() bool { return !done }, + err: func() error { return nil }, + } + + it := NewBuffer(m, 60) + it.Next() + it.Next() +} + func BenchmarkBufferedSeriesIterator(b *testing.B) { var ( samples []sample @@ -165,16 +185,16 @@ func BenchmarkBufferedSeriesIterator(b *testing.B) { } type mockSeriesIterator struct { - seek func(int64) bool - values func() (int64, float64) - next func() bool - err func() error + seek func(int64) bool + at func() (int64, float64) + next func() bool + err func() error } -func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } -func (m *mockSeriesIterator) Values() (int64, float64) { return m.values() } -func (m *mockSeriesIterator) Next() bool { return m.next() } -func (m *mockSeriesIterator) Err() error { return m.err() } +func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } +func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } +func (m *mockSeriesIterator) Next() bool { return m.next() } +func (m *mockSeriesIterator) Err() error { return m.err() } type mockSeries struct { labels func() labels.Labels