diff --git a/storage/metric/curator.go b/storage/metric/curator.go index d9920cea8d..33b047cd5f 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -206,10 +206,10 @@ func (w watermarkFilter) shouldStop() bool { return len(w.stop) != 0 } -func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (remark *model.CurationRemark, err error) { +func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (*model.CurationRemark, error) { rawSignature, err := processor.Signature() if err != nil { - return + return nil, err } curationKey := model.CurationKey{ @@ -220,30 +220,17 @@ func getCurationRemark(states raw.Persistence, processor Processor, ignoreYounge }.ToDTO() curationValue := &dto.CurationValue{} - rawKey := coding.NewPBEncoder(curationKey) - - has, err := states.Has(rawKey) + present, err := states.Get(curationKey, curationValue) if err != nil { - return + return nil, err } - if !has { - return + if !present { + return nil, nil } - rawCurationValue, err := states.Get(rawKey) - if err != nil { - return - } + remark := model.NewCurationRemarkFromDTO(curationValue) - err = proto.Unmarshal(rawCurationValue, curationValue) - if err != nil { - return - } - - baseRemark := model.NewCurationRemarkFromDTO(curationValue) - remark = &baseRemark - - return + return &remark, nil } func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) { @@ -386,7 +373,7 @@ func (w watermarkOperator) refreshCurationRemark(f *model.Fingerprint, finished LastCompletionTimestamp: finished, }.ToDTO() - err = w.curationState.Put(coding.NewPBEncoder(curationKey), coding.NewPBEncoder(curationValue)) + err = w.curationState.Put(curationKey, curationValue) return } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index d4bfc6cb33..511748e251 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -14,25 +14,25 @@ package metric import ( - "code.google.com/p/goprotobuf/proto" "flag" "fmt" - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/model" - dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage" - index "github.com/prometheus/prometheus/storage/raw/index/leveldb" - "github.com/prometheus/prometheus/storage/raw/leveldb" - "github.com/prometheus/prometheus/utility" "log" "sort" "sync" "time" + + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/prometheus/model/generated" + index "github.com/prometheus/prometheus/storage/raw/index/leveldb" + + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw/leveldb" + "github.com/prometheus/prometheus/utility" ) -const ( - sortConcurrency = 2 -) +const sortConcurrency = 2 type LevelDBMetricPersistence struct { CurationRemarks *leveldb.LevelDBPersistence @@ -302,7 +302,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) + batch.Put(key, value) } err = l.labelNameToFingerprints.Commit(batch) @@ -375,7 +375,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) + batch.Put(key, value) } err = l.labelSetToFingerprints.Commit(batch) @@ -401,9 +401,7 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri defer batch.Close() for fingerprint, metric := range metrics { - key := coding.NewPBEncoder(fingerprint.ToDTO()) - value := coding.NewPBEncoder(model.MetricToDTO(metric)) - batch.Put(key, value) + batch.Put(fingerprint.ToDTO(), model.MetricToDTO(metric)) } err = l.fingerprintToMetrics.Commit(batch) @@ -414,6 +412,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri return } +var existenceIdentity = &dto.MembershipIndexValue{} + // indexMetrics takes groups of samples, determines which ones contain metrics // that are unknown to the storage stack, and then proceeds to update all // affected indices. @@ -465,10 +465,8 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri batch := leveldb.NewBatch() defer batch.Close() - // WART: We should probably encode simple fingerprints. for _, metric := range absentMetrics { - key := coding.NewPBEncoder(model.MetricToDTO(metric)) - batch.Put(key, key) + batch.Put(model.MetricToDTO(metric), existenceIdentity) } err = l.metricMembershipIndex.Commit(batch) @@ -492,27 +490,23 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger mutationCount := 0 for fingerprint, samples := range groups { - keyEncoded := coding.NewPBEncoder(fingerprint.ToDTO()) value := &dto.MetricHighWatermark{} newestSampleTimestamp := samples[len(samples)-1].Timestamp - - raw, err := l.MetricHighWatermarks.Get(keyEncoded) + present, err := l.MetricHighWatermarks.Get(fingerprint.ToDTO(), value) if err != nil { return err } - - if raw != nil { - err = proto.Unmarshal(raw, value) - if err != nil { - return err - } - - if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) { - continue - } + if !present { + continue } + + // BUG(matt): Repace this with watermark management. + if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) { + continue + } + value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(keyEncoded, coding.NewPBEncoder(value)) + batch.Put(fingerprint.ToDTO(), value) mutationCount++ } @@ -583,7 +577,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err }) } - samplesBatch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) + samplesBatch.Put(key, value) } } @@ -656,8 +650,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }(time.Now()) - dtoKey := coding.NewPBEncoder(dto) - value, err = l.metricMembershipIndex.Has(dtoKey) + value, err = l.metricMembershipIndex.Has(dto) return } @@ -669,8 +662,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }(time.Now()) - dtoKey := coding.NewPBEncoder(dto) - value, err = l.labelSetToFingerprints.Has(dtoKey) + value, err = l.labelSetToFingerprints.Has(dto) return } @@ -682,8 +674,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }(time.Now()) - dtoKey := coding.NewPBEncoder(dto) - value, err = l.labelNameToFingerprints.Has(dtoKey) + value, err = l.labelNameToFingerprints.Has(dto) return } @@ -698,15 +689,13 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab sets := []utility.Set{} for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { - f, err := l.labelSetToFingerprints.Get(coding.NewPBEncoder(labelSetDTO)) + unmarshaled := &dto.FingerprintCollection{} + present, err := l.labelSetToFingerprints.Get(labelSetDTO, unmarshaled) if err != nil { return fps, err } - - unmarshaled := &dto.FingerprintCollection{} - err = proto.Unmarshal(f, unmarshaled) - if err != nil { - return fps, err + if !present { + return nil, nil } set := utility.Set{} @@ -743,16 +732,13 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }(time.Now()) - raw, err := l.labelNameToFingerprints.Get(coding.NewPBEncoder(model.LabelNameToDTO(&labelName))) - if err != nil { - return - } - unmarshaled := &dto.FingerprintCollection{} - - err = proto.Unmarshal(raw, unmarshaled) + present, err := l.labelNameToFingerprints.Get(model.LabelNameToDTO(&labelName), unmarshaled) if err != nil { - return + return nil, err + } + if !present { + return nil, nil } for _, m := range unmarshaled.Member { @@ -760,7 +746,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L fps = append(fps, fp) } - return + return fps, nil } func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) { @@ -770,15 +756,13 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }(time.Now()) - raw, err := l.fingerprintToMetrics.Get(coding.NewPBEncoder(model.FingerprintToDTO(f))) - if err != nil { - return - } - unmarshaled := &dto.Metric{} - err = proto.Unmarshal(raw, unmarshaled) + present, err := l.fingerprintToMetrics.Get(model.FingerprintToDTO(f), unmarshaled) if err != nil { - return + return nil, err + } + if !present { + return nil, nil } m = model.Metric{} @@ -787,7 +771,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value) } - return + return m, nil } func (l LevelDBMetricPersistence) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values { diff --git a/storage/metric/processor.go b/storage/metric/processor.go index d913c69b17..1133b1ec31 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -14,14 +14,16 @@ package metric import ( - "code.google.com/p/goprotobuf/proto" "fmt" - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/model" + "time" + + "code.google.com/p/goprotobuf/proto" + dto "github.com/prometheus/prometheus/model/generated" + + "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" - "time" ) // processor models a post-processing agent that performs work given a sample @@ -153,8 +155,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize: if !keyDropped { - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) keyDropped = true } pendingSamples = append(pendingSamples, unactedSamples...) @@ -165,15 +166,12 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi // If the number of pending writes equals the target group size case len(pendingSamples) == p.MinimumGroupSize: newSampleKey := pendingSamples.ToSampleKey(fingerprint) - key := coding.NewPBEncoder(newSampleKey.ToDTO()) - value := coding.NewPBEncoder(pendingSamples.ToDTO()) - pendingBatch.Put(key, value) + pendingBatch.Put(newSampleKey.ToDTO(), pendingSamples.ToDTO()) pendingMutations++ lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) if len(unactedSamples) > 0 { if !keyDropped { - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) keyDropped = true } @@ -190,8 +188,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize: if !keyDropped { - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) keyDropped = true } remainder := p.MinimumGroupSize - len(pendingSamples) @@ -211,9 +208,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi if len(unactedSamples) > 0 || len(pendingSamples) > 0 { pendingSamples = append(pendingSamples, unactedSamples...) newSampleKey := pendingSamples.ToSampleKey(fingerprint) - key := coding.NewPBEncoder(newSampleKey.ToDTO()) - value := coding.NewPBEncoder(pendingSamples.ToDTO()) - pendingBatch.Put(key, value) + pendingBatch.Put(newSampleKey.ToDTO(), pendingSamples.ToDTO()) pendingSamples = model.Values{} pendingMutations++ lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) @@ -320,24 +315,20 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist pendingBatch = nil case !sampleKey.MayContain(stopAt): - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) lastCurated = sampleKey.LastTimestamp sampleValues = model.Values{} pendingMutations++ case sampleKey.MayContain(stopAt): - key := coding.NewPBEncoder(sampleKey.ToDTO()) - pendingBatch.Drop(key) + pendingBatch.Drop(sampleKey.ToDTO()) pendingMutations++ sampleValues = sampleValues.TruncateBefore(stopAt) if len(sampleValues) > 0 { sampleKey = sampleValues.ToSampleKey(fingerprint) lastCurated = sampleKey.FirstTimestamp - newKey := coding.NewPBEncoder(sampleKey.ToDTO()) - newValue := coding.NewPBEncoder(sampleValues.ToDTO()) - pendingBatch.Put(newKey, newValue) + pendingBatch.Put(sampleKey.ToDTO(), sampleValues.ToDTO()) pendingMutations++ } else { lastCurated = sampleKey.LastTimestamp diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 2eeb12250e..d85ff7d2d6 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -14,15 +14,17 @@ package metric import ( - "code.google.com/p/goprotobuf/proto" "fmt" - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/model" - dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage/raw/leveldb" - fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" "testing" "time" + + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/prometheus/model/generated" + fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" + + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage/raw/leveldb" ) type curationState struct { @@ -56,40 +58,40 @@ type out struct { sampleGroups []sampleGroup } -func (c curationState) Get() (key, value coding.Encoder) { +func (c curationState) Get() (key, value proto.Message) { signature, err := c.processor.Signature() if err != nil { panic(err) } - key = coding.NewPBEncoder(model.CurationKey{ + key = model.CurationKey{ Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint), ProcessorMessageRaw: signature, ProcessorMessageTypeName: c.processor.Name(), IgnoreYoungerThan: c.ignoreYoungerThan, - }.ToDTO()) + }.ToDTO() - value = coding.NewPBEncoder(model.CurationRemark{ + value = model.CurationRemark{ LastCompletionTimestamp: c.lastCurated, - }.ToDTO()) + }.ToDTO() return } -func (w watermarkState) Get() (key, value coding.Encoder) { - key = coding.NewPBEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) - value = coding.NewPBEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) +func (w watermarkState) Get() (key, value proto.Message) { + key = model.NewFingerprintFromRowKey(w.fingerprint).ToDTO() + value = model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO() return } -func (s sampleGroup) Get() (key, value coding.Encoder) { - key = coding.NewPBEncoder(model.SampleKey{ +func (s sampleGroup) Get() (key, value proto.Message) { + key = model.SampleKey{ Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint), FirstTimestamp: s.values[0].Timestamp, LastTimestamp: s.values[len(s.values)-1].Timestamp, SampleCount: uint32(len(s.values)), - }.ToDTO()) + }.ToDTO() - value = coding.NewPBEncoder(s.values.ToDTO()) + value = s.values.ToDTO() return } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 1d76e63fa8..bee4ce5f18 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -19,8 +19,6 @@ import ( "sort" "time" - "code.google.com/p/goprotobuf/proto" - dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/coding" @@ -341,18 +339,12 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e wm, ok := t.wmCache.Get(f) if !ok { - rowKey := coding.NewPBEncoder(f.ToDTO()) - raw, err := t.DiskStorage.MetricHighWatermarks.Get(rowKey) + value := &dto.MetricHighWatermark{} + present, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value) if err != nil { return false, err } - if raw != nil { - value := &dto.MetricHighWatermark{} - err = proto.Unmarshal(raw, value) - if err != nil { - return false, err - } - + if present { wmTime := time.Unix(*value.Timestamp, 0).UTC() t.wmCache.Set(f, &Watermarks{High: wmTime}) return wmTime.Before(i), nil diff --git a/storage/raw/index/interface.go b/storage/raw/index/interface.go index 11967fa00d..5a87fb264f 100644 --- a/storage/raw/index/interface.go +++ b/storage/raw/index/interface.go @@ -13,13 +13,11 @@ package index -import ( - "github.com/prometheus/prometheus/coding" -) +import "code.google.com/p/goprotobuf/proto" type MembershipIndex interface { - Has(key coding.Encoder) (bool, error) - Put(key coding.Encoder) error - Drop(key coding.Encoder) error + Has(key proto.Message) (bool, error) + Put(key proto.Message) error + Drop(key proto.Message) error Close() } diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index 99590eee69..7eb55bae27 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -14,20 +14,15 @@ package leveldb import ( - "github.com/prometheus/prometheus/coding" + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" ) -type indexValue struct{} - -func (i *indexValue) MustEncode() []byte { - return []byte{} -} - -var ( - existenceValue = &indexValue{} -) +var existenceValue = &dto.MembershipIndexValue{} type LevelDBMembershipIndex struct { persistence *leveldb.LevelDBPersistence @@ -37,16 +32,16 @@ func (l *LevelDBMembershipIndex) Close() { l.persistence.Close() } -func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) { - return l.persistence.Has(key) +func (l *LevelDBMembershipIndex) Has(k proto.Message) (bool, error) { + return l.persistence.Has(k) } -func (l *LevelDBMembershipIndex) Drop(key coding.Encoder) error { - return l.persistence.Drop(key) +func (l *LevelDBMembershipIndex) Drop(k proto.Message) error { + return l.persistence.Drop(k) } -func (l *LevelDBMembershipIndex) Put(key coding.Encoder) error { - return l.persistence.Put(key, existenceValue) +func (l *LevelDBMembershipIndex) Put(k proto.Message) error { + return l.persistence.Put(k, existenceValue) } func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) { diff --git a/storage/raw/interface.go b/storage/raw/interface.go index 42cb049bbc..c369724b88 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -14,7 +14,8 @@ package raw import ( - "github.com/prometheus/prometheus/coding" + "code.google.com/p/goprotobuf/proto" + "github.com/prometheus/prometheus/storage" ) @@ -25,14 +26,14 @@ type Persistence interface { // persistence. Close() // Has informs the user whether a given key exists in the database. - Has(key coding.Encoder) (bool, error) + Has(key proto.Message) (bool, error) // Get retrieves the key from the database if it exists or returns nil if // it is absent. - Get(key coding.Encoder) ([]byte, error) + Get(key, value proto.Message) (present bool, err error) // Drop removes the key from the database. - Drop(key coding.Encoder) error + Drop(key proto.Message) error // Put sets the key to a given value. - Put(key, value coding.Encoder) error + Put(key, value proto.Message) error // ForEach is responsible for iterating through all records in the database // until one of the following conditions are met: // @@ -41,7 +42,7 @@ type Persistence interface { // 3.) A FilterResult of STOP is emitted by the Filter. // // Decoding errors for an entity cause that entity to be skipped. - ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) + ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error) // Commit applies the Batch operations to the database. Commit(Batch) error } @@ -54,7 +55,7 @@ type Batch interface { // batch mutation. Close() // Put follows the same protocol as Persistence.Put. - Put(key, value coding.Encoder) + Put(key, value proto.Message) // Drop follows the same protocol as Persistence.Drop. - Drop(key coding.Encoder) + Drop(key proto.Message) } diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go index 6b82af2f0f..ed5449788e 100644 --- a/storage/raw/leveldb/batch.go +++ b/storage/raw/leveldb/batch.go @@ -15,7 +15,10 @@ package leveldb import ( "fmt" + + "code.google.com/p/goprotobuf/proto" "github.com/jmhodges/levigo" + "github.com/prometheus/prometheus/coding" ) @@ -31,25 +34,23 @@ func NewBatch() *batch { } } -func (b *batch) Drop(key coding.Encoder) { - keyEncoded := key.MustEncode() - b.drops++ +func (b *batch) Drop(key proto.Message) { + b.batch.Delete(coding.NewPBEncoder(key).MustEncode()) - b.batch.Delete(keyEncoded) + b.drops++ } -func (b *batch) Put(key, value coding.Encoder) { - keyEncoded := key.MustEncode() - valueEncoded := value.MustEncode() +func (b *batch) Put(key, value proto.Message) { + b.batch.Put(coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode()) + b.puts++ - b.batch.Put(keyEncoded, valueEncoded) } -func (b batch) Close() { +func (b *batch) Close() { b.batch.Close() } -func (b batch) String() string { +func (b *batch) String() string { return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops) } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index c37e90890b..d68bca53fa 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -16,11 +16,14 @@ package leveldb import ( "flag" "fmt" + "time" + + "code.google.com/p/goprotobuf/proto" "github.com/jmhodges/levigo" + "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" - "time" ) var ( @@ -250,38 +253,37 @@ func (l *LevelDBPersistence) Close() { return } -func (l *LevelDBPersistence) Get(value coding.Encoder) (b []byte, err error) { - key := value.MustEncode() - - return l.storage.Get(l.readOptions, key) -} - -func (l *LevelDBPersistence) Has(value coding.Encoder) (h bool, err error) { - raw, err := l.Get(value) +func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) { + raw, err := l.storage.Get(l.readOptions, coding.NewPBEncoder(k).MustEncode()) if err != nil { - return + return false, err + } + if raw == nil { + return false, nil } - h = raw != nil + if v == nil { + return true, nil + } - return + err = proto.Unmarshal(raw, v) + if err != nil { + return true, err + } + + return true, nil } -func (l *LevelDBPersistence) Drop(value coding.Encoder) (err error) { - key := value.MustEncode() - err = l.storage.Delete(l.writeOptions, key) - - return +func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) { + return l.Get(k, nil) } -func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) { - keyEncoded := key.MustEncode() +func (l *LevelDBPersistence) Drop(k proto.Message) error { + return l.storage.Delete(l.writeOptions, coding.NewPBEncoder(k).MustEncode()) +} - valueEncoded := value.MustEncode() - - err = l.storage.Put(l.writeOptions, keyEncoded, valueEncoded) - - return +func (l *LevelDBPersistence) Put(key, value proto.Message) error { + return l.storage.Put(l.writeOptions, coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode()) } func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index f93aecb081..cef9e77cf1 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -14,7 +14,8 @@ package test import ( - "github.com/prometheus/prometheus/coding" + "code.google.com/p/goprotobuf/proto" + "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility/test" ) @@ -28,7 +29,7 @@ type ( // Pair models a prospective (key, value) double that will be committed to // a database. Pair interface { - Get() (key, value coding.Encoder) + Get() (key, value proto.Message) } // Pairs models a list of Pair for disk committing. @@ -47,7 +48,7 @@ type ( // data to build. HasNext() (has bool) // Next emits the next (key, value) double for storage. - Next() (key coding.Encoder, value coding.Encoder) + Next() (key, value proto.Message) } preparer struct { @@ -88,7 +89,7 @@ func (f cassetteFactory) HasNext() bool { return f.index < f.count } -func (f *cassetteFactory) Next() (key, value coding.Encoder) { +func (f *cassetteFactory) Next() (key, value proto.Message) { key, value = f.pairs[f.index].Get() f.index++ diff --git a/tools/dumper/main.go b/tools/dumper/main.go index 1951b44c1f..1b9afb325d 100644 --- a/tools/dumper/main.go +++ b/tools/dumper/main.go @@ -19,17 +19,20 @@ package main import ( - "code.google.com/p/goprotobuf/proto" "encoding/csv" "flag" "fmt" - "github.com/prometheus/prometheus/model" - dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/metric" "log" "os" "strconv" + + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/prometheus/model/generated" + + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/metric" ) var ( diff --git a/web/web.go b/web/web.go index d5375677a0..4d7bf98862 100644 --- a/web/web.go +++ b/web/web.go @@ -102,7 +102,6 @@ func getEmbeddedTemplate(name string) (*template.Template, error) { return t, nil } - func getTemplate(name string) (t *template.Template, err error) { if *useLocalAssets { t, err = getLocalTemplate(name)