From a73f061d3c6169a08e0736cd0749f0bf9f9cb16f Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Sat, 8 Jun 2013 10:27:44 +0200 Subject: [PATCH] Persist solely Protocol Buffers. An design question was open for me in the beginning was whether to serialize other types to disk, but Protocol Buffers quickly won out, which allows us to drop support for other types. This is a good start to cleaning up a lot of cruft in the storage stack and can let us eventually decouple the various moving parts into separate subsystems for easier reasoning. This commit is not strictly required, but it is a start to making the rest a lot more enjoyable to interact with. --- storage/metric/curator.go | 31 +++----- storage/metric/leveldb.go | 110 ++++++++++++--------------- storage/metric/processor.go | 37 ++++----- storage/metric/processor_test.go | 38 ++++----- storage/metric/tiered.go | 14 +--- storage/raw/index/interface.go | 10 +-- storage/raw/index/leveldb/leveldb.go | 27 +++---- storage/raw/interface.go | 17 +++-- storage/raw/leveldb/batch.go | 21 ++--- storage/raw/leveldb/leveldb.go | 50 ++++++------ storage/raw/leveldb/test/fixtures.go | 9 ++- tools/dumper/main.go | 13 ++-- web/web.go | 1 - 13 files changed, 167 insertions(+), 211 deletions(-) diff --git a/storage/metric/curator.go b/storage/metric/curator.go index d9920cea8d..33b047cd5f 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -206,10 +206,10 @@ func (w watermarkFilter) shouldStop() bool { return len(w.stop) != 0 } -func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (remark *model.CurationRemark, err error) { +func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (*model.CurationRemark, error) { rawSignature, err := processor.Signature() if err != nil { - return + return nil, err } curationKey := model.CurationKey{ @@ -220,30 +220,17 @@ func getCurationRemark(states raw.Persistence, processor Processor, ignoreYounge }.ToDTO() curationValue := &dto.CurationValue{} - rawKey := coding.NewPBEncoder(curationKey) - - has, err := states.Has(rawKey) + present, err := states.Get(curationKey, curationValue) if err != nil { - return + return nil, err } - if !has { - return + if !present { + return nil, nil } - rawCurationValue, err := states.Get(rawKey) - if err != nil { - return - } + remark := model.NewCurationRemarkFromDTO(curationValue) - err = proto.Unmarshal(rawCurationValue, curationValue) - if err != nil { - return - } - - baseRemark := model.NewCurationRemarkFromDTO(curationValue) - remark = &baseRemark - - return + return &remark, nil } func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) { @@ -386,7 +373,7 @@ func (w watermarkOperator) refreshCurationRemark(f *model.Fingerprint, finished LastCompletionTimestamp: finished, }.ToDTO() - err = w.curationState.Put(coding.NewPBEncoder(curationKey), coding.NewPBEncoder(curationValue)) + err = w.curationState.Put(curationKey, curationValue) return } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index d4bfc6cb33..511748e251 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -14,25 +14,25 @@ package metric import ( - "code.google.com/p/goprotobuf/proto" "flag" "fmt" - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/model" - dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage" - index "github.com/prometheus/prometheus/storage/raw/index/leveldb" - "github.com/prometheus/prometheus/storage/raw/leveldb" - "github.com/prometheus/prometheus/utility" "log" "sort" "sync" "time" + + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/prometheus/model/generated" + index "github.com/prometheus/prometheus/storage/raw/index/leveldb" + + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw/leveldb" + "github.com/prometheus/prometheus/utility" ) -const ( - sortConcurrency = 2 -) +const sortConcurrency = 2 type LevelDBMetricPersistence struct { CurationRemarks *leveldb.LevelDBPersistence @@ -302,7 +302,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) + batch.Put(key, value) } err = l.labelNameToFingerprints.Commit(batch) @@ -375,7 +375,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) + batch.Put(key, value) } err = l.labelSetToFingerprints.Commit(batch) @@ -401,9 +401,7 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri defer batch.Close() for fingerprint, metric := range metrics { - key := coding.NewPBEncoder(fingerprint.ToDTO()) - value := coding.NewPBEncoder(model.MetricToDTO(metric)) - batch.Put(key, value) + batch.Put(fingerprint.ToDTO(), model.MetricToDTO(metric)) } err = l.fingerprintToMetrics.Commit(batch) @@ -414,6 +412,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri return } +var existenceIdentity = &dto.MembershipIndexValue{} + // indexMetrics takes groups of samples, determines which ones contain metrics // that are unknown to the storage stack, and then proceeds to update all // affected indices. @@ -465,10 +465,8 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri batch := leveldb.NewBatch() defer batch.Close() - // WART: We should probably encode simple fingerprints. for _, metric := range absentMetrics { - key := coding.NewPBEncoder(model.MetricToDTO(metric)) - batch.Put(key, key) + batch.Put(model.MetricToDTO(metric), existenceIdentity) } err = l.metricMembershipIndex.Commit(batch) @@ -492,27 +490,23 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger mutationCount := 0 for fingerprint, samples := range groups { - keyEncoded := coding.NewPBEncoder(fingerprint.ToDTO()) value := &dto.MetricHighWatermark{} newestSampleTimestamp := samples[len(samples)-1].Timestamp - - raw, err := l.MetricHighWatermarks.Get(keyEncoded) + present, err := l.MetricHighWatermarks.Get(fingerprint.ToDTO(), value) if err != nil { return err } - - if raw != nil { - err = proto.Unmarshal(raw, value) - if err != nil { - return err - } - - if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) { - continue - } + if !present { + continue } + + // BUG(matt): Repace this with watermark management. + if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) { + continue + } + value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(keyEncoded, coding.NewPBEncoder(value)) + batch.Put(fingerprint.ToDTO(), value) mutationCount++ } @@ -583,7 +577,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err }) } - samplesBatch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) + samplesBatch.Put(key, value) } } @@ -656,8 +650,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }(time.Now()) - dtoKey := coding.NewPBEncoder(dto) - value, err = l.metricMembershipIndex.Has(dtoKey) + value, err = l.metricMembershipIndex.Has(dto) return } @@ -669,8 +662,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }(time.Now()) - dtoKey := coding.NewPBEncoder(dto) - value, err = l.labelSetToFingerprints.Has(dtoKey) + value, err = l.labelSetToFingerprints.Has(dto) return } @@ -682,8 +674,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }(time.Now()) - dtoKey := coding.NewPBEncoder(dto) - value, err = l.labelNameToFingerprints.Has(dtoKey) + value, err = l.labelNameToFingerprints.Has(dto) return } @@ -698,15 +689,13 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab sets := []utility.Set{} for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { - f, err := l.labelSetToFingerprints.Get(coding.NewPBEncoder(labelSetDTO)) + unmarshaled := &dto.FingerprintCollection{} + present, err := l.labelSetToFingerprints.Get(labelSetDTO, unmarshaled) if err != nil { return fps, err } - - unmarshaled := &dto.FingerprintCollection{} - err = proto.Unmarshal(f, unmarshaled) - if err != nil { - return fps, err + if !present { + return nil, nil } set := utility.Set{} @@ -743,16 +732,13 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }(time.Now()) - raw, err := l.labelNameToFingerprints.Get(coding.NewPBEncoder(model.LabelNameToDTO(&labelName))) - if err != nil { - return - } - unmarshaled := &dto.FingerprintCollection{} - - err = proto.Unmarshal(raw, unmarshaled) + present, err := l.labelNameToFingerprints.Get(model.LabelNameToDTO(&labelName), unmarshaled) if err != nil { - return + return nil, err + } + if !present { + return nil, nil } for _, m := range unmarshaled.Member { @@ -760,7 +746,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L fps = append(fps, fp) } - return + return fps, nil } func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) { @@ -770,15 +756,13 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }(time.Now()) - raw, err := l.fingerprintToMetrics.Get(coding.NewPBEncoder(model.FingerprintToDTO(f))) - if err != nil { - return - } - unmarshaled := &dto.Metric{} - err = proto.Unmarshal(raw, unmarshaled) + present, err := l.fingerprintToMetrics.Get(model.FingerprintToDTO(f), unmarshaled) if err != nil { - return + return nil, err + } + if !present { + return nil, nil } m = model.Metric{} @@ -787,7 +771,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value) } - return + return m, nil } func (l LevelDBMetricPersistence) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values { diff --git a/storage/metric/processor.go b/storage/metric/processor.go index d913c69b17..1133b1ec31 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -14,14 +14,16 @@ package metric import ( - "code.google.com/p/goprotobuf/proto" "fmt" - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/model" + "time" + + "code.google.com/p/goprotobuf/proto" + dto "github.com/prometheus/prometheus/model/generated" + + "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" - "time" ) // processor models a post-processing agent that performs work given a sample @@ -153,8 +155,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize: if !keyDropped { - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) keyDropped = true } pendingSamples = append(pendingSamples, unactedSamples...) @@ -165,15 +166,12 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi // If the number of pending writes equals the target group size case len(pendingSamples) == p.MinimumGroupSize: newSampleKey := pendingSamples.ToSampleKey(fingerprint) - key := coding.NewPBEncoder(newSampleKey.ToDTO()) - value := coding.NewPBEncoder(pendingSamples.ToDTO()) - pendingBatch.Put(key, value) + pendingBatch.Put(newSampleKey.ToDTO(), pendingSamples.ToDTO()) pendingMutations++ lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) if len(unactedSamples) > 0 { if !keyDropped { - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) keyDropped = true } @@ -190,8 +188,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize: if !keyDropped { - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) keyDropped = true } remainder := p.MinimumGroupSize - len(pendingSamples) @@ -211,9 +208,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi if len(unactedSamples) > 0 || len(pendingSamples) > 0 { pendingSamples = append(pendingSamples, unactedSamples...) newSampleKey := pendingSamples.ToSampleKey(fingerprint) - key := coding.NewPBEncoder(newSampleKey.ToDTO()) - value := coding.NewPBEncoder(pendingSamples.ToDTO()) - pendingBatch.Put(key, value) + pendingBatch.Put(newSampleKey.ToDTO(), pendingSamples.ToDTO()) pendingSamples = model.Values{} pendingMutations++ lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) @@ -320,24 +315,20 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist pendingBatch = nil case !sampleKey.MayContain(stopAt): - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) lastCurated = sampleKey.LastTimestamp sampleValues = model.Values{} pendingMutations++ case sampleKey.MayContain(stopAt): - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) pendingMutations++ sampleValues = sampleValues.TruncateBefore(stopAt) if len(sampleValues) > 0 { sampleKey = sampleValues.ToSampleKey(fingerprint) lastCurated = sampleKey.FirstTimestamp - newKey := coding.NewPBEncoder(sampleKey.ToDTO()) - newValue := coding.NewPBEncoder(sampleValues.ToDTO()) - pendingBatch.Put(newKey, newValue) + pendingBatch.Put(sampleKey.ToDTO(), sampleValues.ToDTO()) pendingMutations++ } else { lastCurated = sampleKey.LastTimestamp diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 2eeb12250e..d85ff7d2d6 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -14,15 +14,17 @@ package metric import ( - "code.google.com/p/goprotobuf/proto" "fmt" - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/model" - dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage/raw/leveldb" - fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" "testing" "time" + + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/prometheus/model/generated" + fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" + + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage/raw/leveldb" ) type curationState struct { @@ -56,40 +58,40 @@ type out struct { sampleGroups []sampleGroup } -func (c curationState) Get() (key, value coding.Encoder) { +func (c curationState) Get() (key, value proto.Message) { signature, err := c.processor.Signature() if err != nil { panic(err) } - key = coding.NewPBEncoder(model.CurationKey{ + key = model.CurationKey{ Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint), ProcessorMessageRaw: signature, ProcessorMessageTypeName: c.processor.Name(), IgnoreYoungerThan: c.ignoreYoungerThan, - }.ToDTO()) + }.ToDTO() - value = coding.NewPBEncoder(model.CurationRemark{ + value = model.CurationRemark{ LastCompletionTimestamp: c.lastCurated, - }.ToDTO()) + }.ToDTO() return } -func (w watermarkState) Get() (key, value coding.Encoder) { - key = coding.NewPBEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) - value = coding.NewPBEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) +func (w watermarkState) Get() (key, value proto.Message) { + key = model.NewFingerprintFromRowKey(w.fingerprint).ToDTO() + value = model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO() return } -func (s sampleGroup) Get() (key, value coding.Encoder) { - key = coding.NewPBEncoder(model.SampleKey{ +func (s sampleGroup) Get() (key, value proto.Message) { + key = model.SampleKey{ Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint), FirstTimestamp: s.values[0].Timestamp, LastTimestamp: s.values[len(s.values)-1].Timestamp, SampleCount: uint32(len(s.values)), - }.ToDTO()) + }.ToDTO() - value = coding.NewPBEncoder(s.values.ToDTO()) + value = s.values.ToDTO() return } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 1d76e63fa8..bee4ce5f18 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -19,8 +19,6 @@ import ( "sort" "time" - "code.google.com/p/goprotobuf/proto" - dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/coding" @@ -341,18 +339,12 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e wm, ok := t.wmCache.Get(f) if !ok { - rowKey := coding.NewPBEncoder(f.ToDTO()) - raw, err := t.DiskStorage.MetricHighWatermarks.Get(rowKey) + value := &dto.MetricHighWatermark{} + present, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value) if err != nil { return false, err } - if raw != nil { - value := &dto.MetricHighWatermark{} - err = proto.Unmarshal(raw, value) - if err != nil { - return false, err - } - + if present { wmTime := time.Unix(*value.Timestamp, 0).UTC() t.wmCache.Set(f, &Watermarks{High: wmTime}) return wmTime.Before(i), nil diff --git a/storage/raw/index/interface.go b/storage/raw/index/interface.go index 11967fa00d..5a87fb264f 100644 --- a/storage/raw/index/interface.go +++ b/storage/raw/index/interface.go @@ -13,13 +13,11 @@ package index -import ( - "github.com/prometheus/prometheus/coding" -) +import "code.google.com/p/goprotobuf/proto" type MembershipIndex interface { - Has(key coding.Encoder) (bool, error) - Put(key coding.Encoder) error - Drop(key coding.Encoder) error + Has(key proto.Message) (bool, error) + Put(key proto.Message) error + Drop(key proto.Message) error Close() } diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index 99590eee69..7eb55bae27 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -14,20 +14,15 @@ package leveldb import ( - "github.com/prometheus/prometheus/coding" + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" ) -type indexValue struct{} - -func (i *indexValue) MustEncode() []byte { - return []byte{} -} - -var ( - existenceValue = &indexValue{} -) +var existenceValue = &dto.MembershipIndexValue{} type LevelDBMembershipIndex struct { persistence *leveldb.LevelDBPersistence @@ -37,16 +32,16 @@ func (l *LevelDBMembershipIndex) Close() { l.persistence.Close() } -func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) { - return l.persistence.Has(key) +func (l *LevelDBMembershipIndex) Has(k proto.Message) (bool, error) { + return l.persistence.Has(k) } -func (l *LevelDBMembershipIndex) Drop(key coding.Encoder) error { - return l.persistence.Drop(key) +func (l *LevelDBMembershipIndex) Drop(k proto.Message) error { + return l.persistence.Drop(k) } -func (l *LevelDBMembershipIndex) Put(key coding.Encoder) error { - return l.persistence.Put(key, existenceValue) +func (l *LevelDBMembershipIndex) Put(k proto.Message) error { + return l.persistence.Put(k, existenceValue) } func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) { diff --git a/storage/raw/interface.go b/storage/raw/interface.go index 42cb049bbc..c369724b88 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -14,7 +14,8 @@ package raw import ( - "github.com/prometheus/prometheus/coding" + "code.google.com/p/goprotobuf/proto" + "github.com/prometheus/prometheus/storage" ) @@ -25,14 +26,14 @@ type Persistence interface { // persistence. Close() // Has informs the user whether a given key exists in the database. - Has(key coding.Encoder) (bool, error) + Has(key proto.Message) (bool, error) // Get retrieves the key from the database if it exists or returns nil if // it is absent. - Get(key coding.Encoder) ([]byte, error) + Get(key, value proto.Message) (present bool, err error) // Drop removes the key from the database. - Drop(key coding.Encoder) error + Drop(key proto.Message) error // Put sets the key to a given value. - Put(key, value coding.Encoder) error + Put(key, value proto.Message) error // ForEach is responsible for iterating through all records in the database // until one of the following conditions are met: // @@ -41,7 +42,7 @@ type Persistence interface { // 3.) A FilterResult of STOP is emitted by the Filter. // // Decoding errors for an entity cause that entity to be skipped. - ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) + ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error) // Commit applies the Batch operations to the database. Commit(Batch) error } @@ -54,7 +55,7 @@ type Batch interface { // batch mutation. Close() // Put follows the same protocol as Persistence.Put. - Put(key, value coding.Encoder) + Put(key, value proto.Message) // Drop follows the same protocol as Persistence.Drop. - Drop(key coding.Encoder) + Drop(key proto.Message) } diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go index 6b82af2f0f..ed5449788e 100644 --- a/storage/raw/leveldb/batch.go +++ b/storage/raw/leveldb/batch.go @@ -15,7 +15,10 @@ package leveldb import ( "fmt" + + "code.google.com/p/goprotobuf/proto" "github.com/jmhodges/levigo" + "github.com/prometheus/prometheus/coding" ) @@ -31,25 +34,23 @@ func NewBatch() *batch { } } -func (b *batch) Drop(key coding.Encoder) { - keyEncoded := key.MustEncode() - b.drops++ +func (b *batch) Drop(key proto.Message) { + b.batch.Delete(coding.NewPBEncoder(key).MustEncode()) - b.batch.Delete(keyEncoded) + b.drops++ } -func (b *batch) Put(key, value coding.Encoder) { - keyEncoded := key.MustEncode() - valueEncoded := value.MustEncode() +func (b *batch) Put(key, value proto.Message) { + b.batch.Put(coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode()) + b.puts++ - b.batch.Put(keyEncoded, valueEncoded) } -func (b batch) Close() { +func (b *batch) Close() { b.batch.Close() } -func (b batch) String() string { +func (b *batch) String() string { return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops) } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index c37e90890b..d68bca53fa 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -16,11 +16,14 @@ package leveldb import ( "flag" "fmt" + "time" + + "code.google.com/p/goprotobuf/proto" "github.com/jmhodges/levigo" + "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" - "time" ) var ( @@ -250,38 +253,37 @@ func (l *LevelDBPersistence) Close() { return } -func (l *LevelDBPersistence) Get(value coding.Encoder) (b []byte, err error) { - key := value.MustEncode() - - return l.storage.Get(l.readOptions, key) -} - -func (l *LevelDBPersistence) Has(value coding.Encoder) (h bool, err error) { - raw, err := l.Get(value) +func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) { + raw, err := l.storage.Get(l.readOptions, coding.NewPBEncoder(k).MustEncode()) if err != nil { - return + return false, err + } + if raw == nil { + return false, nil } - h = raw != nil + if v == nil { + return true, nil + } - return + err = proto.Unmarshal(raw, v) + if err != nil { + return true, err + } + + return true, nil } -func (l *LevelDBPersistence) Drop(value coding.Encoder) (err error) { - key := value.MustEncode() - err = l.storage.Delete(l.writeOptions, key) - - return +func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) { + return l.Get(k, nil) } -func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) { - keyEncoded := key.MustEncode() +func (l *LevelDBPersistence) Drop(k proto.Message) error { + return l.storage.Delete(l.writeOptions, coding.NewPBEncoder(k).MustEncode()) +} - valueEncoded := value.MustEncode() - - err = l.storage.Put(l.writeOptions, keyEncoded, valueEncoded) - - return +func (l *LevelDBPersistence) Put(key, value proto.Message) error { + return l.storage.Put(l.writeOptions, coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode()) } func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index f93aecb081..cef9e77cf1 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -14,7 +14,8 @@ package test import ( - "github.com/prometheus/prometheus/coding" + "code.google.com/p/goprotobuf/proto" + "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility/test" ) @@ -28,7 +29,7 @@ type ( // Pair models a prospective (key, value) double that will be committed to // a database. Pair interface { - Get() (key, value coding.Encoder) + Get() (key, value proto.Message) } // Pairs models a list of Pair for disk committing. @@ -47,7 +48,7 @@ type ( // data to build. HasNext() (has bool) // Next emits the next (key, value) double for storage. - Next() (key coding.Encoder, value coding.Encoder) + Next() (key, value proto.Message) } preparer struct { @@ -88,7 +89,7 @@ func (f cassetteFactory) HasNext() bool { return f.index < f.count } -func (f *cassetteFactory) Next() (key, value coding.Encoder) { +func (f *cassetteFactory) Next() (key, value proto.Message) { key, value = f.pairs[f.index].Get() f.index++ diff --git a/tools/dumper/main.go b/tools/dumper/main.go index 1951b44c1f..1b9afb325d 100644 --- a/tools/dumper/main.go +++ b/tools/dumper/main.go @@ -19,17 +19,20 @@ package main import ( - "code.google.com/p/goprotobuf/proto" "encoding/csv" "flag" "fmt" - "github.com/prometheus/prometheus/model" - dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/metric" "log" "os" "strconv" + + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/prometheus/model/generated" + + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/metric" ) var ( diff --git a/web/web.go b/web/web.go index d5375677a0..4d7bf98862 100644 --- a/web/web.go +++ b/web/web.go @@ -102,7 +102,6 @@ func getEmbeddedTemplate(name string) (*template.Template, error) { return t, nil } - func getTemplate(name string) (t *template.Template, err error) { if *useLocalAssets { t, err = getLocalTemplate(name)