From a73202012b30954afac494cd0626ea2eb7c3860d Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 11 Mar 2026 10:17:13 +0100 Subject: [PATCH] tsdb/wlog[PERF]: optimize WAL watcher reads (up to 540x less B/op; 13000x less allocs/op) (#18250) See the detailed analysis https://docs.google.com/document/d/1efVAMcEw7-R_KatHHcobcFBlNsre-DoThVHI8AO2SDQ/edit?tab=t.0 I ran extensive benchmarks using synthetic data as well as real WAL segments pulled from the prombench runs. All benchmarks are here https://github.com/prometheus/prometheus/compare/bwplotka/wal-reuse?expand=1 * optimization(tsdb/wlog): reuse Ref* buffers across WAL watchers' reads Signed-off-by: bwplotka * optimization(tsdb/wlog): avoid expensive error wraps Signed-off-by: bwplotka * optimization(tsdb/wlog): reuse array for filtering Signed-off-by: bwplotka * fmt Signed-off-by: bwplotka * lint fix Signed-off-by: bwplotka * tsdb/record: add test for clear() on histograms Signed-off-by: bwplotka * updated WriteTo with what's currently expected Signed-off-by: bwplotka --------- Signed-off-by: bwplotka --- storage/remote/queue_manager.go | 3 +- storage/remote/queue_manager_test.go | 6 +- storage/remote/write.go | 5 ++ tsdb/head.go | 13 +-- tsdb/record/buffers.go | 115 +++++++++++++++++++++++++++ tsdb/record/buffers_test.go | 50 ++++++++++++ tsdb/wlog/watcher.go | 98 +++++++++++++++-------- tsdb/wlog/watcher_test.go | 16 ++-- 8 files changed, 253 insertions(+), 53 deletions(-) create mode 100644 tsdb/record/buffers.go create mode 100644 tsdb/record/buffers_test.go diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e0a2b7a43a..e650b0b5fd 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -488,6 +488,7 @@ func NewQueueManager( enableNativeHistogramRemoteWrite bool, enableTypeAndUnitLabels bool, protoMsg remoteapi.WriteMessageType, + recordBuf *record.BuffersPool, ) *QueueManager { if logger == nil { logger = promslog.NewNopLogger() @@ -537,7 +538,7 @@ func NewQueueManager( walMetadata := t.protoMsg != remoteapi.WriteV1MessageType - t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata) + t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata, recordBuf) // The current MetadataWatcher implementation is mutually exclusive // with the new approach, which stores metadata as WAL records and diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index dbabfe6ff3..ed8415d36c 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -302,7 +302,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg remoteapi.WriteMessageType) *QueueManager { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg, record.NewBuffersPool()) return m } @@ -770,7 +770,7 @@ func TestDisableReshardOnRetry(t *testing.T) { } ) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType, nil) m.StoreSeries(recs.Series, 0) // Attempt to samples while the manager is running. We immediately stop the @@ -1346,7 +1346,7 @@ func BenchmarkStoreSeries(b *testing.B) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType, record.NewBuffersPool()) m.externalLabels = tc.externalLabels m.relabelConfigs = tc.relabelConfigs diff --git a/storage/remote/write.go b/storage/remote/write.go index 6a336dc06b..1e87bf9f9c 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/wlog" ) @@ -72,6 +73,8 @@ type WriteStorage struct { scraper ReadyScrapeManager quit chan struct{} + recordBuf *record.BuffersPool + // For timestampTracker. highestTimestamp *maxTimestamp enableTypeAndUnitLabels bool @@ -102,6 +105,7 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet. Deprecated, check prometheus_remote_storage_queue_highest_timestamp_seconds which is more accurate.", }), }, + recordBuf: record.NewBuffersPool(), enableTypeAndUnitLabels: enableTypeAndUnitLabels, } if reg != nil { @@ -215,6 +219,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rwConf.SendNativeHistograms, rws.enableTypeAndUnitLabels, rwConf.ProtobufMessage, + rws.recordBuf, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) diff --git a/tsdb/head.go b/tsdb/head.go index e6d75e109f..33a32cad8a 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -79,12 +79,13 @@ type Head struct { // This should be typecasted to chunks.ChunkDiskMapperRef after loading. minOOOMmapRef atomic.Uint64 - metrics *headMetrics - opts *HeadOptions - wal, wbl *wlog.WL - exemplarMetrics *ExemplarMetrics - exemplars ExemplarStorage - logger *slog.Logger + metrics *headMetrics + opts *HeadOptions + wal, wbl *wlog.WL + exemplarMetrics *ExemplarMetrics + exemplars ExemplarStorage + logger *slog.Logger + // TODO(bwplotka): Consider using record.Pools that's reused with WAL watchers. refSeriesPool zeropool.Pool[[]record.RefSeries] floatsPool zeropool.Pool[[]record.RefSample] exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef] diff --git a/tsdb/record/buffers.go b/tsdb/record/buffers.go new file mode 100644 index 0000000000..699b1a836b --- /dev/null +++ b/tsdb/record/buffers.go @@ -0,0 +1,115 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package record + +import ( + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/util/zeropool" +) + +// BuffersPool offers pool of zero-ed record buffers. +type BuffersPool struct { + series zeropool.Pool[[]RefSeries] + samples zeropool.Pool[[]RefSample] + exemplars zeropool.Pool[[]RefExemplar] + histograms zeropool.Pool[[]RefHistogramSample] + floatHistograms zeropool.Pool[[]RefFloatHistogramSample] + metadata zeropool.Pool[[]RefMetadata] +} + +// NewBuffersPool returns a new BuffersPool object. +func NewBuffersPool() *BuffersPool { + return &BuffersPool{} +} + +func (p *BuffersPool) GetRefSeries(capacity int) []RefSeries { + b := p.series.Get() + if b == nil { + return make([]RefSeries, 0, capacity) + } + return b +} + +func (p *BuffersPool) PutRefSeries(b []RefSeries) { + for i := range b { // Zero out to avoid retaining label data. + b[i].Labels = labels.EmptyLabels() + } + p.series.Put(b[:0]) +} + +func (p *BuffersPool) GetSamples(capacity int) []RefSample { + b := p.samples.Get() + if b == nil { + return make([]RefSample, 0, capacity) + } + return b +} + +func (p *BuffersPool) PutSamples(b []RefSample) { + p.samples.Put(b[:0]) +} + +func (p *BuffersPool) GetExemplars(capacity int) []RefExemplar { + b := p.exemplars.Get() + if b == nil { + return make([]RefExemplar, 0, capacity) + } + return b +} + +func (p *BuffersPool) PutExemplars(b []RefExemplar) { + for i := range b { // Zero out to avoid retaining label data. + b[i].Labels = labels.EmptyLabels() + } + p.exemplars.Put(b[:0]) +} + +func (p *BuffersPool) GetHistograms(capacity int) []RefHistogramSample { + b := p.histograms.Get() + if b == nil { + return make([]RefHistogramSample, 0, capacity) + } + return b +} + +func (p *BuffersPool) PutHistograms(b []RefHistogramSample) { + clear(b) + p.histograms.Put(b[:0]) +} + +func (p *BuffersPool) GetFloatHistograms(capacity int) []RefFloatHistogramSample { + b := p.floatHistograms.Get() + if b == nil { + return make([]RefFloatHistogramSample, 0, capacity) + } + return b +} + +func (p *BuffersPool) PutFloatHistograms(b []RefFloatHistogramSample) { + clear(b) + p.floatHistograms.Put(b[:0]) +} + +func (p *BuffersPool) GetMetadata(capacity int) []RefMetadata { + b := p.metadata.Get() + if b == nil { + return make([]RefMetadata, 0, capacity) + } + return b +} + +func (p *BuffersPool) PutMetadata(b []RefMetadata) { + clear(b) + p.metadata.Put(b[:0]) +} diff --git a/tsdb/record/buffers_test.go b/tsdb/record/buffers_test.go new file mode 100644 index 0000000000..dd55818f64 --- /dev/null +++ b/tsdb/record/buffers_test.go @@ -0,0 +1,50 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package record + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" +) + +func TestBuffersPool_PtrClear(t *testing.T) { + pool := NewBuffersPool() + + h := pool.GetHistograms(1) + h = append(h, RefHistogramSample{ + H: &histogram.Histogram{Schema: 1244124}, + }) + pool.PutHistograms(h) + + h2 := pool.GetHistograms(1) + require.Empty(t, h2) + require.Equal(t, 1, cap(h2)) + h2 = h2[:1] // extend to capacity to check previously stored item + require.Nil(t, h2[0].H) + + fh := pool.GetFloatHistograms(1) + fh = append(fh, RefFloatHistogramSample{ + FH: &histogram.FloatHistogram{Schema: 1244521}, + }) + pool.PutFloatHistograms(fh) + + fh2 := pool.GetFloatHistograms(1) + require.Empty(t, fh2) + require.Equal(t, 1, cap(fh2)) + fh2 = fh2[:1] // extend to capacity + require.Nil(t, fh2[0].FH) +} diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index a841a44fc8..2eeaf0dd99 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -45,11 +45,15 @@ var ( ) // WriteTo is an interface used by the Watcher to send the samples it's read -// from the WAL on to somewhere else. Functions will be called concurrently -// and it is left to the implementer to make sure they are safe. +// from the WAL on to somewhere else. +// +// Implementations must: +// * Ensure it's safe for concurrent goroutine use. +// * Ensure slices are not reused after method calls. type WriteTo interface { - // Append and AppendExemplar should block until the samples are fully accepted, - // whether enqueued in memory or successfully written to it's final destination. + // Append and all the rest Append* methods should block until + // the samples are fully accepted e.g. enqueued in memory. + // // Once returned, the WAL Watcher will not attempt to pass that data again. Append([]record.RefSample) bool AppendExemplars([]record.RefExemplar) bool @@ -60,9 +64,10 @@ type WriteTo interface { // UpdateSeriesSegment and SeriesReset are intended for // garbage-collection: - // First we call UpdateSeriesSegment on all current series. + // * First we call UpdateSeriesSegment on all current series. + // * Then SeriesReset is called. UpdateSeriesSegment([]record.RefSeries, int) - // Then SeriesReset is called to allow the deletion of all series + // SeriesReset is called to allow the deletion of all series // created in a segment lower than the argument. SeriesReset(int) } @@ -85,6 +90,7 @@ type WatcherMetrics struct { type Watcher struct { name string writer WriteTo + recordBuf *record.BuffersPool logger *slog.Logger walDir string lastCheckpoint string @@ -187,12 +193,25 @@ func (m *WatcherMetrics) Unregister() { } // NewWatcher creates a new WAL watcher for a given WriteTo. -func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger *slog.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher { +func NewWatcher( + metrics *WatcherMetrics, + readerMetrics *LiveReaderMetrics, + logger *slog.Logger, + name string, + writer WriteTo, + dir string, + sendExemplars, sendHistograms, sendMetadata bool, + recordBuf *record.BuffersPool, +) *Watcher { if logger == nil { logger = promslog.NewNopLogger() } + if recordBuf == nil { + recordBuf = record.NewBuffersPool() + } return &Watcher{ logger: logger, + recordBuf: recordBuf, writer: writer, metrics: metrics, readerMetrics: readerMetrics, @@ -492,19 +511,24 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { // Read from a segment and pass the details to w.writer. // Also used with readCheckpoint - implements segmentReadFn. +// TODO(bwplotka): Rename tail to !onlySeries; extremely confusing and easy to miss. func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { - var ( - dec = record.NewDecoder(labels.NewSymbolTable(), w.logger) // One table per WAL segment means it won't grow indefinitely. - series []record.RefSeries - samples []record.RefSample - samplesToSend []record.RefSample - exemplars []record.RefExemplar - histograms []record.RefHistogramSample - histogramsToSend []record.RefHistogramSample - floatHistograms []record.RefFloatHistogramSample - floatHistogramsToSend []record.RefFloatHistogramSample - metadata []record.RefMetadata - ) + series := w.recordBuf.GetRefSeries(512) + samples := w.recordBuf.GetSamples(512) + exemplars := w.recordBuf.GetExemplars(512) + histograms := w.recordBuf.GetHistograms(512) + floatHistograms := w.recordBuf.GetFloatHistograms(512) + metadata := w.recordBuf.GetMetadata(512) + defer func() { + w.recordBuf.PutRefSeries(series) + w.recordBuf.PutSamples(samples) + w.recordBuf.PutExemplars(exemplars) + w.recordBuf.PutHistograms(histograms) + w.recordBuf.PutFloatHistograms(floatHistograms) + w.recordBuf.PutMetadata(metadata) + }() + + dec := record.NewDecoder(labels.NewSymbolTable(), w.logger) // One table per WAL segment means it won't grow indefinitely. for r.Next() && !isClosed(w.quit) { var err error rec := r.Record() @@ -530,6 +554,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { w.recordDecodeFailsMetric.Inc() return err } + // Reuse the underlying array for efficiency. + // It's valid to do, because we override elements that we no longer need to read when filtering. + samplesToSend := samples[:0] for _, s := range samples { if s.T > w.startTimestamp { if !w.sendSamples { @@ -542,7 +569,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } if len(samplesToSend) > 0 { w.writer.Append(samplesToSend) - samplesToSend = samplesToSend[:0] } case record.Exemplars: @@ -575,6 +601,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { w.recordDecodeFailsMetric.Inc() return err } + // Reuse the underlying array for efficiency. + // It's valid to do, because we override elements that we no longer need to read when filtering. + histogramsToSend := histograms[:0] for _, h := range histograms { if h.T > w.startTimestamp { if !w.sendSamples { @@ -587,7 +616,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } if len(histogramsToSend) > 0 { w.writer.AppendHistograms(histogramsToSend) - histogramsToSend = histogramsToSend[:0] } case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: @@ -603,6 +631,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { w.recordDecodeFailsMetric.Inc() return err } + // Reuse the underlying array for efficiency. + // It's valid to do, because we override elements that we no longer need to read when filtering. + floatHistogramsToSend := floatHistograms[:0] for _, fh := range floatHistograms { if fh.T > w.startTimestamp { if !w.sendSamples { @@ -615,7 +646,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } if len(floatHistogramsToSend) > 0 { w.writer.AppendFloatHistograms(floatHistogramsToSend) - floatHistogramsToSend = floatHistogramsToSend[:0] } case record.Metadata: @@ -637,19 +667,18 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { // We're not interested in other types of records. } } - if err := r.Err(); err != nil { - return fmt.Errorf("segment %d: %w", segmentNum, err) - } - return nil + // NOTE: r.Err == io.EOF is a common case when tailing. + // Don't wrap error, callers are expected to handle EOF and wrap accordingly. + return r.Err() } // Go through all series in a segment updating the segmentNum, so we can delete older series. // Used with readCheckpoint - implements segmentReadFn. func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error { - var ( - dec = record.NewDecoder(labels.NewSymbolTable(), w.logger) // Needed for decoding; labels do not outlive this function. - series []record.RefSeries - ) + series := w.recordBuf.GetRefSeries(512) + defer w.recordBuf.PutRefSeries(series) + + dec := record.NewDecoder(labels.NewSymbolTable(), w.logger) // Needed for decoding; labels do not outlive this function. for r.Next() && !isClosed(w.quit) { rec := r.Record() w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc() @@ -671,10 +700,9 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error // We're only interested in series. } } - if err := r.Err(); err != nil { - return fmt.Errorf("segment %d: %w", segmentNum, err) - } - return nil + // NOTE: r.Err == io.EOF is a common case when tailing. + // Don't wrap error, callers are expected to handle EOF and wrap accordingly. + return r.Err() } func (w *Watcher) SetStartTime(t time.Time) { @@ -712,7 +740,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err err = readFn(w, r, index, false) sr.Close() if err != nil && !errors.Is(err, io.EOF) { - return fmt.Errorf("readSegment: %w", err) + return fmt.Errorf("readSegment %d: %w", index, err) } if r.Offset() != size { diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 51185f2215..abf5187b65 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -253,7 +253,7 @@ func TestWatcher_Tail(t *testing.T) { // Start watcher to that reads into a mock. wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true) + watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true, nil) // Update the time because we just created samples around "now" time and watcher // only starts watching after that time. watcher.SetStartTime(now) @@ -386,7 +386,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { require.NoError(t, err) wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) go watcher.Start() expected := seriesCount @@ -475,7 +475,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { require.NoError(t, err) overwriteReadTimeout(t, time.Second) wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) go watcher.Start() expected := seriesCount * 2 @@ -547,7 +547,7 @@ func TestReadCheckpoint(t *testing.T) { require.NoError(t, err) wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) go watcher.Start() expectedSeries := seriesCount @@ -616,7 +616,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { } wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. @@ -689,7 +689,7 @@ func TestCheckpointSeriesReset(t *testing.T) { overwriteReadTimeout(t, time.Second) wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) watcher.MaxSegment = -1 go watcher.Start() @@ -769,7 +769,7 @@ func TestRun_StartupTime(t *testing.T) { require.NoError(t, w.Close()) wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) watcher.MaxSegment = segments watcher.SetMetrics() @@ -840,7 +840,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { // Set up the watcher and run it in the background. wt := newWriteToMock(time.Millisecond) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) watcher.SetMetrics() watcher.MaxSegment = segmentsToRead