diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 88ba8f59b1..f1ee14500a 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -16,7 +16,6 @@ package metric import ( "code.google.com/p/goprotobuf/proto" "flag" - "fmt" "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" @@ -200,10 +199,6 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) } func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) { - c := len(samples) - if c > 1 { - fmt.Printf("Appending %d samples...", c) - } begin := time.Now() defer func() { duration := time.Now().Sub(begin) @@ -244,6 +239,58 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err doneSorting.Wait() + var ( + doneCommitting = sync.WaitGroup{} + ) + + go func() { + doneCommitting.Add(1) + samplesBatch := leveldb.NewBatch() + defer samplesBatch.Close() + defer doneCommitting.Done() + + for fingerprint, group := range fingerprintToSamples { + for { + lengthOfGroup := len(group) + + if lengthOfGroup == 0 { + break + } + + take := maximumChunkSize + if lengthOfGroup < take { + take = lengthOfGroup + } + + chunk := group[0:take] + group = group[take:lengthOfGroup] + + key := &dto.SampleKey{ + Fingerprint: fingerprint.ToDTO(), + Timestamp: indexable.EncodeTime(chunk[0].Timestamp), + LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()), + SampleCount: proto.Uint32(uint32(take)), + } + + value := &dto.SampleValueSeries{} + for _, sample := range chunk { + value.Value = append(value.Value, &dto.SampleValueSeries_Value{ + Timestamp: proto.Int64(sample.Timestamp.Unix()), + Value: proto.Float32(float32(sample.Value)), + }) + } + + samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + } + } + + err = l.metricSamples.Commit(samplesBatch) + + if err != nil { + panic(err) + } + }() + var ( absentFingerprints = map[model.Fingerprint]model.Samples{} ) @@ -454,48 +501,8 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err } } - samplesBatch := leveldb.NewBatch() - defer samplesBatch.Close() + doneCommitting.Wait() - for fingerprint, group := range fingerprintToSamples { - for { - lengthOfGroup := len(group) - - if lengthOfGroup == 0 { - break - } - - take := maximumChunkSize - if lengthOfGroup < take { - take = lengthOfGroup - } - - chunk := group[0:take] - group = group[take:lengthOfGroup] - - key := &dto.SampleKey{ - Fingerprint: fingerprint.ToDTO(), - Timestamp: indexable.EncodeTime(chunk[0].Timestamp), - LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()), - SampleCount: proto.Uint32(uint32(take)), - } - - value := &dto.SampleValueSeries{} - for _, sample := range chunk { - value.Value = append(value.Value, &dto.SampleValueSeries_Value{ - Timestamp: proto.Int64(sample.Timestamp.Unix()), - Value: proto.Float32(float32(sample.Value)), - }) - } - - samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) - } - } - - err = l.metricSamples.Commit(samplesBatch) - if err != nil { - panic(err) - } return } diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 413680a34d..e04f7e9e21 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -62,7 +62,6 @@ func (s stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilt if s.values.Len() == 0 { return } - iterator := s.values.SeekToLast() defer iterator.Close() @@ -91,7 +90,6 @@ func (s stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilt break } } - if !iterator.Previous() { break } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index b9632bb39e..0f1c394c86 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -202,9 +202,10 @@ func (t *tieredStorage) Flush() { // Write all pending appends. func (t *tieredStorage) flush() (err error) { + // Trim and old values to reduce iterative write costs. + t.flushMemory() t.writeMemory() t.flushMemory() - return } @@ -283,7 +284,11 @@ func (f *memoryToDiskFlusher) Flush() { for i := 0; i < length; i++ { samples = append(samples, <-f.toDiskQueue) } + start := time.Now() f.disk.AppendSamples(samples) + if false { + fmt.Printf("Took %s to append...\n", time.Since(start)) + } } func (f memoryToDiskFlusher) Close() { @@ -382,8 +387,13 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { continue } + if seriesFrontier.lastSupertime.Before(operation.StartsAt()) && !seriesFrontier.lastTime.Before(operation.StartsAt()) { + targetKey.Timestamp = indexable.EncodeTime(seriesFrontier.lastSupertime) + } else { + targetKey.Timestamp = indexable.EncodeTime(operation.StartsAt()) + } + targetKey.Fingerprint = scanJob.fingerprint.ToDTO() - targetKey.Timestamp = indexable.EncodeTime(operation.StartsAt()) rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 10e3e14abf..0c38322a91 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -14,6 +14,7 @@ package metric import ( + "fmt" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/utility/test" "io/ioutil" @@ -22,6 +23,24 @@ import ( "time" ) +func sampleIncrement(from, to time.Time, interval time.Duration, m model.Metric) (v []model.Sample) { + var ( + i model.SampleValue = 0 + ) + + for from.Before(to) { + v = append(v, model.Sample{ + Metric: m, + Value: i, + Timestamp: from, + }) + + from = from.Add(interval) + } + + return +} + func testMakeView(t test.Tester) { type in struct { atTime []getValuesAtTimeOp @@ -104,28 +123,132 @@ func testMakeView(t test.Tester) { }, }, }, + { + data: []model.Sample{ + { + Metric: metric, + Value: 0, + Timestamp: instant, + }, + { + Metric: metric, + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Metric: metric, + Value: 2, + Timestamp: instant.Add(time.Second * 2), + }, + }, + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant.Add(time.Second), + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{ + { + { + Timestamp: instant.Add(time.Second), + Value: 1, + }, + { + Timestamp: instant.Add(time.Second * 2), + Value: 2, + }, + }, + }, + }, + }, + { + data: []model.Sample{ + { + Metric: metric, + Value: 0, + Timestamp: instant, + }, + { + Metric: metric, + Value: 1, + Timestamp: instant.Add(time.Second * 2), + }, + { + Metric: metric, + Value: 2, + Timestamp: instant.Add(time.Second * 4), + }, + }, + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant.Add(time.Second), + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{ + { + { + Timestamp: instant, + Value: 0, + }, + { + Timestamp: instant.Add(time.Second * 2), + Value: 1, + }, + }, + }, + }, + }, + { + data: []model.Sample{ + { + Metric: metric, + Value: 0, + Timestamp: instant, + }, + { + Metric: metric, + Value: 1, + Timestamp: instant.Add(time.Second * 2), + }, + { + Metric: metric, + Value: 2, + Timestamp: instant.Add(time.Second * 4), + }, + }, + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant.Add(time.Second * 3), + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{ + { + { + Timestamp: instant.Add(time.Second * 2), + Value: 1, + }, + { + Timestamp: instant.Add(time.Second * 4), + Value: 2, + }, + }, + }, + }, + }, // { - // data: []model.Sample{ - // { - // Metric: metric, - // Value: 0, - // Timestamp: instant, - // }, - // { - // Metric: metric, - // Value: 1, - // Timestamp: instant.Add(time.Second), - // }, - // { - // Metric: metric, - // Value: 2, - // Timestamp: instant.Add(time.Second * 2), - // }, - // }, + // data: sampleIncrement(instant, instant.Add(14*24*time.Hour), time.Second, metric), // in: in{ // atTime: []getValuesAtTimeOp{ // { - // time: instant.Add(time.Second), + // time: instant.Add(time.Second * 3), // }, // }, // }, @@ -133,11 +256,11 @@ func testMakeView(t test.Tester) { // atTime: [][]model.SamplePair{ // { // { - // Timestamp: instant.Add(time.Second), + // Timestamp: instant.Add(time.Second * 2), // Value: 1, // }, // { - // Timestamp: instant.Add(time.Second * 2), + // Timestamp: instant.Add(time.Second * 4), // Value: 2, // }, // }, @@ -150,7 +273,7 @@ func testMakeView(t test.Tester) { for i, scenario := range scenarios { var ( temporary, _ = ioutil.TempDir("", "test_make_view") - tiered = NewTieredStorage(100, 100, 100, time.Second, time.Second, 0*time.Second, temporary) + tiered = NewTieredStorage(5000000, 250, 1000, 5*time.Second, 15*time.Second, 0*time.Second, temporary) ) if tiered == nil { @@ -171,7 +294,9 @@ func testMakeView(t test.Tester) { } } + start := time.Now() tiered.Flush() + fmt.Printf("Took %s to flush %d items...\n", time.Since(start), len(scenario.data)) requestBuilder := NewViewRequestBuilder()