From 8462515c75d823fecd2d16e1908289fcd64887a0 Mon Sep 17 00:00:00 2001 From: machine424 Date: Wed, 13 Aug 2025 12:24:51 +0200 Subject: [PATCH] test(storage/remote/queue_manager_test.go): use synctest in TestShutdown for better control over time The test becomes flaky after it was asked to run on parallel and "fight" for resources let's hide all of that Signed-off-by: machine424 --- storage/remote/queue_manager.go | 13 ++++++- storage/remote/queue_manager_test.go | 58 ++++++++++++++-------------- 2 files changed, 41 insertions(+), 30 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b1b3c5ea8e..600966a385 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -962,10 +962,10 @@ func (t *QueueManager) Stop() { defer t.logger.Info("Remote storage stopped.") close(t.quit) - t.wg.Wait() // Wait for all QueueManager routines to end before stopping shards, metadata watcher, and WAL watcher. This // is to ensure we don't end up executing a reshard and shards.stop() at the same time, which // causes a closed channel panic. + t.wg.Wait() t.shards.stop() t.watcher.Stop() if t.mcfg.Send { @@ -1458,9 +1458,18 @@ func (q *queue) ReturnForReuse(batch []timeSeries) { // FlushAndShutdown stops the queue and flushes any samples. No appends can be // made after this is called. func (q *queue) FlushAndShutdown(done <-chan struct{}) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + +loop: for q.tryEnqueueingBatch(done) { - time.Sleep(time.Second) + select { + case <-done: + break loop + case <-ticker.C: + } } + q.batchMtx.Lock() defer q.batchMtx.Unlock() q.batch = nil diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 7a051656d5..5959890cee 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -24,6 +24,7 @@ import ( "strings" "sync" "testing" + "testing/synctest" "time" "github.com/gogo/protobuf/proto" @@ -34,6 +35,7 @@ import ( "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/goleak" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/histogram" @@ -53,6 +55,10 @@ import ( const defaultFlushDeadline = 1 * time.Minute +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + func TestBasicContentNegotiation(t *testing.T) { t.Parallel() queueConfig := config.DefaultQueueConfig @@ -456,38 +462,34 @@ func TestSampleDeliveryOrder(t *testing.T) { } func TestShutdown(t *testing.T) { - // Not t.Parallel() because the test became flaky; see https://github.com/prometheus/prometheus/issues/17045 - deadline := 1 * time.Second - c := NewTestBlockedWriteClient() + t.Parallel() + synctest.Run(func() { + deadline := 15 * time.Second + c := NewTestBlockedWriteClient() - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig - m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1) - n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend - samples, series := createTimeseries(n, n) - m.StoreSeries(series, 0) - m.Start() + m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1) + n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend + samples, series := createTimeseries(n, n) + m.StoreSeries(series, 0) + m.Start() - // Append blocks to guarantee delivery, so we do it in the background. - go func() { - m.Append(samples) - }() - time.Sleep(100 * time.Millisecond) + // Append blocks to guarantee delivery, so we do it in the background. + go func() { + m.Append(samples) + }() + synctest.Wait() - // Test to ensure that Stop doesn't block. - start := time.Now() - m.Stop() - // The samples will never be delivered, so duration should - // be at least equal to deadline, otherwise the flush deadline - // was not respected. - duration := time.Since(start) - if duration > deadline+(deadline/10) { - t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) - } - if duration < deadline { - t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline) - } + // Test to ensure that Stop doesn't block. + start := time.Now() + m.Stop() + // The samples will never be delivered, so duration should + // be at least equal to deadline, otherwise the flush deadline + // was not respected. + require.Equal(t, time.Since(start), deadline) + }) } func TestSeriesReset(t *testing.T) {