From c1b547a90e7f36cd937479311cedc90bceecec12 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 17 Jan 2017 00:59:38 +0000 Subject: [PATCH] Only checkpoint chunkdescs and series that need persisting. (#2340) This decreases checkpoint size by not checkpointing things that don't actually need checkpointing. This is fully compatible with the v2 checkpoint format, as it makes series appear as though the only chunksdescs in memory are those that need persisting. --- storage/local/persistence.go | 44 +++++++++++++------------------ storage/local/persistence_test.go | 41 ++++++---------------------- 2 files changed, 26 insertions(+), 59 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index ef173f2119..4d27334969 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -668,8 +668,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap fpLocker.Lock(m.fp) defer fpLocker.Unlock(m.fp) - if len(m.series.chunkDescs) == 0 { - // This series was completely purged or archived in the meantime. Ignore. + chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark + if len(m.series.chunkDescs) == 0 || chunksToPersist == 0 { + // This series was completely purged or archived in the meantime or has + // no chunks that need persisting. Ignore. return } realNumberOfSeries++ @@ -688,7 +690,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap if _, err = w.Write(buf); err != nil { return } - if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil { + // persistWatermark. We only checkpoint chunks that need persisting, so + // this is always 0. + if _, err = codable.EncodeVarint(w, int64(0)); err != nil { return } if m.series.modTime.IsZero() { @@ -700,37 +704,25 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap return } } - if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil { + // chunkDescsOffset. + if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset+m.series.persistWatermark)); err != nil { return } if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil { return } - if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil { + // Number of chunkDescs. + if _, err = codable.EncodeVarint(w, int64(chunksToPersist)); err != nil { return } - for i, chunkDesc := range m.series.chunkDescs { - if i < m.series.persistWatermark { - if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil { - return - } - lt, err := chunkDesc.LastTime() - if err != nil { - return - } - if _, err = codable.EncodeVarint(w, int64(lt)); err != nil { - return - } - } else { - // This is a non-persisted chunk. Fully marshal it. - if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { - return - } - if err = chunkDesc.C.Marshal(w); err != nil { - return - } + for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] { + if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { + return } - p.checkpointChunksWritten.Observe(float64(len(m.series.chunkDescs) - m.series.persistWatermark)) + if err = chunkDesc.C.Marshal(w); err != nil { + return + } + p.checkpointChunksWritten.Observe(float64(chunksToPersist)) } // Series is checkpointed now, so declare it clean. In case the entire // checkpoint fails later on, this is fine, as the storage's series diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 2b37857668..27f620366b 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -493,8 +493,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin if err != nil { t.Fatal(err) } - if loadedSM.length() != 4 { - t.Errorf("want 4 series in map, got %d", loadedSM.length()) + if loadedSM.length() != 3 { + t.Errorf("want 3 series in map, got %d", loadedSM.length()) } if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok { if !reflect.DeepEqual(loadedS1.metric, m1) { @@ -518,28 +518,6 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin } else { t.Errorf("couldn't find %v in loaded map", m1) } - if loadedS3, ok := loadedSM.get(m3.FastFingerprint()); ok { - if !reflect.DeepEqual(loadedS3.metric, m3) { - t.Errorf("want metric %v, got %v", m3, loadedS3.metric) - } - if loadedS3.head().C != nil { - t.Error("head chunk not evicted") - } - if loadedS3.chunkDescsOffset != 0 { - t.Errorf("want chunkDescsOffset 0, got %d", loadedS3.chunkDescsOffset) - } - if !loadedS3.headChunkClosed { - t.Error("headChunkClosed is false") - } - if loadedS3.head().ChunkFirstTime != 2 { - t.Errorf("want ChunkFirstTime in head chunk to be 2, got %d", loadedS3.head().ChunkFirstTime) - } - if loadedS3.head().ChunkLastTime != 2 { - t.Errorf("want ChunkLastTime in head chunk to be 2, got %d", loadedS3.head().ChunkLastTime) - } - } else { - t.Errorf("couldn't find %v in loaded map", m3) - } if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok { if !reflect.DeepEqual(loadedS4.metric, m4) { t.Errorf("want metric %v, got %v", m4, loadedS4.metric) @@ -594,20 +572,17 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin if !reflect.DeepEqual(loadedS5.metric, m5) { t.Errorf("want metric %v, got %v", m5, loadedS5.metric) } - if got, want := len(loadedS5.chunkDescs), chunkCountS5; got != want { + if got, want := len(loadedS5.chunkDescs), chunkCountS5-3; got != want { t.Errorf("got %d chunkDescs, want %d", got, want) } - if got, want := loadedS5.persistWatermark, 3; got != want { + if got, want := loadedS5.persistWatermark, 0; got != want { t.Errorf("got persistWatermark %d, want %d", got, want) } - if !loadedS5.chunkDescs[2].IsEvicted() { - t.Error("3rd chunk not evicted") + if loadedS5.chunkDescs[0].IsEvicted() { + t.Error("1st chunk evicted") } - if loadedS5.chunkDescs[3].IsEvicted() { - t.Error("4th chunk evicted") - } - if loadedS5.chunkDescsOffset != 0 { - t.Errorf("want chunkDescsOffset 0, got %d", loadedS5.chunkDescsOffset) + if loadedS5.chunkDescsOffset != 3 { + t.Errorf("want chunkDescsOffset 3, got %d", loadedS5.chunkDescsOffset) } if loadedS5.headChunkClosed { t.Error("headChunkClosed is true")