mirror of https://github.com/prometheus/prometheus.git
test(storage/remote/queue_manager_test.go): use synctest in TestShutdown for better control over time

The test became flaky after it was asked to run in parallel and "fight" for resources; let's hide all of that.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
This commit is contained in:
parent 7416f33df5
commit 8462515c75
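Context for the change: inside a synctest "bubble", time is a fake clock that only advances when every goroutine in the bubble is durably blocked, and synctest.Wait blocks until all other goroutines in the bubble are durably blocked. A minimal sketch of that behaviour, assuming a toolchain where the experimental testing/synctest package is available (GOEXPERIMENT=synctest on Go 1.24, or newer); the test name and the 15-second duration are illustrative only:

package example

import (
	"testing"
	"testing/synctest"
	"time"
)

// TestVirtualTime is an illustrative test, not part of this commit.
func TestVirtualTime(t *testing.T) {
	synctest.Run(func() {
		start := time.Now()

		done := make(chan struct{})
		go func() {
			// Runs on the bubble's fake clock, so no real 15 seconds pass.
			time.Sleep(15 * time.Second)
			close(done)
		}()

		// Block until the goroutine above is durably blocked in Sleep.
		synctest.Wait()

		<-done
		// The fake clock advanced by exactly 15s, which is why an exact
		// comparison (like require.Equal later in this diff) is safe.
		if got := time.Since(start); got != 15*time.Second {
			t.Fatalf("expected exactly 15s of fake time, got %s", got)
		}
	})
}

This is why the rewritten TestShutdown can both keep t.Parallel() and assert an exact shutdown duration: contention for real CPU time no longer shows up in the measured durations.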
storage/remote/queue_manager.go

@@ -962,10 +962,10 @@ func (t *QueueManager) Stop() {
 	defer t.logger.Info("Remote storage stopped.")
 
 	close(t.quit)
-	t.wg.Wait()
 	// Wait for all QueueManager routines to end before stopping shards, metadata watcher, and WAL watcher. This
 	// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
 	// causes a closed channel panic.
+	t.wg.Wait()
 	t.shards.stop()
 	t.watcher.Stop()
 	if t.mcfg.Send {
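The comment kept next to t.wg.Wait() states the invariant: no QueueManager goroutine may still be able to trigger a reshard once shards.stop() runs, otherwise an operation lands on an already-closed channel and panics. A hedged, stand-alone illustration of that failure mode (not Prometheus code; the shards type and resharding channel here are made up):

package example

import "sync"

// shards is a stand-in for the real type; resharding is a hypothetical
// channel that an in-flight reshard would send on.
type shards struct {
	resharding chan int
}

func (s *shards) stop() { close(s.resharding) }

func main() {
	s := &shards{resharding: make(chan int, 1)}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.resharding <- 8 // an in-flight "reshard"
	}()

	// Without this Wait, stop() could close the channel before the send
	// above lands, and the send would panic with "send on closed channel".
	wg.Wait()
	s.stop()
}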
@@ -1458,9 +1458,18 @@ func (q *queue) ReturnForReuse(batch []timeSeries) {
 // FlushAndShutdown stops the queue and flushes any samples. No appends can be
 // made after this is called.
 func (q *queue) FlushAndShutdown(done <-chan struct{}) {
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+loop:
 	for q.tryEnqueueingBatch(done) {
-		time.Sleep(time.Second)
+		select {
+		case <-done:
+			break loop
+		case <-ticker.C:
+		}
 	}
+
 	q.batchMtx.Lock()
 	defer q.batchMtx.Unlock()
 	q.batch = nil
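Design note on the hunk above: replacing the bare time.Sleep with a ticker plus a select over done keeps the once-per-second retry of tryEnqueueingBatch, but lets FlushAndShutdown return as soon as done is closed instead of always sleeping out the full second before noticing shutdown.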
storage/remote/queue_manager_test.go

@@ -24,6 +24,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"testing/synctest"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
@@ -34,6 +35,7 @@ import (
 	"github.com/prometheus/common/promslog"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
+	"go.uber.org/goleak"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/histogram"
@@ -53,6 +55,10 @@ import (
 
 const defaultFlushDeadline = 1 * time.Minute
 
+func TestMain(m *testing.M) {
+	goleak.VerifyTestMain(m)
+}
+
 func TestBasicContentNegotiation(t *testing.T) {
 	t.Parallel()
 	queueConfig := config.DefaultQueueConfig
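goleak.VerifyTestMain(m) runs the package's tests and then fails the test binary if unexpected goroutines are still running, so a queue or watcher that is never shut down surfaces as a test failure instead of a silent leak.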
@@ -456,8 +462,9 @@ func TestSampleDeliveryOrder(t *testing.T) {
 }
 
 func TestShutdown(t *testing.T) {
-	// Not t.Parallel() because the test became flaky; see https://github.com/prometheus/prometheus/issues/17045
-	deadline := 1 * time.Second
+	t.Parallel()
+	synctest.Run(func() {
+		deadline := 15 * time.Second
 	c := NewTestBlockedWriteClient()
 
 	cfg := config.DefaultQueueConfig
@@ -473,7 +480,7 @@ func TestShutdown(t *testing.T) {
 	go func() {
 		m.Append(samples)
 	}()
-	time.Sleep(100 * time.Millisecond)
+	synctest.Wait()
 
 	// Test to ensure that Stop doesn't block.
 	start := time.Now()
@@ -481,13 +488,8 @@ func TestShutdown(t *testing.T) {
 	// The samples will never be delivered, so duration should
 	// be at least equal to deadline, otherwise the flush deadline
 	// was not respected.
-	duration := time.Since(start)
-	if duration > deadline+(deadline/10) {
-		t.Errorf("Took too long to shutdown: %s > %s", duration, deadline)
-	}
-	if duration < deadline {
-		t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline)
-	}
+		require.Equal(t, time.Since(start), deadline)
+	})
 }
 
 func TestSeriesReset(t *testing.T) {
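Because the body of TestShutdown now runs inside synctest.Run, time.Since(start) is measured on the bubble's fake clock: with the blocked write client the samples are never delivered, Stop() returns only once the flush deadline has elapsed in fake time, and require.Equal can assert exact equality where the old code had to accept anything in the [deadline, deadline + 10%] window. Likewise, synctest.Wait() replaces the fixed 100ms sleep by waiting until the appending goroutine is durably blocked before Stop() is called.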