diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b1b3c5ea8e..600966a385 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -962,10 +962,10 @@ func (t *QueueManager) Stop() { defer t.logger.Info("Remote storage stopped.") close(t.quit) - t.wg.Wait() // Wait for all QueueManager routines to end before stopping shards, metadata watcher, and WAL watcher. This // is to ensure we don't end up executing a reshard and shards.stop() at the same time, which // causes a closed channel panic. + t.wg.Wait() t.shards.stop() t.watcher.Stop() if t.mcfg.Send { @@ -1458,9 +1458,18 @@ func (q *queue) ReturnForReuse(batch []timeSeries) { // FlushAndShutdown stops the queue and flushes any samples. No appends can be // made after this is called. func (q *queue) FlushAndShutdown(done <-chan struct{}) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + +loop: for q.tryEnqueueingBatch(done) { - time.Sleep(time.Second) + select { + case <-done: + break loop + case <-ticker.C: + } } + q.batchMtx.Lock() defer q.batchMtx.Unlock() q.batch = nil diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 7a051656d5..5959890cee 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -24,6 +24,7 @@ import ( "strings" "sync" "testing" + "testing/synctest" "time" "github.com/gogo/protobuf/proto" @@ -34,6 +35,7 @@ import ( "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/goleak" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/histogram" @@ -53,6 +55,10 @@ import ( const defaultFlushDeadline = 1 * time.Minute +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + func TestBasicContentNegotiation(t *testing.T) { t.Parallel() queueConfig := config.DefaultQueueConfig @@ -456,38 +462,34 @@ func TestSampleDeliveryOrder(t *testing.T) { } func TestShutdown(t *testing.T) { - // Not t.Parallel() because the test became flaky; see https://github.com/prometheus/prometheus/issues/17045 - deadline := 1 * time.Second - c := NewTestBlockedWriteClient() + t.Parallel() + synctest.Run(func() { + deadline := 15 * time.Second + c := NewTestBlockedWriteClient() - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig - m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1) - n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend - samples, series := createTimeseries(n, n) - m.StoreSeries(series, 0) - m.Start() + m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1) + n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend + samples, series := createTimeseries(n, n) + m.StoreSeries(series, 0) + m.Start() - // Append blocks to guarantee delivery, so we do it in the background. - go func() { - m.Append(samples) - }() - time.Sleep(100 * time.Millisecond) + // Append blocks to guarantee delivery, so we do it in the background. + go func() { + m.Append(samples) + }() + synctest.Wait() - // Test to ensure that Stop doesn't block. - start := time.Now() - m.Stop() - // The samples will never be delivered, so duration should - // be at least equal to deadline, otherwise the flush deadline - // was not respected. - duration := time.Since(start) - if duration > deadline+(deadline/10) { - t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) - } - if duration < deadline { - t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline) - } + // Test to ensure that Stop doesn't block. + start := time.Now() + m.Stop() + // The samples will never be delivered, so duration should + // be at least equal to deadline, otherwise the flush deadline + // was not respected. + require.Equal(t, time.Since(start), deadline) + }) } func TestSeriesReset(t *testing.T) {