From 6806b68f930722746b23f3e30f184b22ca1fafe0 Mon Sep 17 00:00:00 2001
From: Ayoub Mrini
Date: Wed, 29 Oct 2025 15:04:09 +0100
Subject: [PATCH] [release-3.7] fix: Remote-write: revert changes in the queue resharding logic (#17412)

* Revert "chore: deprecate prometheus_remote_storage_{samples,exemplars,histograms}_in_total and prometheus_remote_storage_highest_timestamp_in_seconds"

This reverts commit ba14bc49db31a1b0ba3127e6ddf59a9f32a08dff.

Signed-off-by: machine424

* Revert "storage/remote: compute highestTimestamp and dataIn at QueueManager level"

This reverts commit 184c7eb9186aa8fea09920f2f8e8aa8a603da300.

Signed-off-by: machine424

* fix(remote-write): bring back the per queue metrics

Signed-off-by: machine424

* test(remote): add TestRemoteWrite_ReshardingWithoutDeadlock to reproduce the sharding scale up deadlock

Signed-off-by: machine424

---------

Signed-off-by: machine424
---
 cmd/prometheus/main_test.go          | 70 ++++++++++++++++++++++++++++
 storage/remote/queue_manager.go      | 35 +++++++++-----
 storage/remote/queue_manager_test.go | 26 ++++++++---
 storage/remote/write.go              | 27 +++++++++--
 4 files changed, 136 insertions(+), 22 deletions(-)

diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go
index 368d33f9aa..56ddbc0176 100644
--- a/cmd/prometheus/main_test.go
+++ b/cmd/prometheus/main_test.go
@@ -965,3 +965,73 @@ remote_write:
 		return true
 	}, 10*time.Second, 100*time.Millisecond)
 }
+
+// TestRemoteWrite_ReshardingWithoutDeadlock ensures that resharding (scaling up) doesn't block when the shards are full.
+// See: https://github.com/prometheus/prometheus/issues/17384.
+func TestRemoteWrite_ReshardingWithoutDeadlock(t *testing.T) {
+	t.Parallel()
+
+	tmpDir := t.TempDir()
+	configFile := filepath.Join(tmpDir, "prometheus.yml")
+
+	port := testutil.RandomUnprivilegedPort(t)
+
+	server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
+		time.Sleep(time.Second)
+	}))
+	t.Cleanup(server.Close)
+
+	config := fmt.Sprintf(`
+global:
+  scrape_interval: 100ms
+scrape_configs:
+  - job_name: 'self'
+    static_configs:
+      - targets: ['localhost:%d']
+
+remote_write:
+  - url: %s
+    queue_config:
+      # Speed up the queue being full.
+      capacity: 1
+`, port, server.URL)
+	require.NoError(t, os.WriteFile(configFile, []byte(config), 0o777))
+
+	prom := prometheusCommandWithLogging(
+		t,
+		configFile,
+		port,
+		fmt.Sprintf("--storage.tsdb.path=%s", tmpDir),
+	)
+	require.NoError(t, prom.Start())
+
+	var checkInitialDesiredShardsOnce sync.Once
+	require.Eventually(t, func() bool {
+		r, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port))
+		if err != nil {
+			return false
+		}
+		defer r.Body.Close()
+		if r.StatusCode != http.StatusOK {
+			return false
+		}
+
+		metrics, err := io.ReadAll(r.Body)
+		if err != nil {
+			return false
+		}
+
+		checkInitialDesiredShardsOnce.Do(func() {
+			s, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_shards_desired")
+			require.NoError(t, err)
+			require.Equal(t, 1.0, s)
+		})
+
+		desiredShards, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_shards_desired")
+		if err != nil || desiredShards <= 1 {
+			return false
+		}
+		return true
+		// 3*shardUpdateDuration to allow for the resharding logic to run.
+	}, 30*time.Second, 1*time.Second)
+}
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 6adaa0d4cb..6e417ef447 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -453,10 +453,11 @@ type QueueManager struct {
 	quit        chan struct{}
 	wg          sync.WaitGroup
 
-	dataIn, dataOut, dataOutDuration *ewmaRate
+	dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate
 
-	metrics  *queueManagerMetrics
-	interner *pool
+	metrics              *queueManagerMetrics
+	interner             *pool
+	highestRecvTimestamp *maxTimestamp
 }
 
 // NewQueueManager builds a new QueueManager and starts a new
@@ -470,6 +471,7 @@ func NewQueueManager(
 	readerMetrics *wlog.LiveReaderMetrics,
 	logger *slog.Logger,
 	dir string,
+	samplesIn *ewmaRate,
 	cfg config.QueueConfig,
 	mCfg config.MetadataConfig,
 	externalLabels labels.Labels,
@@ -477,6 +479,7 @@
 	client WriteClient,
 	flushDeadline time.Duration,
 	interner *pool,
+	highestRecvTimestamp *maxTimestamp,
 	sm ReadyScrapeManager,
 	enableExemplarRemoteWrite bool,
 	enableNativeHistogramRemoteWrite bool,
@@ -516,12 +519,14 @@ func NewQueueManager(
 		reshardChan: make(chan int),
 		quit:        make(chan struct{}),
 
-		dataIn:          newEWMARate(ewmaWeight, shardUpdateDuration),
+		dataIn:          samplesIn,
+		dataDropped:     newEWMARate(ewmaWeight, shardUpdateDuration),
 		dataOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
 
-		metrics:  metrics,
-		interner: interner,
+		metrics:              metrics,
+		interner:             interner,
+		highestRecvTimestamp: highestRecvTimestamp,
 
 		protoMsg: protoMsg,
 		compr:    compression.Snappy, // Hardcoded for now, but scaffolding exists for likely future use.
@@ -711,6 +716,7 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
+			t.dataDropped.incr(1)
 			if _, ok := t.droppedSeries[s.Ref]; !ok {
 				t.logger.Info("Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
 				t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -772,6 +778,8 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[e.Ref]
 		if !ok {
+			// Track dropped exemplars in the same EWMA for sharding calc.
+			t.dataDropped.incr(1)
 			if _, ok := t.droppedSeries[e.Ref]; !ok {
 				t.logger.Info("Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref)
 				t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -833,6 +841,7 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[h.Ref]
 		if !ok {
+			t.dataDropped.incr(1)
 			if _, ok := t.droppedSeries[h.Ref]; !ok {
 				t.logger.Info("Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
 				t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -893,6 +902,7 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[h.Ref]
 		if !ok {
+			t.dataDropped.incr(1)
 			if _, ok := t.droppedSeries[h.Ref]; !ok {
 				t.logger.Info("Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
 				t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -1122,8 +1132,8 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
 // outlined in this functions implementation. It is up to the caller to reshard, or not,
 // based on the return value.
 func (t *QueueManager) calculateDesiredShards() int {
-	t.dataIn.tick()
 	t.dataOut.tick()
+	t.dataDropped.tick()
 	t.dataOutDuration.tick()
 
 	// We use the number of incoming samples as a prediction of how much work we
@@ -1133,12 +1143,13 @@
 	var (
 		dataInRate      = t.dataIn.rate()
 		dataOutRate     = t.dataOut.rate()
+		dataKeptRatio   = dataOutRate / (t.dataDropped.rate() + dataOutRate)
 		dataOutDuration = t.dataOutDuration.rate() / float64(time.Second)
-		dataPendingRate = dataInRate - dataOutRate
+		dataPendingRate = dataInRate*dataKeptRatio - dataOutRate
 		highestSent     = t.metrics.highestSentTimestamp.Get()
-		highestRecv     = t.metrics.highestTimestamp.Get()
+		highestRecv     = t.highestRecvTimestamp.Get()
 		delay           = highestRecv - highestSent
-		dataPending     = delay * dataInRate
+		dataPending     = delay * dataInRate * dataKeptRatio
 	)
 
 	if dataOutRate <= 0 {
@@ -1150,12 +1161,13 @@
 		backlogCatchup = 0.05 * dataPending
 		// Calculate Time to send one sample, averaged across all sends done this tick.
 		timePerSample = dataOutDuration / dataOutRate
-		desiredShards = timePerSample * (dataInRate + backlogCatchup)
+		desiredShards = timePerSample * (dataInRate*dataKeptRatio + backlogCatchup)
 	)
 	t.metrics.desiredNumShards.Set(desiredShards)
 	t.logger.Debug("QueueManager.calculateDesiredShards",
 		"dataInRate", dataInRate,
 		"dataOutRate", dataOutRate,
+		"dataKeptRatio", dataKeptRatio,
 		"dataPendingRate", dataPendingRate,
 		"dataPending", dataPending,
 		"dataOutDuration", dataOutDuration,
@@ -1348,7 +1360,6 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
 			return true
 		}
 		s.qm.metrics.highestTimestamp.Set(float64(data.timestamp / 1000))
-		s.qm.dataIn.incr(1)
 		return true
 	}
 }
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index a812f0e88a..c3db3d9190 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -54,6 +54,17 @@ import (
 
 const defaultFlushDeadline = 1 * time.Minute
 
+func newHighestTimestampMetric() *maxTimestamp {
+	return &maxTimestamp{
+		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "highest_timestamp_in_seconds",
+			Help:      "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet",
+		}),
+	}
+}
+
 func TestBasicContentNegotiation(t *testing.T) {
 	t.Parallel()
 	queueConfig := config.DefaultQueueConfig
@@ -313,7 +324,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
 func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
 	dir := t.TempDir()
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), nil, false, false, false, protoMsg)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg)
 
 	return m
 }
@@ -769,7 +780,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
 		}
 	)
 
-	m := NewQueueManager(metrics, nil, nil, nil, "", cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
+	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
 	m.StoreSeries(fakeSeries, 0)
 
 	// Attempt to samples while the manager is running. We immediately stop the
@@ -1457,8 +1468,7 @@ func BenchmarkStoreSeries(b *testing.B) {
 			cfg := config.DefaultQueueConfig
 			mcfg := config.DefaultMetadataConfig
 			metrics := newQueueManagerMetrics(nil, "", "")
-
-			m := NewQueueManager(metrics, nil, nil, nil, dir, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
+			m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
 			m.externalLabels = tc.externalLabels
 			m.relabelConfigs = tc.relabelConfigs
 
@@ -1558,8 +1568,9 @@ func TestCalculateDesiredShards(t *testing.T) {
 	addSamples := func(s int64, ts time.Duration) {
 		pendingSamples += s
 		samplesIn.incr(s)
+		samplesIn.tick()
 
-		m.metrics.highestTimestamp.Set(float64(startedAt.Add(ts).Unix()))
+		m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
 	}
 
 	// helper function for sending samples.
@@ -1616,6 +1627,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
 		prevShards      int
 		dataIn          int64 // Quantities normalised to seconds.
 		dataOut         int64
+		dataDropped     int64
 		dataOutDuration float64
 		backlog         float64
 		expectedShards  int
@@ -1762,9 +1774,11 @@
 		t.Run(tc.name, func(t *testing.T) {
 			m.numShards = tc.prevShards
 			forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second))
+			samplesIn.tick()
 			forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second))
+			forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second))
 			forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration)))
-			m.metrics.highestTimestamp.value = tc.backlog // Not Set() because it can only increase value.
+			m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value.
 			require.Equal(t, tc.expectedShards, m.calculateDesiredShards())
 		})
 	}
diff --git a/storage/remote/write.go b/storage/remote/write.go
index 34c5893715..6bc02bd6fe 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -34,8 +34,6 @@ import (
 	"github.com/prometheus/prometheus/tsdb/wlog"
 )
 
-// TODO: Remove along with timestampTracker logic once we can be sure no user
-// will encounter a gap that these metrics cover but other metrics don't.
 var (
 	samplesIn = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: namespace,
@@ -68,9 +66,11 @@ type WriteStorage struct {
 	externalLabels labels.Labels
 	dir            string
 	queues         map[string]*QueueManager
+	samplesIn      *ewmaRate
 	flushDeadline  time.Duration
 	interner       *pool
 	scraper        ReadyScrapeManager
+	quit           chan struct{}
 
 	// For timestampTracker.
 	highestTimestamp *maxTimestamp
@@ -89,11 +89,11 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
 		logger:            logger,
 		reg:               reg,
 		flushDeadline:     flushDeadline,
+		samplesIn:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		dir:               dir,
 		interner:          newPool(),
 		scraper:           sm,
-		// TODO: Remove along with timestampTracker logic once we can be sure no user
-		// will encounter a gap that this metric covers but other metrics don't.
+		quit:              make(chan struct{}),
 		highestTimestamp: &maxTimestamp{
 			Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
 				Namespace: namespace,
@@ -107,9 +107,23 @@
 	if reg != nil {
 		reg.MustRegister(rws.highestTimestamp)
 	}
+	go rws.run()
 	return rws
 }
 
+func (rws *WriteStorage) run() {
+	ticker := time.NewTicker(shardUpdateDuration)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			rws.samplesIn.tick()
+		case <-rws.quit:
+			return
+		}
+	}
+}
+
 func (rws *WriteStorage) Notify() {
 	rws.mtx.Lock()
 	defer rws.mtx.Unlock()
@@ -187,6 +201,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			rws.liveReaderMetrics,
 			rws.logger,
 			rws.dir,
+			rws.samplesIn,
 			rwConf.QueueConfig,
 			rwConf.MetadataConfig,
 			conf.GlobalConfig.ExternalLabels,
@@ -194,6 +209,7 @@
 			c,
 			rws.flushDeadline,
 			rws.interner,
+			rws.highestTimestamp,
 			rws.scraper,
 			rwConf.SendExemplars,
 			rwConf.SendNativeHistograms,
@@ -254,6 +270,7 @@ func (rws *WriteStorage) Close() error {
 	for _, q := range rws.queues {
 		q.Stop()
 	}
+	close(rws.quit)
 
 	rws.watcherMetrics.Unregister()
 	rws.liveReaderMetrics.Unregister()
@@ -329,6 +346,8 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada
 
 // Commit implements storage.Appender.
 func (t *timestampTracker) Commit() error {
+	t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)
+
 	samplesIn.Add(float64(t.samples))
 	exemplarsIn.Add(float64(t.exemplars))
 	histogramsIn.Add(float64(t.histograms))
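
Note (illustrative, not part of the patch): with the per-queue EWMAs restored above, calculateDesiredShards reduces to a short piece of arithmetic. The Go sketch below is a standalone reduction of that formula for readers following the diff; the variable names mirror the ones in the hunks, but the program and all the input numbers in main() are made-up examples, not Prometheus source.

// sharding_sketch.go - standalone reduction of the shard calculation restored above.
// All input rates in main() are hypothetical examples.
package main

import "fmt"

func desiredShards(dataInRate, dataDroppedRate, dataOutRate, dataOutDuration, highestSentSec, highestRecvSec float64) float64 {
	dataKeptRatio := dataOutRate / (dataDroppedRate + dataOutRate) // fraction of received samples that are actually sent
	delay := highestRecvSec - highestSentSec                       // how far (in seconds) sending lags ingestion
	dataPending := delay * dataInRate * dataKeptRatio              // estimated samples still waiting to be sent
	timePerSample := dataOutDuration / dataOutRate                 // average send time per sample this tick
	backlogCatchup := 0.05 * dataPending                           // aim to drain 5% of the backlog per tick
	return timePerSample * (dataInRate*dataKeptRatio + backlogCatchup)
}

func main() {
	// 20k samples/s in, none dropped, 15k samples/s out, 0.9s of send time per second, 30s behind.
	fmt.Printf("desired shards: %.1f\n", desiredShards(20000, 0, 15000, 0.9, 100, 130)) // prints 3.0
}

Prometheus feeds these rates from EWMAs ticked every shardUpdateDuration and clamps the result to the configured min/max shards; the deadlock this backport fixes was in applying a higher shard count while the existing shards were full (see the new TestRemoteWrite_ReshardingWithoutDeadlock), not in this arithmetic.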