diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index ed39690dfa..7e7f6e3ab9 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -498,6 +498,10 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err } func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { + if i == nil { + panic("nil iterator") + } + k = &dto.SampleKey{} rawKey := i.Key() if rawKey == nil { @@ -509,6 +513,10 @@ func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { } func extractSampleValue(i iterator) (v *dto.SampleValueSeries, err error) { + if i == nil { + panic("nil iterator") + } + v = &dto.SampleValueSeries{} err = proto.Unmarshal(i.Value(), v) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 590a0e4546..b9632bb39e 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -124,7 +124,6 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat } func (t *tieredStorage) rebuildDiskFrontier() (err error) { - fmt.Println("a1") begin := time.Now() defer func() { duration := time.Now().Sub(begin) @@ -354,11 +353,11 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { // standingOperations ops ) - fmt.Printf("Starting scan of %s...\n", scanJob) + // fmt.Printf("Starting scan of %s...\n", scanJob) if !(t.diskFrontier == nil || scanJob.fingerprint.Less(t.diskFrontier.firstFingerprint) || t.diskFrontier.lastFingerprint.Less(scanJob.fingerprint)) { - fmt.Printf("Using diskFrontier %s\n", t.diskFrontier) + // fmt.Printf("Using diskFrontier %s\n", t.diskFrontier) seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) - fmt.Printf("Using seriesFrontier %s\n", seriesFrontier) + // fmt.Printf("Using seriesFrontier %s\n", seriesFrontier) if err != nil { panic(err) } @@ -372,14 +371,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { for _, operation := range scanJob.operations { if seriesFrontier.lastTime.Before(operation.StartsAt()) { - fmt.Printf("operation %s occurs after %s; aborting...\n", operation, seriesFrontier.lastTime) + // fmt.Printf("operation %s occurs after %s; aborting...\n", operation, seriesFrontier.lastTime) break } scanJob.operations = scanJob.operations[1:len(scanJob.operations)] if operation.StartsAt().Before(seriesFrontier.firstSupertime) { - fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime) + // fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime) continue } @@ -388,6 +387,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() + // XXX: Use frontiers to manage out of range queries. iterator.Seek(rawKey) foundKey, err = extractSampleKey(iterator) @@ -401,13 +401,11 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { ) if !((operation.StartsAt().Before(fst)) || lst.Before(operation.StartsAt())) { - fmt.Printf("operation %s occurs inside of %s...\n", operation, foundKey) + // fmt.Printf("operation %s occurs inside of %s...\n", operation, foundKey) foundValue, err = extractSampleValue(iterator) if err != nil { panic(err) } - - fmt.Printf("f -> %s\n", foundValue) } else if operation.StartsAt().Before(fst) { fmt.Printf("operation %s may occur in next entity; fast forwarding...\n", operation) panic("oops") @@ -423,14 +421,20 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { index = sort.Search(elementCount, searcher) ) - foundValue.Value = foundValue.Value[index:elementCount] + if index != elementCount { + if index > 0 { + index-- + } + foundValue.Value = foundValue.Value[index:elementCount] + } switch operation.(type) { case getValuesAtTimeOp: if len(foundValue.Value) > 0 { view.appendSample(scanJob.fingerprint, time.Unix(*foundValue.Value[0].Timestamp, 0), model.SampleValue(*foundValue.Value[0].Value)) } if len(foundValue.Value) > 1 { + view.appendSample(scanJob.fingerprint, time.Unix(*foundValue.Value[1].Timestamp, 0), model.SampleValue(*foundValue.Value[1].Value)) } default: panic("unhandled") diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go new file mode 100644 index 0000000000..10e3e14abf --- /dev/null +++ b/storage/metric/tiered_test.go @@ -0,0 +1,225 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/utility/test" + "io/ioutil" + "os" + "testing" + "time" +) + +func testMakeView(t test.Tester) { + type in struct { + atTime []getValuesAtTimeOp + atInterval []getValuesAtIntervalOp + alongRange []getValuesAlongRangeOp + } + + type out struct { + atTime [][]model.SamplePair + atInterval [][]model.SamplePair + alongRange [][]model.SamplePair + } + var ( + instant = time.Date(1984, 3, 30, 0, 0, 0, 0, time.UTC) + metric = model.Metric{"name": "request_count"} + fingerprint = model.NewFingerprintFromMetric(metric) + scenarios = []struct { + data []model.Sample + in in + out out + }{ + { + data: []model.Sample{ + { + Metric: metric, + Value: 0, + Timestamp: instant, + }, + }, + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant, + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{ + { + { + Timestamp: instant, + Value: 0, + }, + }, + }, + }, + }, + { + data: []model.Sample{ + { + Metric: metric, + Value: 0, + Timestamp: instant, + }, + { + Metric: metric, + Value: 1, + Timestamp: instant.Add(time.Second), + }, + }, + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant, + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{ + { + { + Timestamp: instant, + Value: 0, + }, + { + Timestamp: instant.Add(time.Second), + Value: 1, + }, + }, + }, + }, + }, + // { + // data: []model.Sample{ + // { + // Metric: metric, + // Value: 0, + // Timestamp: instant, + // }, + // { + // Metric: metric, + // Value: 1, + // Timestamp: instant.Add(time.Second), + // }, + // { + // Metric: metric, + // Value: 2, + // Timestamp: instant.Add(time.Second * 2), + // }, + // }, + // in: in{ + // atTime: []getValuesAtTimeOp{ + // { + // time: instant.Add(time.Second), + // }, + // }, + // }, + // out: out{ + // atTime: [][]model.SamplePair{ + // { + // { + // Timestamp: instant.Add(time.Second), + // Value: 1, + // }, + // { + // Timestamp: instant.Add(time.Second * 2), + // Value: 2, + // }, + // }, + // }, + // }, + // }, + } + ) + + for i, scenario := range scenarios { + var ( + temporary, _ = ioutil.TempDir("", "test_make_view") + tiered = NewTieredStorage(100, 100, 100, time.Second, time.Second, 0*time.Second, temporary) + ) + + if tiered == nil { + t.Fatalf("%d. tiered == nil", i) + } + + go tiered.Serve() + defer tiered.Drain() + + defer func() { + os.RemoveAll(temporary) + }() + + for j, datum := range scenario.data { + err := tiered.AppendSample(datum) + if err != nil { + t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err) + } + } + + tiered.Flush() + + requestBuilder := NewViewRequestBuilder() + + for _, atTime := range scenario.in.atTime { + requestBuilder.GetMetricAtTime(fingerprint, atTime.time) + } + + for _, atInterval := range scenario.in.atInterval { + requestBuilder.GetMetricAtInterval(fingerprint, atInterval.from, atInterval.through, atInterval.interval) + } + + for _, alongRange := range scenario.in.alongRange { + requestBuilder.GetMetricRange(fingerprint, alongRange.from, alongRange.through) + } + + v, err := tiered.MakeView(requestBuilder, time.Second*5) + + if err != nil { + t.Fatalf("%d. failed due to %s", i, err) + } + + for j, atTime := range scenario.in.atTime { + actual := v.GetValueAtTime(fingerprint, atTime.time) + + if len(actual) != len(scenario.out.atTime[j]) { + t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(scenario.out.atTime[j]), len(actual)) + } + + for k, value := range scenario.out.atTime[j] { + if value.Value != actual[k].Value { + t.Fatalf("%d.%d.%d expected %d value, got %d", i, j, k, value.Value, actual[j].Value) + } + if !value.Timestamp.Equal(actual[k].Timestamp) { + t.Fatalf("%d.%d.%d expected %s timestamp, got %s", i, j, k, value.Timestamp, actual[j].Timestamp) + } + } + } + + tiered.Drain() + } +} + +func TestMakeView(t *testing.T) { + testMakeView(t) +} + +func BenchmarkMakeView(b *testing.B) { + for i := 0; i < b.N; i++ { + testMakeView(b) + } +} diff --git a/storage/metric/view.go b/storage/metric/view.go index 1ef196a823..e1b99ecb24 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -148,13 +148,13 @@ func (v view) GetValueAtTime(f model.Fingerprint, t time.Time) (s []model.Sample s = append(s, model.SamplePair{ Timestamp: time.Time(iterator.Key().(skipListTime)), - Value: iterator.Value().(model.SampleValue), + Value: iterator.Value().(value).get(), }) - if iterator.Next() { + if iterator.Previous() { s = append(s, model.SamplePair{ Timestamp: time.Time(iterator.Key().(skipListTime)), - Value: iterator.Value().(model.SampleValue), + Value: iterator.Value().(value).get(), }) }