diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go
index 1c76e38877..525515221d 100644
--- a/tsdb/wlog/watcher.go
+++ b/tsdb/wlog/watcher.go
@@ -262,10 +262,8 @@ func (w *Watcher) loop() {
 // Run the watcher, which will tail the WAL until the quit channel is closed
 // or an error case is hit.
 func (w *Watcher) Run() error {
-	_, lastSegment, err := w.firstAndLast()
-	if err != nil {
-		return fmt.Errorf("wal.Segments: %w", err)
-	}
+	var lastSegment int
+	var err error
 
 	// We want to ensure this is false across iterations since
 	// Run will be called again if there was a failure to read the WAL.
@@ -296,9 +294,17 @@ func (w *Watcher) Run() error {
 		w.currentSegmentMetric.Set(float64(currentSegment))
 		level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
 
+		// Reset the value of lastSegment on each iteration; this avoids waiting too long
+		// between reads if we're reading a segment that is not the most recent segment after startup.
+		_, lastSegment, err = w.firstAndLast()
+		if err != nil {
+			return fmt.Errorf("wal.Segments: %w", err)
+		}
+		tail := currentSegment >= lastSegment
+
 		// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
 		// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
-		if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
+		if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
 			return err
 		}
 
diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go
index b10f8f8f8c..871640a7cf 100644
--- a/tsdb/wlog/watcher_test.go
+++ b/tsdb/wlog/watcher_test.go
@@ -58,29 +58,47 @@ type writeToMock struct {
 	floatHistogramsAppended int
 	seriesLock              sync.Mutex
 	seriesSegmentIndexes    map[chunks.HeadSeriesRef]int
+
+	// delay reads with a short sleep
+	delay time.Duration
 }
 
 func (wtm *writeToMock) Append(s []record.RefSample) bool {
+	if wtm.delay > 0 {
+		time.Sleep(wtm.delay)
+	}
 	wtm.samplesAppended += len(s)
 	return true
 }
 
 func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
+	if wtm.delay > 0 {
+		time.Sleep(wtm.delay)
+	}
 	wtm.exemplarsAppended += len(e)
 	return true
 }
 
 func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
+	if wtm.delay > 0 {
+		time.Sleep(wtm.delay)
+	}
 	wtm.histogramsAppended += len(h)
 	return true
 }
 
 func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
+	if wtm.delay > 0 {
+		time.Sleep(wtm.delay)
+	}
 	wtm.floatHistogramsAppended += len(fh)
 	return true
 }
 
 func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
+	if wtm.delay > 0 {
+		time.Sleep(wtm.delay)
+	}
 	wtm.UpdateSeriesSegment(series, index)
 }
 
@@ -110,9 +128,10 @@ func (wtm *writeToMock) checkNumSeries() int {
 	return len(wtm.seriesSegmentIndexes)
 }
 
-func newWriteToMock() *writeToMock {
+func newWriteToMock(delay time.Duration) *writeToMock {
 	return &writeToMock{
 		seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
+		delay:                delay,
 	}
 }
 
@@ -209,7 +228,7 @@ func TestTailSamples(t *testing.T) {
 			first, last, err := Segments(w.Dir())
 			require.NoError(t, err)
 
-			wt := newWriteToMock()
+			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
 			watcher.SetStartTime(now)
 
@@ -294,7 +313,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
 
-			wt := newWriteToMock()
+			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 			go watcher.Start()
 
@@ -383,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
 			readTimeout = time.Second
-			wt := newWriteToMock()
+			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 			go watcher.Start()
 
@@ -454,7 +473,7 @@ func TestReadCheckpoint(t *testing.T) {
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
 
-			wt := newWriteToMock()
+			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 			go watcher.Start()
 
@@ -523,7 +542,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
 				require.NoError(t, err)
 			}
 
-			wt := newWriteToMock()
+			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 			watcher.MaxSegment = -1
 
@@ -596,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
 			require.NoError(t, err)
 
 			readTimeout = time.Second
-			wt := newWriteToMock()
+			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 			watcher.MaxSegment = -1
 			go watcher.Start()
@@ -675,7 +694,7 @@ func TestRun_StartupTime(t *testing.T) {
 			}
 			require.NoError(t, w.Close())
 
-			wt := newWriteToMock()
+			wt := newWriteToMock(0)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 			watcher.MaxSegment = segments
 
@@ -688,3 +707,93 @@ func TestRun_StartupTime(t *testing.T) {
 		})
 	}
 }
+
+func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
+	const pageSize = 32 * 1024
+	const segments = 10
+	const seriesCount = 20
+	const samplesCount = 300
+
+	// This test can take longer than intended to finish in cloud CI.
+	readTimeout := 10 * time.Second
+
+	for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+		t.Run(string(compress), func(t *testing.T) {
+			dir := t.TempDir()
+
+			wdir := path.Join(dir, "wal")
+			err := os.Mkdir(wdir, 0o777)
+			require.NoError(t, err)
+
+			enc := record.Encoder{}
+			w, err := NewSize(nil, nil, wdir, pageSize, compress)
+			require.NoError(t, err)
+			var wg sync.WaitGroup
+			// add one segment initially to ensure there's a value > 0 for the last segment id
+			for i := 0; i < 1; i++ {
+				for j := 0; j < seriesCount; j++ {
+					ref := j + (i * 100)
+					series := enc.Series([]record.RefSeries{
+						{
+							Ref:    chunks.HeadSeriesRef(ref),
+							Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+						},
+					}, nil)
+					require.NoError(t, w.Log(series))
+
+					for k := 0; k < samplesCount; k++ {
+						inner := rand.Intn(ref + 1)
+						sample := enc.Samples([]record.RefSample{
+							{
+								Ref: chunks.HeadSeriesRef(inner),
+								T:   int64(i),
+								V:   float64(i),
+							},
+						}, nil)
+						require.NoError(t, w.Log(sample))
+					}
+				}
+			}
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for i := 1; i < segments; i++ {
+					for j := 0; j < seriesCount; j++ {
+						ref := j + (i * 100)
+						series := enc.Series([]record.RefSeries{
+							{
+								Ref:    chunks.HeadSeriesRef(ref),
+								Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+							},
+						}, nil)
+						require.NoError(t, w.Log(series))
+
+						for k := 0; k < samplesCount; k++ {
+							inner := rand.Intn(ref + 1)
+							sample := enc.Samples([]record.RefSample{
+								{
+									Ref: chunks.HeadSeriesRef(inner),
+									T:   int64(i),
+									V:   float64(i),
+								},
+							}, nil)
+							require.NoError(t, w.Log(sample))
+						}
+					}
+				}
+			}()
+
+			wt := newWriteToMock(time.Millisecond)
+			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
+			watcher.MaxSegment = segments
+
+			watcher.setMetrics()
+			startTime := time.Now()
+			err = watcher.Run()
+			wg.Wait()
+			require.Less(t, time.Since(startTime), readTimeout)
+			require.NoError(t, err)
+			require.NoError(t, w.Close())
+		})
+	}
+}
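
For context on the watcher.go change above, here is a small self-contained sketch of the behaviour the reworked Run loop relies on. It is not part of the patch: listSegments and the toy data below stand in for Watcher.firstAndLast and the real WAL directory. The point is that the last segment id is re-read on every iteration, so tail only becomes true once the watcher has genuinely caught up, rather than staying pinned to whatever segment was last when Run started.

package main

import "fmt"

// listSegments stands in for Watcher.firstAndLast: it reports the first and
// last WAL segment currently "on disk". Here the WAL is just a slice of
// segment ids that keeps growing while older segments are replayed.
func listSegments(segments []int) (first, last int) {
	return segments[0], segments[len(segments)-1]
}

func main() {
	// Pretend the WAL has segments 0..5 at startup and gains one more segment
	// for every older segment the watcher replays.
	wal := []int{0, 1, 2, 3, 4, 5}

	currentSegment := 0
	for {
		// As in the patched Run(): recompute lastSegment on every iteration
		// instead of once at startup, so tail is only true when caught up.
		_, lastSegment := listSegments(wal)
		tail := currentSegment >= lastSegment
		fmt.Printf("replaying segment %d (last=%d, tail=%v)\n", currentSegment, lastSegment, tail)

		if tail {
			break // the real watcher would now wait for new writes instead of exiting
		}
		if len(wal) < 9 {
			// Simulate the WAL growing while the watcher is still behind.
			wal = append(wal, wal[len(wal)-1]+1)
		}
		currentSegment++
	}
}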