diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 26ed504e1e..206c766d50 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,6 +53,8 @@ jobs: env: # Enforce the Go version. GOTOOLCHAIN: local + # TODO: remove once 1.25 is the min version. + GOEXPERIMENT: synctest container: # The go version in this image should be N-1 wrt test_go. image: quay.io/prometheus/golang-builder:1.24-base @@ -63,6 +65,9 @@ jobs: - run: make build # Don't run NPM build; don't run race-detector. - run: make test GO_ONLY=1 test-flags="" + # TODO: remove once 1.25 is the min version. + # ensure we can build without the tag. + - run: GOEXPERIMENT="" make build test_ui: name: UI tests @@ -229,15 +234,16 @@ jobs: - name: Get golangci-lint version id: golangci-lint-version run: echo "version=$(make print-golangci-lint-version)" >> $GITHUB_OUTPUT - - name: Lint with stringlabels + - name: Lint uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0 with: - args: --verbose --build-tags=stringlabels + args: --verbose version: ${{ steps.golangci-lint-version.outputs.version }} - name: Lint with slicelabels uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0 with: - args: --verbose --build-tags=slicelabels + # goexperiment.synctest to ensure we don't miss files that depend on it. + args: --verbose --build-tags=slicelabels,goexperiment.synctest version: ${{ steps.golangci-lint-version.outputs.version }} - name: Lint with dedupelabels uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0 diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b1b3c5ea8e..6adaa0d4cb 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -962,10 +962,10 @@ func (t *QueueManager) Stop() { defer t.logger.Info("Remote storage stopped.") close(t.quit) - t.wg.Wait() // Wait for all QueueManager routines to end before stopping shards, metadata watcher, and WAL watcher. This // is to ensure we don't end up executing a reshard and shards.stop() at the same time, which // causes a closed channel panic. + t.wg.Wait() t.shards.stop() t.watcher.Stop() if t.mcfg.Send { @@ -1458,9 +1458,15 @@ func (q *queue) ReturnForReuse(batch []timeSeries) { // FlushAndShutdown stops the queue and flushes any samples. No appends can be // made after this is called. func (q *queue) FlushAndShutdown(done <-chan struct{}) { +loop: for q.tryEnqueueingBatch(done) { - time.Sleep(time.Second) + select { + case <-done: + break loop + case <-time.After(time.Second): + } } + q.batchMtx.Lock() defer q.batchMtx.Unlock() q.batch = nil diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 7a051656d5..a812f0e88a 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -49,6 +49,7 @@ import ( "github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) const defaultFlushDeadline = 1 * time.Minute @@ -456,38 +457,34 @@ func TestSampleDeliveryOrder(t *testing.T) { } func TestShutdown(t *testing.T) { - // Not t.Parallel() because the test became flaky; see https://github.com/prometheus/prometheus/issues/17045 - deadline := 1 * time.Second - c := NewTestBlockedWriteClient() + t.Parallel() + synctest.Test(t, func(t *testing.T) { + deadline := 15 * time.Second + c := NewTestBlockedWriteClient() - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig - m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1) - n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend - samples, series := createTimeseries(n, n) - m.StoreSeries(series, 0) - m.Start() + m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1) + n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend + samples, series := createTimeseries(n, n) + m.StoreSeries(series, 0) + m.Start() - // Append blocks to guarantee delivery, so we do it in the background. - go func() { - m.Append(samples) - }() - time.Sleep(100 * time.Millisecond) + // Append blocks to guarantee delivery, so we do it in the background. + go func() { + m.Append(samples) + }() + synctest.Wait() - // Test to ensure that Stop doesn't block. - start := time.Now() - m.Stop() - // The samples will never be delivered, so duration should - // be at least equal to deadline, otherwise the flush deadline - // was not respected. - duration := time.Since(start) - if duration > deadline+(deadline/10) { - t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) - } - if duration < deadline { - t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline) - } + // Test to ensure that Stop doesn't block. + start := time.Now() + m.Stop() + // The samples will never be delivered, so duration should + // be at least equal to deadline, otherwise the flush deadline + // was not respected. + require.Equal(t, time.Since(start), deadline) + }) } func TestSeriesReset(t *testing.T) { diff --git a/util/testutil/synctest/disabled.go b/util/testutil/synctest/disabled.go new file mode 100644 index 0000000000..2cdcc72e07 --- /dev/null +++ b/util/testutil/synctest/disabled.go @@ -0,0 +1,29 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !goexperiment.synctest && !go1.25 + +package synctest + +import ( + "testing" +) + +func Test(t *testing.T, f func(t *testing.T)) { + t.Skip("goexperiment.synctest is not enabled") +} + +func Wait() { + // It isn't meant to be called outside of Test(). + panic("goexperiment.synctest is not enabled") +} diff --git a/util/testutil/synctest/enabled.go b/util/testutil/synctest/enabled.go new file mode 100644 index 0000000000..61aa85dcf7 --- /dev/null +++ b/util/testutil/synctest/enabled.go @@ -0,0 +1,31 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build goexperiment.synctest && !go1.25 + +package synctest + +import ( + "testing" + "testing/synctest" +) + +func Test(t *testing.T, f func(t *testing.T)) { + synctest.Run(func() { + f(t) + }) +} + +func Wait() { + synctest.Wait() +} diff --git a/util/testutil/synctest/synctest.go b/util/testutil/synctest/synctest.go new file mode 100644 index 0000000000..6780798a9b --- /dev/null +++ b/util/testutil/synctest/synctest.go @@ -0,0 +1,29 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.25 + +package synctest + +import ( + "testing" + "testing/synctest" +) + +func Test(t *testing.T, f func(t *testing.T)) { + synctest.Test(t, f) +} + +func Wait() { + synctest.Wait() +}