diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 9dd50ee601..0a9dff82df 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -189,6 +189,7 @@ type QueueManager struct {
 	sentBatchDuration     prometheus.Observer
 	succeededSamplesTotal prometheus.Counter
 	retriedSamplesTotal   prometheus.Counter
+	shardCapacity         prometheus.Gauge
 }
 
 // NewQueueManager builds a new QueueManager.
@@ -219,27 +220,11 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg
 		samplesDropped:     newEWMARate(ewmaWeight, shardUpdateDuration),
 		samplesOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
-
-		highestSentTimestampMetric: &maxGauge{
-			Gauge: queueHighestSentTimestamp.WithLabelValues(name),
-		},
-		pendingSamplesMetric:  queuePendingSamples.WithLabelValues(name),
-		enqueueRetriesMetric:  enqueueRetriesTotal.WithLabelValues(name),
-		droppedSamplesTotal:   droppedSamplesTotal.WithLabelValues(name),
-		numShardsMetric:       numShards.WithLabelValues(name),
-		failedSamplesTotal:    failedSamplesTotal.WithLabelValues(name),
-		sentBatchDuration:     sentBatchDuration.WithLabelValues(name),
-		succeededSamplesTotal: succeededSamplesTotal.WithLabelValues(name),
-		retriedSamplesTotal:   retriedSamplesTotal.WithLabelValues(name),
 	}
 
 	t.watcher = NewWALWatcher(logger, name, t, walDir)
 	t.shards = t.newShards()
 
-	// Initialise some metrics.
-	shardCapacity.WithLabelValues(name).Set(float64(t.cfg.Capacity))
-	t.pendingSamplesMetric.Set(0)
-
 	return t
 }
 
@@ -307,6 +292,27 @@ outer:
 // Start the queue manager sending samples to the remote storage.
 // Does not block.
 func (t *QueueManager) Start() {
+	// Set up the QueueManager's metrics. We do this here rather than in the
+	// constructor because of the ordering of creating queue managers, stopping them,
+	// and then starting new ones in storage/remote/storage.go ApplyConfig.
+	name := t.client.Name()
+	t.highestSentTimestampMetric = &maxGauge{
+		Gauge: queueHighestSentTimestamp.WithLabelValues(name),
+	}
+	t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(name)
+	t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(name)
+	t.droppedSamplesTotal = droppedSamplesTotal.WithLabelValues(name)
+	t.numShardsMetric = numShards.WithLabelValues(name)
+	t.failedSamplesTotal = failedSamplesTotal.WithLabelValues(name)
+	t.sentBatchDuration = sentBatchDuration.WithLabelValues(name)
+	t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name)
+	t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name)
+	t.shardCapacity = shardCapacity.WithLabelValues(name)
+
+	// Initialise some metrics.
+	t.shardCapacity.Set(float64(t.cfg.Capacity))
+	t.pendingSamplesMetric.Set(0)
+
 	t.shards.start(t.numShards)
 	t.watcher.Start()
 
@@ -335,6 +341,18 @@ func (t *QueueManager) Stop() {
 	for _, labels := range t.seriesLabels {
 		release(labels)
 	}
+	// Delete metrics so we don't have alerts for queues that are gone.
+	name := t.client.Name()
+	queueHighestSentTimestamp.DeleteLabelValues(name)
+	queuePendingSamples.DeleteLabelValues(name)
+	enqueueRetriesTotal.DeleteLabelValues(name)
+	droppedSamplesTotal.DeleteLabelValues(name)
+	numShards.DeleteLabelValues(name)
+	failedSamplesTotal.DeleteLabelValues(name)
+	sentBatchDuration.DeleteLabelValues(name)
+	succeededSamplesTotal.DeleteLabelValues(name)
+	retriedSamplesTotal.DeleteLabelValues(name)
+	shardCapacity.DeleteLabelValues(name)
 }
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go
index 4c58264158..3dba373b48 100644
--- a/storage/remote/wal_watcher.go
+++ b/storage/remote/wal_watcher.go
@@ -128,18 +128,25 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
 		quit: make(chan struct{}),
 		done: make(chan struct{}),
 
-		recordsReadMetric:       watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: name}),
-		recordDecodeFailsMetric: watcherRecordDecodeFails.WithLabelValues(name),
-		samplesSentPreTailing:   watcherSamplesSentPreTailing.WithLabelValues(name),
-		currentSegmentMetric:    watcherCurrentSegment.WithLabelValues(name),
-
 		maxSegment: -1,
 	}
 }
 
+func (w *WALWatcher) setMetrics() {
+	// Set up the WALWatcher's metrics. We do this here rather than in the
+	// constructor because of the ordering of creating queue managers,
+	// stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
+	w.recordsReadMetric = watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: w.name})
+	w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.name)
+	w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.name)
+	w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.name)
+}
+
 // Start the WALWatcher.
 func (w *WALWatcher) Start() {
+	w.setMetrics()
 	level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
+
 	go w.loop()
 }
 
@@ -147,6 +154,14 @@ func (w *WALWatcher) Start() {
 func (w *WALWatcher) Stop() {
 	close(w.quit)
 	<-w.done
+
+	// Records read metric has series and samples.
+	watcherRecordsRead.DeleteLabelValues(w.name, "series")
+	watcherRecordsRead.DeleteLabelValues(w.name, "samples")
+	watcherRecordDecodeFails.DeleteLabelValues(w.name)
+	watcherSamplesSentPreTailing.DeleteLabelValues(w.name)
+	watcherCurrentSegment.DeleteLabelValues(w.name)
+
 	level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
 }
 
diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go
index a802a76462..31f27d3807 100644
--- a/storage/remote/wal_watcher_test.go
+++ b/storage/remote/wal_watcher_test.go
@@ -102,8 +102,6 @@ func TestTailSamples(t *testing.T) {
 	err = os.Mkdir(wdir, 0777)
 	testutil.Ok(t, err)
 
-	// os.Create(wal.SegmentName(wdir, 30))
-
 	enc := tsdb.RecordEncoder{}
 	w, err := wal.NewSize(nil, nil, wdir, 128*pageSize)
 	testutil.Ok(t, err)
@@ -139,6 +137,9 @@ func TestTailSamples(t *testing.T) {
 	wt := newWriteToMock()
 	watcher := NewWALWatcher(nil, "", wt, dir)
 	watcher.startTime = now.UnixNano()
+
+	// Set the Watcher's metrics so they're not nil pointers.
+	watcher.setMetrics()
 	for i := first; i <= last; i++ {
 		segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i))
 		testutil.Ok(t, err)
@@ -148,14 +149,12 @@ func TestTailSamples(t *testing.T) {
 		// Use tail true so we can ensure we got the right number of samples.
 		watcher.readSegment(reader, i, true)
 	}
-	go watcher.Start()
 
 	expectedSeries := seriesCount
 	expectedSamples := seriesCount * samplesCount
 	retry(t, defaultRetryInterval, defaultRetries, func() bool {
 		return wt.checkNumLabels() >= expectedSeries
 	})
-	watcher.Stop()
 	testutil.Equals(t, expectedSeries, wt.checkNumLabels())
 	testutil.Equals(t, expectedSamples, wt.samplesAppended)
 }
@@ -424,6 +423,9 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
 	watcher := NewWALWatcher(nil, "", wt, dir)
 	watcher.maxSegment = -1
 
+	// Set the Watcher's metrics so they're not nil pointers.
+	watcher.setMetrics()
+
 	lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir)
 	testutil.Ok(t, err)
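
For context (not part of the patch): the comments in both Start methods refer to the fact that storage/remote/storage.go ApplyConfig builds the new queue managers, stops the old ones, and only then starts the new ones, so any labelled metric child claimed in a constructor would be deleted by the old queue's Stop before the new queue ever ran. The following is a minimal, hypothetical Go sketch of that create-in-Start / delete-in-Stop lifecycle against client_golang; the pending_samples_example metric name and exampleQueue type are illustrative only and do not appear in the patch.

package main

import "github.com/prometheus/client_golang/prometheus"

// A gauge vector keyed by queue name, analogous to the per-queue vectors in the patch.
var pendingSamples = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "pending_samples_example", // hypothetical metric name for illustration
		Help: "Example gauge keyed by queue name.",
	},
	[]string{"queue"},
)

type exampleQueue struct {
	name    string
	pending prometheus.Gauge
}

// Start claims the labelled child; running it after an old queue's Stop has
// deleted that child simply recreates the series.
func (q *exampleQueue) Start() {
	q.pending = pendingSamples.WithLabelValues(q.name)
	q.pending.Set(0)
}

// Stop drops the child so a stopped queue no longer exports stale series.
func (q *exampleQueue) Stop() {
	pendingSamples.DeleteLabelValues(q.name)
}

func main() {
	prometheus.MustRegister(pendingSamples)

	oldQueue := &exampleQueue{name: "remote-a"}
	oldQueue.Start()

	// ApplyConfig-style ordering: create the replacement first, stop the old
	// queue, then start the replacement. Because WithLabelValues runs inside
	// Start, the new child is created after the old one is deleted.
	newQueue := &exampleQueue{name: "remote-a"}
	oldQueue.Stop()
	newQueue.Start()
}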