diff --git a/model/dto.go b/model/dto.go index 80db6806b9..113ba0676d 100644 --- a/model/dto.go +++ b/model/dto.go @@ -21,6 +21,7 @@ import ( dto "github.com/matttproud/prometheus/model/generated" "io" "sort" + "time" ) func SampleToMetricDTO(s *Sample) *dto.Metric { @@ -146,3 +147,14 @@ func MessageToFingerprintDTO(message proto.Message) (*dto.Fingerprint, error) { return nil, errors.New("Unknown error in generating FingerprintDTO from message.") } + +func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample { + s := &Sample{ + Value: SampleValue(*v.Value), + Timestamp: *t, + } + + s.Metric = *m + + return s +} diff --git a/model/metric.go b/model/metric.go index fdb2802b37..e73e2597d2 100644 --- a/model/metric.go +++ b/model/metric.go @@ -14,6 +14,7 @@ package model import ( + "fmt" "time" ) @@ -45,6 +46,10 @@ type Metric map[LabelName]LabelValue // remedied down the road. type SampleValue float32 +func (s SampleValue) String() string { + return fmt.Sprintf("%f", s) +} + type Sample struct { Metric Metric Value SampleValue diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 05c1865c24..55685c686f 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -19,8 +19,8 @@ import ( ) type StalenessPolicy struct { - AllowStale bool - InterpolationInterval time.Duration + AllowStale bool + MaximumStaleness time.Duration } // MetricPersistence is a system for storing metric samples in a persistence diff --git a/storage/metric/leveldb/reading.go b/storage/metric/leveldb/reading.go index 62cc306c26..eeb8eee601 100644 --- a/storage/metric/leveldb/reading.go +++ b/storage/metric/leveldb/reading.go @@ -14,6 +14,7 @@ package leveldb import ( + "bytes" "code.google.com/p/goprotobuf/proto" "errors" "github.com/matttproud/prometheus/coding" @@ -209,8 +210,224 @@ func (l *LevelDBMetricPersistence) GetFirstValue(m *model.Metric) (*model.Sample panic("not implemented") } -func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, s *metric.StalenessPolicy) (*model.Sample, error) { - panic("not implemented") +func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { + yDelta := y2 - y1 + xDelta := x2.Sub(x1) + + dDt := yDelta / float32(xDelta) + offset := float32(e.Sub(x1)) + + return y1 + (offset * dDt) +} + +type iterator interface { + Close() + Key() []byte + Next() + Prev() + Seek([]byte) + SeekToFirst() + SeekToLast() + Valid() bool + Value() []byte +} + +func isKeyInsideRecordedInterval(k *dto.SampleKey, i iterator) (b bool, err error) { + byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + i.Seek(byteKey) + if !i.Valid() { + return + } + + var ( + retrievedKey *dto.SampleKey = &dto.SampleKey{} + ) + + err = proto.Unmarshal(i.Key(), retrievedKey) + if err != nil { + return + } + + if *retrievedKey.Fingerprint.Signature != *k.Fingerprint.Signature { + return + } + + if bytes.Equal(retrievedKey.Timestamp, k.Timestamp) { + return true, nil + } + + i.Prev() + if !i.Valid() { + return + } + + err = proto.Unmarshal(i.Key(), retrievedKey) + if err != nil { + return + } + + b = *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature + + return +} + +func doesKeyHavePrecursor(k *dto.SampleKey, i iterator) (b bool, err error) { + byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + i.Seek(byteKey) + + if !i.Valid() { + i.SeekToFirst() + } + + var ( + retrievedKey *dto.SampleKey = &dto.SampleKey{} + ) + + err = proto.Unmarshal(i.Key(), retrievedKey) + if err != nil { + return + } + + signaturesEqual := *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature + if !signaturesEqual { + return + } + + keyTime := indexable.DecodeTime(k.Timestamp) + retrievedTime := indexable.DecodeTime(retrievedKey.Timestamp) + + return retrievedTime.Before(keyTime), nil +} + +func doesKeyHaveSuccessor(k *dto.SampleKey, i iterator) (b bool, err error) { + byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + i.Seek(byteKey) + + if !i.Valid() { + i.SeekToLast() + } + + var ( + retrievedKey *dto.SampleKey = &dto.SampleKey{} + ) + + err = proto.Unmarshal(i.Key(), retrievedKey) + if err != nil { + return + } + + signaturesEqual := *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature + if !signaturesEqual { + return + } + + keyTime := indexable.DecodeTime(k.Timestamp) + retrievedTime := indexable.DecodeTime(retrievedKey.Timestamp) + + return retrievedTime.After(keyTime), nil +} + +func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, s *metric.StalenessPolicy) (sample *model.Sample, err error) { + d := model.MetricToDTO(m) + + f, err := model.MessageToFingerprintDTO(d) + if err != nil { + return + } + + // Candidate for Refactoring + k := &dto.SampleKey{ + Fingerprint: f, + Timestamp: indexable.EncodeTime(*t), + } + + e, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + iterator, closer, err := l.metricSamples.GetIterator() + if err != nil { + return + } + defer closer.Close() + + iterator.Seek(e) + + var ( + firstKey *dto.SampleKey = &dto.SampleKey{} + firstValue *dto.SampleValue = nil + ) + + within, err := isKeyInsideRecordedInterval(k, iterator) + if err != nil || !within { + return + } + + for iterator = iterator; iterator.Valid(); iterator.Prev() { + err := proto.Unmarshal(iterator.Key(), firstKey) + if err != nil { + return nil, err + } + + if *firstKey.Fingerprint.Signature == *k.Fingerprint.Signature { + firstValue = &dto.SampleValue{} + err := proto.Unmarshal(iterator.Value(), firstValue) + if err != nil { + return nil, err + } + + if indexable.DecodeTime(firstKey.Timestamp).Equal(indexable.DecodeTime(k.Timestamp)) { + return model.SampleFromDTO(m, t, firstValue), nil + } + break + } + } + + var ( + secondKey *dto.SampleKey = &dto.SampleKey{} + secondValue *dto.SampleValue = nil + ) + + iterator.Next() + if !iterator.Valid() { + return + } + + err = proto.Unmarshal(iterator.Key(), secondKey) + if err != nil { + + return + } + + if *secondKey.Fingerprint.Signature == *k.Fingerprint.Signature { + secondValue = &dto.SampleValue{} + err = proto.Unmarshal(iterator.Value(), secondValue) + if err != nil { + return + } + } + + firstTime := indexable.DecodeTime(firstKey.Timestamp) + secondTime := indexable.DecodeTime(secondKey.Timestamp) + interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t) + emission := &dto.SampleValue{ + Value: &interpolated, + } + + return model.SampleFromDTO(m, t, emission), nil } func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (*model.SampleSet, error) { diff --git a/storage/metric/leveldb/rule_integration_test.go b/storage/metric/leveldb/rule_integration_test.go new file mode 100644 index 0000000000..077374c36a --- /dev/null +++ b/storage/metric/leveldb/rule_integration_test.go @@ -0,0 +1,136 @@ +package leveldb + +import ( + "fmt" + "github.com/matttproud/prometheus/model" + "github.com/matttproud/prometheus/storage/metric" + "io/ioutil" + "os" + "testing" + "time" +) + +func TestGetValueAtTime(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + m := model.Metric{ + "name": "age_in_years", + } + + appendErr := persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(0), + Timestamp: time.Date(1984, 3, 30, 0, 0, 0, 0, time.UTC), + Metric: m, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + p := &metric.StalenessPolicy{ + AllowStale: false, + } + + d := time.Date(1984, 3, 30, 0, 0, 0, 0, time.UTC) + s, sErr := persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s == nil { + t.Error("a sample should be returned") + } + + if s.Value != model.SampleValue(0) { + t.Error("an incorrect sample value was returned") + } + + if s.Timestamp != d { + t.Error("an incorrect timestamp for the sample was returned") + } + + d = time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC) + + s, sErr = persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s != nil { + t.Error("no sample should be returned") + } + + d = time.Date(1983, 3, 30, 0, 0, 0, 0, time.UTC) + + s, sErr = persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s != nil { + t.Error("no sample should be returned") + } + + appendErr = persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(1), + Timestamp: time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC), + Metric: m, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + d = time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC) + s, sErr = persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s == nil { + t.Error("a sample should be returned") + } + + if s.Value != model.SampleValue(1) { + t.Error("an incorrect sample value was returned") + } + + if s.Timestamp != d { + t.Error("an incorrect timestamp for the sample was returned") + } + + d = time.Date(1984, 9, 24, 0, 0, 0, 0, time.UTC) + s, sErr = persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s == nil { + t.Error("a sample should be returned") + } + + if fmt.Sprintf("%f", s.Value) != model.SampleValue(0.487671).String() { + t.Errorf("an incorrect sample value was returned: %s\n", s.Value) + } + + if s.Timestamp != d { + t.Error("an incorrect timestamp for the sample was returned") + } +}