diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 215537e4f0..46bd7e1ff2 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1116,21 +1116,35 @@ func (q *queue) ReturnForReuse(batch []sampleOrExemplar) { // FlushAndShutdown stops the queue and flushes any samples. No appends can be // made after this is called. func (q *queue) FlushAndShutdown(done <-chan struct{}) { - q.batchMtx.Lock() - defer q.batchMtx.Unlock() - - if len(q.batch) > 0 { - select { - case q.batchQueue <- q.batch: - case <-done: - // The shard has been hard shut down, so no more samples can be - // sent. Drop everything left in the queue. - } + for q.tryEnqueueingBatch(done) { + time.Sleep(time.Second) } q.batch = nil close(q.batchQueue) } +// tryEnqueueingBatch tries to send a batch if necessary. If sending needs to +// be retried it will return true. +func (q *queue) tryEnqueueingBatch(done <-chan struct{}) bool { + q.batchMtx.Lock() + defer q.batchMtx.Unlock() + if len(q.batch) == 0 { + return false + } + + select { + case q.batchQueue <- q.batch: + return false + case <-done: + // The shard has been hard shut down, so no more samples can be sent. + // No need to try again as we will drop everything left in the queue. + return false + default: + // The batchQueue is full, so we need to try again later. + return true + } +} + func (q *queue) newBatch(capacity int) []sampleOrExemplar { q.poolMtx.Lock() defer q.poolMtx.Unlock() diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 0f8f9f4495..ff3f1aaf20 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1183,3 +1183,29 @@ func TestQueueManagerMetrics(t *testing.T) { err = client_testutil.GatherAndCompare(reg, strings.NewReader("")) require.NoError(t, err) } + +func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) { + capacity := 100 + batchSize := 10 + queue := newQueue(batchSize, capacity) + for i := 0; i < capacity+batchSize; i++ { + queue.Append(sampleOrExemplar{}) + } + + done := make(chan struct{}) + go queue.FlushAndShutdown(done) + go func() { + // Give enough time for FlushAndShutdown to acquire the lock. queue.Batch() + // should not block forever even if the lock is acquired. + time.Sleep(10 * time.Millisecond) + queue.Batch() + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Error("Deadlock in FlushAndShutdown detected") + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + t.FailNow() + } +}