[release-3.7] fix: Remote-write: revert changes in the queue resharding logic (#17412)

* Revert "chore: deprecate prometheus_remote_storage_{samples,exemplars,histograms}_in_total and prometheus_remote_storage_highest_timestamp_in_seconds"

This reverts commit ba14bc49db31a1b0ba3127e6ddf59a9f32a08dff.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>

* Revert "storage/remote: compute highestTimestamp and dataIn at QueueManager level"

This reverts commit 184c7eb9186aa8fea09920f2f8e8aa8a603da300.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>

* fix(remote-write): bring back the per queue metrics

Signed-off-by: machine424 <ayoubmrini424@gmail.com>

* test(remote): add TestRemoteWrite_ReshardingWithoutDeadlock to reproduce the sharding scale up deadlock

Signed-off-by: machine424 <ayoubmrini424@gmail.com>

---------

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
Ayoub Mrini 2025-10-29 15:04:09 +01:00 committed by GitHub
parent 92ccadc96d
commit 6806b68f93
4 changed files with 136 additions and 22 deletions


@@ -965,3 +965,73 @@ remote_write:
return true
}, 10*time.Second, 100*time.Millisecond)
}
// TestRemoteWrite_ReshardingWithoutDeadlock ensures that resharding (scaling up) doesn't block when the shards are full.
// See: https://github.com/prometheus/prometheus/issues/17384.
func TestRemoteWrite_ReshardingWithoutDeadlock(t *testing.T) {
t.Parallel()
tmpDir := t.TempDir()
configFile := filepath.Join(tmpDir, "prometheus.yml")
port := testutil.RandomUnprivilegedPort(t)
server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
time.Sleep(time.Second)
}))
t.Cleanup(server.Close)
config := fmt.Sprintf(`
global:
  scrape_interval: 100ms
scrape_configs:
  - job_name: 'self'
    static_configs:
      - targets: ['localhost:%d']
remote_write:
  - url: %s
    queue_config:
      # Keep the capacity tiny so the queue fills up quickly.
      capacity: 1
`, port, server.URL)
require.NoError(t, os.WriteFile(configFile, []byte(config), 0o777))
prom := prometheusCommandWithLogging(
t,
configFile,
port,
fmt.Sprintf("--storage.tsdb.path=%s", tmpDir),
)
require.NoError(t, prom.Start())
var checkInitialDesiredShardsOnce sync.Once
require.Eventually(t, func() bool {
r, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port))
if err != nil {
return false
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
return false
}
metrics, err := io.ReadAll(r.Body)
if err != nil {
return false
}
checkInitialDesiredShardsOnce.Do(func() {
s, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_shards_desired")
require.NoError(t, err)
require.Equal(t, 1.0, s)
})
desiredShards, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_shards_desired")
if err != nil || desiredShards <= 1 {
return false
}
return true
// 3*shardUpdateDuration to allow for the resharding logic to run.
}, 30*time.Second, 1*time.Second)
}
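
The assertions above go through a getMetricValue helper that is defined elsewhere in this test package and is not part of this diff. A rough, self-contained sketch of that kind of scrape-and-parse step, assuming the standard prometheus/common expfmt text parser (the getGaugeValue name and signature are illustrative, not the actual helper):

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/prometheus/common/expfmt"
)

// getGaugeValue is an illustrative stand-in for the test's getMetricValue
// helper: it parses a /metrics payload in the text exposition format and
// returns the first sample value of the named metric family.
func getGaugeValue(r io.Reader, name string) (float64, error) {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(r)
	if err != nil {
		return 0, err
	}
	mf, ok := families[name]
	if !ok || len(mf.GetMetric()) == 0 {
		return 0, fmt.Errorf("metric %q not found", name)
	}
	return mf.GetMetric()[0].GetGauge().GetValue(), nil
}

func main() {
	payload := "# TYPE prometheus_remote_storage_shards_desired gauge\nprometheus_remote_storage_shards_desired 3\n"
	v, err := getGaugeValue(strings.NewReader(payload), "prometheus_remote_storage_shards_desired")
	fmt.Println(v, err) // 3 <nil>
}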


@@ -453,10 +453,11 @@ type QueueManager struct {
quit chan struct{}
wg sync.WaitGroup
dataIn, dataOut, dataOutDuration *ewmaRate
dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate
metrics *queueManagerMetrics
interner *pool
highestRecvTimestamp *maxTimestamp
}
// NewQueueManager builds a new QueueManager and starts a new
@@ -470,6 +471,7 @@ func NewQueueManager(
readerMetrics *wlog.LiveReaderMetrics,
logger *slog.Logger,
dir string,
samplesIn *ewmaRate,
cfg config.QueueConfig,
mCfg config.MetadataConfig,
externalLabels labels.Labels,
@@ -477,6 +479,7 @@
client WriteClient,
flushDeadline time.Duration,
interner *pool,
highestRecvTimestamp *maxTimestamp,
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
@@ -516,12 +519,14 @@
reshardChan: make(chan int),
quit: make(chan struct{}),
dataIn: newEWMARate(ewmaWeight, shardUpdateDuration),
dataIn: samplesIn,
dataDropped: newEWMARate(ewmaWeight, shardUpdateDuration),
dataOut: newEWMARate(ewmaWeight, shardUpdateDuration),
dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
metrics: metrics,
interner: interner,
highestRecvTimestamp: highestRecvTimestamp,
protoMsg: protoMsg,
compr: compression.Snappy, // Hardcoded for now, but scaffolding exists for likely future use.
@@ -711,6 +716,7 @@ outer:
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[s.Ref]
if !ok {
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[s.Ref]; !ok {
t.logger.Info("Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -772,6 +778,8 @@ outer:
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[e.Ref]
if !ok {
// Track dropped exemplars in the same EWMA for sharding calc.
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[e.Ref]; !ok {
t.logger.Info("Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref)
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -833,6 +841,7 @@ outer:
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref]
if !ok {
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
t.logger.Info("Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -893,6 +902,7 @@ outer:
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref]
if !ok {
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
t.logger.Info("Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -1122,8 +1132,8 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
outlined in this function's implementation. It is up to the caller to reshard, or not,
// based on the return value.
func (t *QueueManager) calculateDesiredShards() int {
t.dataIn.tick()
t.dataOut.tick()
t.dataDropped.tick()
t.dataOutDuration.tick()
// We use the number of incoming samples as a prediction of how much work we
@@ -1133,12 +1143,13 @@
var (
dataInRate = t.dataIn.rate()
dataOutRate = t.dataOut.rate()
dataKeptRatio = dataOutRate / (t.dataDropped.rate() + dataOutRate)
dataOutDuration = t.dataOutDuration.rate() / float64(time.Second)
dataPendingRate = dataInRate - dataOutRate
dataPendingRate = dataInRate*dataKeptRatio - dataOutRate
highestSent = t.metrics.highestSentTimestamp.Get()
highestRecv = t.metrics.highestTimestamp.Get()
highestRecv = t.highestRecvTimestamp.Get()
delay = highestRecv - highestSent
dataPending = delay * dataInRate
dataPending = delay * dataInRate * dataKeptRatio
)
if dataOutRate <= 0 {
@@ -1150,12 +1161,13 @@
backlogCatchup = 0.05 * dataPending
// Calculate Time to send one sample, averaged across all sends done this tick.
timePerSample = dataOutDuration / dataOutRate
desiredShards = timePerSample * (dataInRate + backlogCatchup)
desiredShards = timePerSample * (dataInRate*dataKeptRatio + backlogCatchup)
)
t.metrics.desiredNumShards.Set(desiredShards)
t.logger.Debug("QueueManager.calculateDesiredShards",
"dataInRate", dataInRate,
"dataOutRate", dataOutRate,
"dataKeptRatio", dataKeptRatio,
"dataPendingRate", dataPendingRate,
"dataPending", dataPending,
"dataOutDuration", dataOutDuration,
@@ -1348,7 +1360,6 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
return true
}
s.qm.metrics.highestTimestamp.Set(float64(data.timestamp / 1000))
s.qm.dataIn.incr(1)
return true
}
}
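
Taken together, the calculateDesiredShards hunks above reintroduce dataDropped and scale the incoming rate by dataKeptRatio, the fraction of received samples that actually has to be sent. A standalone worked example of that arithmetic, using only the formulas shown in the hunks; every number below is made up for illustration:

package main

import "fmt"

func main() {
	// Illustrative inputs; in the real code these come from the QueueManager's
	// ewmaRate trackers and the highest received/sent timestamp gauges.
	var (
		dataInRate      = 10000.0 // samples/s appended (the shared WriteStorage samplesIn rate)
		dataOutRate     = 8000.0  // samples/s successfully sent
		dataDroppedRate = 2000.0  // samples/s dropped before enqueueing (filtered-out series)
		dataOutDuration = 4.0     // seconds of send time spent per wall-clock second, across shards
		delay           = 30.0    // highestRecv - highestSent, in seconds
	)

	// Fraction of incoming samples that must actually be sent.
	dataKeptRatio := dataOutRate / (dataDroppedRate + dataOutRate) // 0.8

	// Backlog in samples and the 5% per-tick catch-up budget.
	dataPending := delay * dataInRate * dataKeptRatio // 240000
	backlogCatchup := 0.05 * dataPending              // 12000

	// Average cost of sending one sample, then the shard count needed to keep
	// up with the kept input rate plus the catch-up budget.
	timePerSample := dataOutDuration / dataOutRate // 0.0005 s/sample
	desiredShards := timePerSample * (dataInRate*dataKeptRatio + backlogCatchup)

	fmt.Printf("desiredShards = %.1f\n", desiredShards) // 10.0
}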


@@ -54,6 +54,17 @@ import (
const defaultFlushDeadline = 1 * time.Minute
func newHighestTimestampMetric() *maxTimestamp {
return &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "highest_timestamp_in_seconds",
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet",
}),
}
}
func TestBasicContentNegotiation(t *testing.T) {
t.Parallel()
queueConfig := config.DefaultQueueConfig
@@ -313,7 +324,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), nil, false, false, false, protoMsg)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg)
return m
}
@@ -769,7 +780,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
}
)
m := NewQueueManager(metrics, nil, nil, nil, "", cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
m.StoreSeries(fakeSeries, 0)
// Attempt to send samples while the manager is running. We immediately stop the
@@ -1457,8 +1468,7 @@ func BenchmarkStoreSeries(b *testing.B) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs
@@ -1558,8 +1568,9 @@ func TestCalculateDesiredShards(t *testing.T) {
addSamples := func(s int64, ts time.Duration) {
pendingSamples += s
samplesIn.incr(s)
samplesIn.tick()
m.metrics.highestTimestamp.Set(float64(startedAt.Add(ts).Unix()))
m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
}
// helper function for sending samples.
@@ -1616,6 +1627,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
prevShards int
dataIn int64 // Quantities normalised to seconds.
dataOut int64
dataDropped int64
dataOutDuration float64
backlog float64
expectedShards int
@@ -1762,9 +1774,11 @@
t.Run(tc.name, func(t *testing.T) {
m.numShards = tc.prevShards
forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second))
samplesIn.tick()
forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second))
forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second))
forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration)))
m.metrics.highestTimestamp.value = tc.backlog // Not Set() because it can only increase value.
m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value.
require.Equal(t, tc.expectedShards, m.calculateDesiredShards())
})


@@ -34,8 +34,6 @@ import (
"github.com/prometheus/prometheus/tsdb/wlog"
)
// TODO: Remove along with timestampTracker logic once we can be sure no user
// will encounter a gap that these metrics cover but other metrics don't.
var (
samplesIn = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
@@ -68,9 +66,11 @@ type WriteStorage struct {
externalLabels labels.Labels
dir string
queues map[string]*QueueManager
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
scraper ReadyScrapeManager
quit chan struct{}
// For timestampTracker.
highestTimestamp *maxTimestamp
@@ -89,11 +89,11 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
logger: logger,
reg: reg,
flushDeadline: flushDeadline,
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
dir: dir,
interner: newPool(),
scraper: sm,
// TODO: Remove along with timestampTracker logic once we can be sure no user
// will encounter a gap that this metric covers but other metrics don't.
quit: make(chan struct{}),
highestTimestamp: &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
@@ -107,9 +107,23 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
if reg != nil {
reg.MustRegister(rws.highestTimestamp)
}
go rws.run()
return rws
}
func (rws *WriteStorage) run() {
ticker := time.NewTicker(shardUpdateDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rws.samplesIn.tick()
case <-rws.quit:
return
}
}
}
func (rws *WriteStorage) Notify() {
rws.mtx.Lock()
defer rws.mtx.Unlock()
@@ -187,6 +201,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.liveReaderMetrics,
rws.logger,
rws.dir,
rws.samplesIn,
rwConf.QueueConfig,
rwConf.MetadataConfig,
conf.GlobalConfig.ExternalLabels,
@@ -194,6 +209,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
c,
rws.flushDeadline,
rws.interner,
rws.highestTimestamp,
rws.scraper,
rwConf.SendExemplars,
rwConf.SendNativeHistograms,
@@ -254,6 +270,7 @@ func (rws *WriteStorage) Close() error {
for _, q := range rws.queues {
q.Stop()
}
close(rws.quit)
rws.watcherMetrics.Unregister()
rws.liveReaderMetrics.Unregister()
@@ -329,6 +346,8 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada
// Commit implements storage.Appender.
func (t *timestampTracker) Commit() error {
t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)
samplesIn.Add(float64(t.samples))
exemplarsIn.Add(float64(t.exemplars))
histogramsIn.Add(float64(t.histograms))
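
The write.go hunks wire dataIn back to a single WriteStorage-level ewmaRate: timestampTracker.Commit increments it for every committed sample, exemplar and histogram, the new run() goroutine ticks it once per shardUpdateDuration, and each QueueManager only reads its rate when calculating shards. A minimal sketch of that incr/tick/rate contract, using a simplified stand-in type since the real ewmaRate is unexported (the type, weight and numbers below are illustrative, not the actual implementation):

package main

import (
	"fmt"
	"sync"
	"time"
)

// ewma is an illustrative stand-in for storage/remote's unexported ewmaRate.
// Writers call incr for every incoming sample, one goroutine calls tick once
// per interval, and readers call rate for a smoothed samples-per-second value.
type ewma struct {
	mu       sync.Mutex
	pending  int64
	weight   float64
	interval time.Duration
	value    float64
	init     bool
}

func newEWMA(weight float64, interval time.Duration) *ewma {
	return &ewma{weight: weight, interval: interval}
}

func (e *ewma) incr(n int64) {
	e.mu.Lock()
	e.pending += n
	e.mu.Unlock()
}

func (e *ewma) tick() {
	e.mu.Lock()
	defer e.mu.Unlock()
	instant := float64(e.pending) / e.interval.Seconds()
	e.pending = 0
	if e.init {
		e.value += e.weight * (instant - e.value)
	} else {
		e.value, e.init = instant, true
	}
}

func (e *ewma) rate() float64 {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.value
}

func main() {
	// One shared rate per WriteStorage, handed to every QueueManager.
	samplesIn := newEWMA(0.2, 10*time.Second)

	// timestampTracker.Commit plays this role in write.go: every committed
	// sample, exemplar and histogram is counted.
	samplesIn.incr(100_000)

	// WriteStorage.run plays this role: a single ticker advances the rate.
	samplesIn.tick()

	// QueueManager.calculateDesiredShards plays this role: it only reads.
	fmt.Printf("incoming rate: %.0f samples/s\n", samplesIn.rate()) // 10000
}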