From 4419399e4e831052bc9b8b4f26a8e7cf337091b0 Mon Sep 17 00:00:00 2001 From: Fiona Liao Date: Tue, 12 Sep 2023 20:31:10 +0100 Subject: [PATCH] Do WBL mmap marker replay concurrently (#12801) * Benchmark WBL Extended WAL benchmark test with WBL parts too - added basic cases for OOO handling - a percentage of series have a percentage of samples set as OOO ones. Signed-off-by: Fiona Liao --- tsdb/head_test.go | 116 ++++++++++++++++++++++++++++++++++++++++------ tsdb/head_wal.go | 65 ++++++++++---------------- 2 files changed, 127 insertions(+), 54 deletions(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 62022be50c..da7cbd5498 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -135,7 +135,7 @@ func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) { } } -func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) { +func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}) { var enc record.Encoder for _, r := range recs { switch v := r.(type) { @@ -147,6 +147,8 @@ func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) { require.NoError(t, w.Log(enc.Tombstones(v, nil))) case []record.RefExemplar: require.NoError(t, w.Log(enc.Exemplars(v, nil))) + case []record.RefMmapMarker: + require.NoError(t, w.Log(enc.MmapMarkers(v, nil))) } } } @@ -197,13 +199,18 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { return recs } -func BenchmarkLoadWAL(b *testing.B) { +func BenchmarkLoadWLs(b *testing.B) { cases := []struct { // Total series is (batches*seriesPerBatch). batches int seriesPerBatch int samplesPerSeries int mmappedChunkT int64 + // The first oooSeriesPct*seriesPerBatch series in a batch are selected as "OOO" series. + oooSeriesPct float64 + // The first oooSamplesPct*samplesPerSeries samples in an OOO series are written as OOO samples. + oooSamplesPct float64 + oooCapMax int64 }{ { // Less series and more samples. 2 hour WAL with 1 second scrape interval. 
batches: 10, @@ -226,6 +233,31 @@ func BenchmarkLoadWAL(b *testing.B) { samplesPerSeries: 480, mmappedChunkT: 3800, }, + { // A lot of OOO samples (50% series with 50% of samples being OOO). + batches: 10, + seriesPerBatch: 1000, + samplesPerSeries: 480, + oooSeriesPct: 0.5, + oooSamplesPct: 0.5, + oooCapMax: DefaultOutOfOrderCapMax, + }, + { // Fewer OOO samples (10% of series with 10% of samples being OOO). + batches: 10, + seriesPerBatch: 1000, + samplesPerSeries: 480, + oooSeriesPct: 0.1, + oooSamplesPct: 0.1, + }, + { // 2 hour WAL with 15 second scrape interval, and mmapped chunks up to last 100 samples. + // Four mmap markers per OOO series: 480 * 0.3 = 144, 144 / 32 (DefaultOutOfOrderCapMax) = 4. + batches: 100, + seriesPerBatch: 1000, + samplesPerSeries: 480, + mmappedChunkT: 3800, + oooSeriesPct: 0.2, + oooSamplesPct: 0.3, + oooCapMax: DefaultOutOfOrderCapMax, + }, } labelsPerSeries := 5 @@ -241,12 +273,17 @@ func BenchmarkLoadWAL(b *testing.B) { continue } lastExemplarsPerSeries = exemplarsPerSeries - b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT), + b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax), func(b *testing.B) { dir := b.TempDir() - w, err := wlog.New(nil, nil, dir, wlog.CompressionNone) + wal, err := wlog.New(nil, nil, dir, wlog.CompressionNone) require.NoError(b, err) + var wbl *wlog.WL + if c.oooSeriesPct != 0 { + wbl, err = wlog.New(nil, nil, dir, wlog.CompressionNone) + require.NoError(b, err) + } // Write series. 
refSeries := make([]record.RefSeries, 0, c.seriesPerBatch) @@ -260,22 +297,33 @@ func BenchmarkLoadWAL(b *testing.B) { } refSeries = append(refSeries, record.RefSeries{Ref: chunks.HeadSeriesRef(i) * 101, Labels: labels.FromMap(lbls)}) } - populateTestWAL(b, w, []interface{}{refSeries}) + populateTestWL(b, wal, []interface{}{refSeries}) } // Write samples. refSamples := make([]record.RefSample, 0, c.seriesPerBatch) + + oooSeriesPerBatch := int(float64(c.seriesPerBatch) * c.oooSeriesPct) + oooSamplesPerSeries := int(float64(c.samplesPerSeries) * c.oooSamplesPct) + for i := 0; i < c.samplesPerSeries; i++ { for j := 0; j < c.batches; j++ { refSamples = refSamples[:0] - for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { + + k := j * c.seriesPerBatch + // Skip appending the first oooSamplesPerSeries samples for the series in the batch that + // should have OOO samples. OOO samples are appended after all the in-order samples. + if i < oooSamplesPerSeries { + k += oooSeriesPerBatch + } + for ; k < (j+1)*c.seriesPerBatch; k++ { refSamples = append(refSamples, record.RefSample{ Ref: chunks.HeadSeriesRef(k) * 101, T: int64(i) * 10, V: float64(i) * 100, }) } - populateTestWAL(b, w, []interface{}{refSamples}) + populateTestWL(b, wal, []interface{}{refSamples}) } } @@ -292,6 +340,10 @@ func BenchmarkLoadWAL(b *testing.B) { // Create one mmapped chunk per series, with one sample at the given time. s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled) s.append(c.mmappedChunkT, 42, 0, cOpts) + // There's only one head chunk because only a single sample is appended. mmapChunks() + // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with + // the sample at c.mmappedChunkT is mmapped. 
+ s.cutNewHeadChunk(c.mmappedChunkT, chunkenc.EncXOR, c.mmappedChunkT) s.mmapChunks(chunkDiskMapper) } require.NoError(b, chunkDiskMapper.Close()) @@ -310,7 +362,39 @@ func BenchmarkLoadWAL(b *testing.B) { Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)), }) } - populateTestWAL(b, w, []interface{}{refExemplars}) + populateTestWL(b, wal, []interface{}{refExemplars}) + } + } + + // Write OOO samples and mmap markers. + refMarkers := make([]record.RefMmapMarker, 0, oooSeriesPerBatch) + refSamples = make([]record.RefSample, 0, oooSeriesPerBatch) + for i := 0; i < oooSamplesPerSeries; i++ { + shouldAddMarkers := c.oooCapMax != 0 && i != 0 && int64(i)%c.oooCapMax == 0 + + for j := 0; j < c.batches; j++ { + refSamples = refSamples[:0] + if shouldAddMarkers { + refMarkers = refMarkers[:0] + } + for k := j * c.seriesPerBatch; k < (j*c.seriesPerBatch)+oooSeriesPerBatch; k++ { + ref := chunks.HeadSeriesRef(k) * 101 + if shouldAddMarkers { + // loadWBL() checks that the marker's MmapRef is less than or equal to the ref + // for the last mmap chunk. MmapRef is set to 0 so that this check always passes. 
+ refMarkers = append(refMarkers, record.RefMmapMarker{Ref: ref, MmapRef: 0}) + } + refSamples = append(refSamples, record.RefSample{ + Ref: ref, + T: int64(i) * 10, + V: float64(i) * 100, + }) + } + if shouldAddMarkers { + populateTestWL(b, wbl, []interface{}{refMarkers}) + } + populateTestWL(b, wal, []interface{}{refSamples}) + populateTestWL(b, wbl, []interface{}{refSamples}) } } @@ -320,13 +404,19 @@ func BenchmarkLoadWAL(b *testing.B) { for i := 0; i < b.N; i++ { opts := DefaultHeadOptions() opts.ChunkRange = 1000 - opts.ChunkDirRoot = w.Dir() - h, err := NewHead(nil, nil, w, nil, opts, nil) + opts.ChunkDirRoot = dir + if c.oooCapMax > 0 { + opts.OutOfOrderCapMax.Store(c.oooCapMax) + } + h, err := NewHead(nil, nil, wal, wbl, opts, nil) require.NoError(b, err) h.Init(0) } b.StopTimer() - w.Close() + wal.Close() + if wbl != nil { + wbl.Close() + } }) } } @@ -563,7 +653,7 @@ func TestHead_ReadWAL(t *testing.T) { require.NoError(t, head.Close()) }() - populateTestWAL(t, w, entries) + populateTestWL(t, w, entries) require.NoError(t, head.Init(math.MinInt64)) require.Equal(t, uint64(101), head.lastSeriesID.Load()) @@ -1037,7 +1127,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { require.NoError(t, head.Close()) }() - populateTestWAL(t, w, entries) + populateTestWL(t, w, entries) require.NoError(t, head.Init(math.MinInt64)) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 56e9884d4a..804060ad55 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -762,7 +762,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks. } for i := 0; i < concurrency; i++ { if len(shards[i]) > 0 { - processors[i].input <- shards[i] + processors[i].input <- wblSubsetProcessorInputItem{samples: shards[i]} shards[i] = nil } } @@ -790,23 +790,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks. 
continue } idx := uint64(ms.ref) % uint64(concurrency) - // It is possible that some old sample is being processed in processWALSamples that - // could cause race below. So we wait for the goroutine to empty input the buffer and finish - // processing all old samples after emptying the buffer. - processors[idx].waitUntilIdle() - // Lock the subset so we can modify the series object - processors[idx].mx.Lock() - - // All samples till now have been m-mapped. Hence clear out the headChunk. - // In case some samples slipped through and went into m-map chunks because of changed - // chunk size parameters, we are not taking care of that here. - // TODO(codesome): see if there is a way to avoid duplicate m-map chunks if - // the size of ooo chunk was reduced between restart. - if ms.ooo != nil { - ms.ooo.oooHeadChunk = nil - } - - processors[idx].mx.Unlock() + processors[idx].input <- wblSubsetProcessorInputItem{mmappedSeries: ms} } default: panic(fmt.Errorf("unexpected decodedCh type: %T", d)) @@ -858,14 +842,18 @@ func isErrLoadOOOWal(err error) bool { } type wblSubsetProcessor struct { - mx sync.Mutex // Take this lock while modifying series in the subset. - input chan []record.RefSample + input chan wblSubsetProcessorInputItem output chan []record.RefSample } +type wblSubsetProcessorInputItem struct { + mmappedSeries *memSeries + samples []record.RefSample +} + func (wp *wblSubsetProcessor) setup() { wp.output = make(chan []record.RefSample, 300) - wp.input = make(chan []record.RefSample, 300) + wp.input = make(chan wblSubsetProcessorInputItem, 300) } func (wp *wblSubsetProcessor) closeAndDrain() { @@ -892,9 +880,17 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { oooCapMax := h.opts.OutOfOrderCapMax.Load() // We don't check for minValidTime for ooo samples. 
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) - for samples := range wp.input { - wp.mx.Lock() - for _, s := range samples { + for in := range wp.input { + if in.mmappedSeries != nil && in.mmappedSeries.ooo != nil { + // All samples till now have been m-mapped. Hence clear out the headChunk. + // In case some samples slipped through and went into m-map chunks because of changed + // chunk size parameters, we are not taking care of that here. + // TODO(codesome): see if there is a way to avoid duplicate m-map chunks if + // the size of ooo chunk was reduced between restart. + in.mmappedSeries.ooo.oooHeadChunk = nil + continue + } + for _, s := range in.samples { ms := h.series.getByID(s.Ref) if ms == nil { unknownRefs++ @@ -914,8 +910,10 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { } } } - wp.mx.Unlock() - + select { + case wp.output <- in.samples: + default: + } } h.updateMinOOOMaxOOOTime(mint, maxt) @@ -923,21 +921,6 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { return unknownRefs } -func (wp *wblSubsetProcessor) waitUntilIdle() { - select { - case <-wp.output: // Allow output side to drain to avoid deadlock. - default: - } - wp.input <- []record.RefSample{} - for len(wp.input) != 0 { - time.Sleep(10 * time.Microsecond) - select { - case <-wp.output: // Allow output side to drain to avoid deadlock. - default: - } - } -} - const ( chunkSnapshotRecordTypeSeries uint8 = 1 chunkSnapshotRecordTypeTombstones uint8 = 2