From 84acfed061c30f7558446d136892b9ed48a82d7b Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 14 Mar 2013 16:55:50 -0700 Subject: [PATCH] Extract finding unindexed metrics. --- model/dto.go | 8 +-- storage/metric/instrumentation.go | 3 +- storage/metric/leveldb.go | 83 ++++++++++++++++++++----------- 3 files changed, 59 insertions(+), 35 deletions(-) diff --git a/model/dto.go b/model/dto.go index 07ec7da49b..aa2904f6f5 100644 --- a/model/dto.go +++ b/model/dto.go @@ -47,11 +47,11 @@ func SampleToMetricDTO(s *Sample) *dto.Metric { } } -func MetricToDTO(m *Metric) *dto.Metric { - metricLength := len(*m) +func MetricToDTO(m Metric) *dto.Metric { + metricLength := len(m) labelNames := make([]string, 0, metricLength) - for labelName := range *m { + for labelName := range m { labelNames = append(labelNames, string(labelName)) } @@ -61,7 +61,7 @@ func MetricToDTO(m *Metric) *dto.Metric { for _, labelName := range labelNames { l := LabelName(labelName) - labelValue := (*m)[l] + labelValue := m[l] labelPair := &dto.LabelPair{ Name: proto.String(string(labelName)), Value: proto.String(string(labelValue)), diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 8e4fdc5cf7..701354e5d5 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -31,6 +31,7 @@ const ( appendLabelPairFingerprint = "append_label_pair_fingerprint" appendSample = "append_sample" appendSamples = "append_samples" + findUnindexedMetrics = "find_unindexed_metrics" flushMemory = "flush_memory" getBoundaryValues = "get_boundary_values" getFingerprintsForLabelName = "get_fingerprints_for_label_name" @@ -43,7 +44,7 @@ const ( hasLabelName = "has_label_name" hasLabelPair = "has_label_pair" indexMetric = "index_metric" - indexSamples = "index_samples" + indexMetrics = "index_metrics" rebuildDiskFrontier = "rebuild_disk_frontier" renderView = "render_view" setLabelNameFingerprints = "set_label_name_fingerprints" diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 726650e073..5772f636d7 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -239,38 +239,58 @@ func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Sampl return fingerprintToSamples } -// indexSamples takes groups of samples, determines which ones contain metrics -// that are unknown to the storage stack, and then proceeds to update all -// affected indices. -func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]model.Samples) (err error) { +// findUnindexedMetrics scours the metric membership index for each given Metric +// in the keyspace and returns a map of Fingerprint-Metric pairs that are +// absent. +func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fingerprint]model.Metric) (unindexed map[model.Fingerprint]model.Metric, err error) { begin := time.Now() defer func() { duration := time.Since(begin) - recordOutcome(duration, err, map[string]string{operation: indexSamples, result: success}, map[string]string{operation: indexSamples, result: failure}) + recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure}) }() - var ( - absentFingerprints = map[model.Fingerprint]model.Samples{} - ) + unindexed = make(map[model.Fingerprint]model.Metric) // Determine which metrics are unknown in the database. - - for fingerprint, samples := range groups { - sample := samples[0] - metricDTO := model.SampleToMetricDTO(&sample) - indexHas, err := l.hasIndexMetric(metricDTO) + for fingerprint, metric := range candidates { + var ( + dto = model.MetricToDTO(metric) + indexHas, err = l.hasIndexMetric(dto) + ) if err != nil { panic(err) } if !indexHas { - absentFingerprints[fingerprint] = samples + unindexed[fingerprint] = metric } } + return +} + +// indexMetrics takes groups of samples, determines which ones contain metrics +// that are unknown to the storage stack, and then proceeds to update all +// affected indices. +func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerprint]model.Metric) (err error) { + begin := time.Now() + defer func() { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure}) + }() + + var ( + absentMetrics map[model.Fingerprint]model.Metric + ) + + absentMetrics, err = l.findUnindexedMetrics(fingerprints) + if err != nil { + panic(err) + } + // TODO: For the missing fingerprints, determine what label names and pairs // are absent and act accordingly and append fingerprints. - var ( doneBuildingLabelNameIndex = make(chan interface{}) doneBuildingLabelPairIndex = make(chan interface{}) @@ -280,8 +300,7 @@ func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]mod go func() { labelNameFingerprints := map[model.LabelName]utility.Set{} - for fingerprint, samples := range absentFingerprints { - metric := samples[0].Metric + for fingerprint, metric := range absentMetrics { for labelName := range metric { fingerprintSet, ok := labelNameFingerprints[labelName] if !ok { @@ -340,8 +359,7 @@ func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]mod go func() { labelPairFingerprints := map[model.LabelPair]utility.Set{} - for fingerprint, samples := range absentFingerprints { - metric := samples[0].Metric + for fingerprint, metric := range absentMetrics { for labelName, labelValue := range metric { labelPair := model.LabelPair{ Name: labelName, @@ -420,16 +438,14 @@ func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]mod // Update the Metric existence index. - if len(absentFingerprints) > 0 { + if len(absentMetrics) > 0 { batch := leveldb.NewBatch() defer batch.Close() - for fingerprint, samples := range absentFingerprints { - for _, sample := range samples { - key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) - value := coding.NewProtocolBufferEncoder(model.SampleToMetricDTO(&sample)) - batch.Put(key, value) - } + for fingerprint, metric := range absentMetrics { + key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) + value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + batch.Put(key, value) } err = l.fingerprintToMetrics.Commit(batch) @@ -445,9 +461,8 @@ func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]mod defer batch.Close() // WART: We should probably encode simple fingerprints. - for _, samples := range absentFingerprints { - sample := samples[0] - key := coding.NewProtocolBufferEncoder(model.SampleToMetricDTO(&sample)) + for _, metric := range absentMetrics { + key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) batch.Put(key, key) } @@ -477,7 +492,15 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err ) go func(groups map[model.Fingerprint]model.Samples) { - indexErrChan <- l.indexSamples(groups) + var ( + metrics = map[model.Fingerprint]model.Metric{} + ) + + for fingerprint, samples := range groups { + metrics[fingerprint] = samples[0].Metric + } + + indexErrChan <- l.indexMetrics(metrics) }(fingerprintToSamples) go func() {