From 9b83d8330a5da11ba3c50b42d891c0e505566be7 Mon Sep 17 00:00:00 2001 From: Fiona Liao Date: Mon, 26 Apr 2021 23:43:22 +0100 Subject: [PATCH] Fix memSafeIterator.Seek() (#8748) * Add range query test cases This includes a couple of failing ones that double count some points due to the iterator seek bug. Co-authored-by: Oleg Zaytsev Signed-off-by: Fiona Liao * Add Seek() implementation for memSafeIterator Previously, calling memSafeIterator.Seek() would call the Seek() method on its embedded iterator. This was causing the embedded iterator and the memSafeIterator to get out of sync because when the embedded Seek() moved to the next element of the embedded iterator, memSafeIterator didn't "know" about it. memSafeIterator has to "know" when the embedded iterator has moved to be able to work out when it should be reading from its buffer rather than the embedded iterator. Used same logic as for xorIterator.Seek() (which in runtime is used as the embedded iterator) - return false if the iterator has an error and try to move to next element if the required time hasn't been reached, or if no elements have been read yet. The memSafeIterator.Next() method is being called so memSafeIterator.i is always accurate. Signed-off-by: Fiona Liao * Add tsdb package test Signed-off-by: Fiona Liao Co-authored-by: Oleg Zaytsev --- promql/engine_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++ tsdb/head.go | 17 +++++++ tsdb/head_test.go | 68 ++++++++++++++++++++++++++ 3 files changed, 193 insertions(+) diff --git a/promql/engine_test.go b/promql/engine_test.go index 9dea17315f..d1afb7eb3e 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -2327,3 +2327,111 @@ func TestEngineOptsValidation(t *testing.T) { } } } + +func TestRangeQuery(t *testing.T) { + cases := []struct { + Name string + Load string + Query string + Result parser.Value + Start time.Time + End time.Time + Interval time.Duration + }{ + { + Name: "sum_over_time with all values", + Load: `load 30s + bar 0 1 10 100 1000`, + Query: "sum_over_time(bar[30s])", + Result: Matrix{Series{ + Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}}, + Metric: labels.Labels{}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(120, 0), + Interval: 60 * time.Second, + }, + { + Name: "sum_over_time with trailing values", + Load: `load 30s + bar 0 1 10 100 1000 0 0 0 0`, + Query: "sum_over_time(bar[30s])", + Result: Matrix{Series{ + Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}}, + Metric: labels.Labels{}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(120, 0), + Interval: 60 * time.Second, + }, + { + Name: "sum_over_time with all values long", + Load: `load 30s + bar 0 1 10 100 1000 10000 100000 1000000 10000000`, + Query: "sum_over_time(bar[30s])", + Result: Matrix{Series{ + Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}, {V: 110000, T: 180000}, {V: 11000000, T: 240000}}, + Metric: labels.Labels{}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(240, 0), + Interval: 60 * time.Second, + }, + { + Name: "sum_over_time with all values random", + Load: `load 30s + bar 5 17 42 2 7 905 51`, + Query: "sum_over_time(bar[30s])", + Result: Matrix{Series{ + Points: []Point{{V: 5, T: 0}, {V: 59, T: 60000}, {V: 9, T: 120000}, {V: 956, T: 180000}}, + Metric: labels.Labels{}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(180, 0), + Interval: 60 * time.Second, + }, + { + Name: "metric query", + Load: `load 30s + metric 1+1x4`, + Query: "metric", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 3, T: 60000}, {V: 5, T: 120000}}, + Metric: labels.Labels{labels.Label{Name: "__name__", Value: "metric"}}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(120, 0), + Interval: 1 * time.Minute, + }, + { + Name: "metric query with trailing values", + Load: `load 30s + metric 1+1x8`, + Query: "metric", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 3, T: 60000}, {V: 5, T: 120000}}, + Metric: labels.Labels{labels.Label{Name: "__name__", Value: "metric"}}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(120, 0), + Interval: 1 * time.Minute, + }, + } + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + test, err := NewTest(t, c.Load) + require.NoError(t, err) + defer test.Close() + + err = test.Run() + require.NoError(t, err) + + qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval) + require.NoError(t, err) + + res := qry.Exec(test.Context()) + require.NoError(t, res.Err) + require.Equal(t, c.Result, res.Value) + }) + } +} diff --git a/tsdb/head.go b/tsdb/head.go index 2a300ce2dd..595cd48d74 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2538,6 +2538,23 @@ type memSafeIterator struct { buf [4]sample } +func (it *memSafeIterator) Seek(t int64) bool { + if it.Err() != nil { + return false + } + + ts, _ := it.At() + + for t > ts || it.i == -1 { + if !it.Next() { + return false + } + ts, _ = it.At() + } + + return true +} + func (it *memSafeIterator) Next() bool { if it.i+1 >= it.stopAfter { return false diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 5e1d73c2f7..dfd916b761 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2067,3 +2067,71 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { require.Equal(b, 9, len(actualValues)) } } + +func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { + dir, err := ioutil.TempDir("", "iterator_seek") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + // This is usually taken from the Head, but passing manually here. + chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) + require.NoError(t, err) + defer func() { + require.NoError(t, chunkDiskMapper.Close()) + }() + + s := newMemSeries(labels.Labels{}, 1, 500, nil) + + for i := 0; i < 7; i++ { + ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) + require.True(t, ok, "sample append failed") + } + + it := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) + _, ok := it.(*memSafeIterator) + require.True(t, ok) + + // First point. + ok = it.Seek(0) + require.True(t, ok) + ts, val := it.At() + require.Equal(t, int64(0), ts) + require.Equal(t, float64(0), val) + + // Advance one point. + ok = it.Next() + require.True(t, ok) + ts, val = it.At() + require.Equal(t, int64(1), ts) + require.Equal(t, float64(1), val) + + // Seeking an older timestamp shouldn't cause the iterator to go backwards. + ok = it.Seek(0) + require.True(t, ok) + ts, val = it.At() + require.Equal(t, int64(1), ts) + require.Equal(t, float64(1), val) + + // Seek into the buffer. + ok = it.Seek(3) + require.True(t, ok) + ts, val = it.At() + require.Equal(t, int64(3), ts) + require.Equal(t, float64(3), val) + + // Iterate through the rest of the buffer. + for i := 4; i < 7; i++ { + ok = it.Next() + require.True(t, ok) + ts, val = it.At() + require.Equal(t, int64(i), ts) + require.Equal(t, float64(i), val) + } + + // Run out of elements in the iterator. + ok = it.Next() + require.False(t, ok) + ok = it.Seek(7) + require.False(t, ok) +}