From e4ec263bcc11493953c75d1b2e7bc78fd0463e05 Mon Sep 17 00:00:00 2001
From: Julien Levesy
Date: Fri, 1 Dec 2023 23:26:38 +0100
Subject: [PATCH] fix(wlog/watcher): read segment synchronously when not tailing (#13224)

Signed-off-by: Julien Levesy
Signed-off-by: Callum Styan
Co-authored-by: Callum Styan
---
 CHANGELOG.md              |  1 +
 tsdb/wlog/watcher.go      | 28 +++++++++----------
 tsdb/wlog/watcher_test.go | 58 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 72 insertions(+), 15 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c541dd7050..71b8c97fe4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
 
 ## unreleased
 
+* [ENHANCEMENT] TSDB: Make the wlog watcher read segments synchronously when not tailing. #13224
 * [BUGFIX] Agent: Participate in notify calls. #13223
 
 ## 2.48.0 / 2023-11-16
diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go
index 5689602e74..c9f8a4599f 100644
--- a/tsdb/wlog/watcher.go
+++ b/tsdb/wlog/watcher.go
@@ -65,7 +65,7 @@ type WriteTo interface {
 	SeriesReset(int)
 }
 
-// Used to notifier the watcher that data has been written so that it can read.
+// Used to notify the watcher that data has been written so that it can read.
 type WriteNotified interface {
 	Notify()
 }
@@ -398,8 +398,16 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 
 	reader := NewLiveReader(w.logger, w.readerMetrics, segment)
 
-	readTicker := time.NewTicker(readTimeout)
-	defer readTicker.Stop()
+	size := int64(math.MaxInt64)
+	if !tail {
+		var err error
+		size, err = getSegmentSize(w.walDir, segmentNum)
+		if err != nil {
+			return errors.Wrap(err, "getSegmentSize")
+		}
+
+		return w.readAndHandleError(reader, segmentNum, tail, size)
+	}
 
 	checkpointTicker := time.NewTicker(checkpointPeriod)
 	defer checkpointTicker.Stop()
@@ -407,18 +415,8 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 	segmentTicker := time.NewTicker(segmentCheckPeriod)
 	defer segmentTicker.Stop()
 
-	// If we're replaying the segment we need to know the size of the file to know
-	// when to return from watch and move on to the next segment.
-	size := int64(math.MaxInt64)
-	if !tail {
-		segmentTicker.Stop()
-		checkpointTicker.Stop()
-		var err error
-		size, err = getSegmentSize(w.walDir, segmentNum)
-		if err != nil {
-			return errors.Wrap(err, "getSegmentSize")
-		}
-	}
+	readTicker := time.NewTicker(readTimeout)
+	defer readTicker.Stop()
 
 	gcSem := make(chan struct{}, 1)
 	for {
diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go
index bc6a10126e..fc665b57d6 100644
--- a/tsdb/wlog/watcher_test.go
+++ b/tsdb/wlog/watcher_test.go
@@ -630,3 +630,61 @@ func TestCheckpointSeriesReset(t *testing.T) {
 		})
 	}
 }
+
+func TestRun_StartupTime(t *testing.T) {
+	const pageSize = 32 * 1024
+	const segments = 10
+	const seriesCount = 20
+	const samplesCount = 300
+
+	for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+		t.Run(string(compress), func(t *testing.T) {
+			dir := t.TempDir()
+
+			wdir := path.Join(dir, "wal")
+			err := os.Mkdir(wdir, 0o777)
+			require.NoError(t, err)
+
+			enc := record.Encoder{}
+			w, err := NewSize(nil, nil, wdir, pageSize, compress)
+			require.NoError(t, err)
+
+			for i := 0; i < segments; i++ {
+				for j := 0; j < seriesCount; j++ {
+					ref := j + (i * 100)
+					series := enc.Series([]record.RefSeries{
+						{
+							Ref:    chunks.HeadSeriesRef(ref),
+							Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+						},
+					}, nil)
+					require.NoError(t, w.Log(series))
+
+					for k := 0; k < samplesCount; k++ {
+						inner := rand.Intn(ref + 1)
+						sample := enc.Samples([]record.RefSample{
+							{
+								Ref: chunks.HeadSeriesRef(inner),
+								T:   int64(i),
+								V:   float64(i),
+							},
+						}, nil)
+						require.NoError(t, w.Log(sample))
+					}
+				}
+			}
+			require.NoError(t, w.Close())
+
+			wt := newWriteToMock()
+			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
+			watcher.MaxSegment = segments
+
+			watcher.setMetrics()
+			startTime := time.Now()
+
+			err = watcher.Run()
+			require.Less(t, time.Since(startTime), readTimeout)
+			require.NoError(t, err)
+		})
+	}
+}
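
Reviewer note (not part of the patch): the heart of this change is that watch()
now handles the replay (non-tailing) case up front with a single synchronous
read of the segment and returns, instead of setting up the checkpoint, segment,
and read tickers and effectively waiting on the readTimeout ticker, which is
what made agent/replay startup slow. The Go sketch below is a minimal,
self-contained illustration of that control-flow shape; the watcher type and
its readSegment helper are simplified stand-ins, not the actual Prometheus
Watcher or readAndHandleError code.

package main

import (
	"fmt"
	"math"
	"time"
)

// watcher is a stand-in for wlog.Watcher, reduced to what the sketch needs.
type watcher struct {
	readTimeout time.Duration
}

// readSegment stands in for Watcher.readAndHandleError in the real code: it
// reads the segment up to `size` bytes and reports any error.
func (w *watcher) readSegment(segmentNum int, tail bool, size int64) error {
	fmt.Printf("read segment %d up to %d bytes (tail=%v)\n", segmentNum, size, tail)
	return nil
}

// watch mirrors the patched structure: when not tailing (replay), do one
// synchronous read of the already-complete segment and return immediately;
// only the tailing path starts a ticker-driven loop.
func (w *watcher) watch(segmentNum int, tail bool, segmentSize int64) error {
	if !tail {
		// Replay path: the segment size is known, so read it once and move on
		// to the next segment without waiting on any ticker.
		return w.readSegment(segmentNum, tail, segmentSize)
	}

	// Tailing path: poll the live segment periodically.
	readTicker := time.NewTicker(w.readTimeout)
	defer readTicker.Stop()
	for i := 0; i < 3; i++ { // bounded so the sketch terminates
		<-readTicker.C
		if err := w.readSegment(segmentNum, tail, math.MaxInt64); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	w := &watcher{readTimeout: 10 * time.Millisecond}
	_ = w.watch(3, false, 1<<20) // replay: returns after one synchronous read
	_ = w.watch(4, true, 0)      // tailing: ticker-driven reads
}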