From 2d5de99fbf757a798f6628e89c074f40ee5596f3 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 21 Jun 2013 10:16:41 +0200 Subject: [PATCH 1/4] Regard in-memory series as new. This commit ensures that series that exist only in-memory and not on-disk are not regarded as too old for operation exclusion. --- storage/metric/memory.go | 14 ++++++++++++-- storage/metric/tiered.go | 25 ++++++++++++++++++++----- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 252d1ac60e..00feb4f3f0 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -14,11 +14,12 @@ package metric import ( - "github.com/prometheus/prometheus/model" - "github.com/prometheus/prometheus/utility" "sort" "sync" "time" + + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/utility" ) // Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of @@ -370,6 +371,15 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (mod return metric, nil } +func (s *memorySeriesStorage) HasFingerprint(f *model.Fingerprint) bool { + s.RLock() + defer s.RUnlock() + + _, has := s.fingerprintToSeries[*f] + + return has +} + func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values { s.RLock() defer s.RUnlock() diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index aba2511b86..ccbf8b13fb 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -309,20 +309,35 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e // BUG(julius): Make this configurable by query layer. i = i.Add(-stalenessLimit) - wm, ok := t.wmCache.Get(f) - if !ok { + wm, cacheHit := t.wmCache.Get(f) + if !cacheHit { value := &dto.MetricHighWatermark{} - present, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value) + diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value) if err != nil { return false, err } - if present { + + if diskHit { wmTime := time.Unix(*value.Timestamp, 0).UTC() t.wmCache.Set(f, &Watermarks{High: wmTime}) return wmTime.Before(i), nil } - return true, nil + + if !t.memoryArena.HasFingerprint(f) { + return true, nil + } + + samples := t.memoryArena.CloneSamples(f) + if len(samples) == 0 { + return true, nil + } + + newest := samples[0].Timestamp + t.wmCache.Set(f, &Watermarks{High: newest}) + + return false, nil } + return wm.High.Before(i), nil } From ee840904d2f09e7d6ca444b170fc0e9fc3658dc3 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 21 Jun 2013 11:32:53 +0200 Subject: [PATCH 2/4] Code Review: !Before -> After. --- storage/metric/leveldb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 7de237e170..09f27e3c90 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -506,7 +506,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger } // BUG(matt): Repace this with watermark management. - if !newestSampleTimestamp.Before(time.Unix(value.GetTimestamp(), 0)) { + if newestSampleTimestamp.After(time.Unix(value.GetTimestamp(), 0)) { value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) batch.Put(fingerprint.ToDTO(), value) } From 5daa0a09ea40425a5e68aa9f9e8f34e26d6b9849 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 21 Jun 2013 18:34:08 +0200 Subject: [PATCH 3/4] Code Review: Swap ordering of watermark getting. A test for Julius. --- storage/metric/tiered.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index ccbf8b13fb..61b03b6753 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -311,6 +311,16 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e wm, cacheHit := t.wmCache.Get(f) if !cacheHit { + if t.memoryArena.HasFingerprint(f) { + samples := t.memoryArena.CloneSamples(f) + if len(samples) > 0 { + newest := samples[0].Timestamp + t.wmCache.Set(f, &Watermarks{High: newest}) + + return newest.Before(i), nil + } + } + value := &dto.MetricHighWatermark{} diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value) if err != nil { @@ -320,21 +330,10 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e if diskHit { wmTime := time.Unix(*value.Timestamp, 0).UTC() t.wmCache.Set(f, &Watermarks{High: wmTime}) + return wmTime.Before(i), nil } - if !t.memoryArena.HasFingerprint(f) { - return true, nil - } - - samples := t.memoryArena.CloneSamples(f) - if len(samples) == 0 { - return true, nil - } - - newest := samples[0].Timestamp - t.wmCache.Set(f, &Watermarks{High: newest}) - return false, nil } From ecb9c7bb9d26cd0ad085272701733571986503cc Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 21 Jun 2013 21:17:50 +0200 Subject: [PATCH 4/4] Code Review: Swap ordering of elements. --- storage/metric/tiered.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 61b03b6753..5c509e8a56 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -314,7 +314,7 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e if t.memoryArena.HasFingerprint(f) { samples := t.memoryArena.CloneSamples(f) if len(samples) > 0 { - newest := samples[0].Timestamp + newest := samples[len(samples)-1].Timestamp t.wmCache.Set(f, &Watermarks{High: newest}) return newest.Before(i), nil