diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index e2fb6b26f8..1bfea4041c 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -30,7 +30,6 @@ import (
 	"runtime"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -47,6 +46,7 @@ import (
 	"github.com/prometheus/common/version"
 	jcfg "github.com/uber/jaeger-client-go/config"
 	jprom "github.com/uber/jaeger-lib/metrics/prometheus"
+	"go.uber.org/atomic"
 	kingpin "gopkg.in/alecthomas/kingpin.v2"
 	"k8s.io/klog"
 
@@ -801,18 +801,18 @@ func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer,
 }
 
 type safePromQLNoStepSubqueryInterval struct {
-	value int64
+	value atomic.Int64
 }
 
 func durationToInt64Millis(d time.Duration) int64 {
 	return int64(d / time.Millisecond)
 }
 
 func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) {
-	atomic.StoreInt64(&i.value, durationToInt64Millis(time.Duration(ev)))
+	i.value.Store(durationToInt64Millis(time.Duration(ev)))
 }
 
 func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 {
-	return atomic.LoadInt64(&i.value)
+	return i.value.Load()
 }
 
 func reloadConfig(filename string, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...func(*config.Config) error) (err error) {
diff --git a/notifier/notifier.go b/notifier/notifier.go
index 3f64919c38..08ccccf202 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -26,19 +26,19 @@ import (
 	"path"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/go-openapi/strfmt"
 	"github.com/pkg/errors"
+	"go.uber.org/atomic"
+
+	"github.com/prometheus/alertmanager/api/v2/models"
 	"github.com/prometheus/client_golang/prometheus"
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/version"
-
-	"github.com/prometheus/alertmanager/api/v2/models"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"github.com/prometheus/prometheus/pkg/labels"
@@ -466,7 +466,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
 
 	var (
 		wg         sync.WaitGroup
-		numSuccess uint64
+		numSuccess atomic.Uint64
 	)
 	for _, ams := range amSets {
 		var (
@@ -527,7 +527,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
 					level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
 					n.metrics.errors.WithLabelValues(url).Inc()
 				} else {
-					atomic.AddUint64(&numSuccess, 1)
+					numSuccess.Inc()
 				}
 				n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
 				n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))
@@ -541,7 +541,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
 
 	wg.Wait()
 
-	return numSuccess > 0
+	return numSuccess.Load() > 0
 }
 
 func alertsToOpenAPIAlerts(alerts []*Alert) models.PostableAlerts {
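The pattern above repeats through the rest of the patch: a plain integer field guarded by package-level sync/atomic calls becomes a typed atomic field whose methods are the only way to touch it. A minimal sketch of the before/after, using an illustrative counter type that is not part of the patch:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// Before: the field is an ordinary int64, and every access site must
// remember to go through sync/atomic:
//
//	type counterOld struct {
//		n int64 // 64-bit alignment required on 32-bit platforms
//	}
//	atomic.AddInt64(&c.n, 1)
//	atomic.LoadInt64(&c.n)

// After: the atomicity lives in the type. A plain read such as
// `c.n > 0` no longer compiles, so a racy access cannot slip in.
type counter struct {
	n atomic.Int64
}

func main() {
	var c counter // zero value is ready to use
	c.n.Inc()
	c.n.Add(41)
	fmt.Println(c.n.Load()) // 42
}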
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index 5c39afcd2b..a7fcc51ca8 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -22,7 +22,6 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -30,6 +29,7 @@ import (
 	"github.com/prometheus/alertmanager/api/v2/models"
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
+	"go.uber.org/atomic"
 	yaml "gopkg.in/yaml.v2"
 
 	"github.com/prometheus/prometheus/config"
@@ -102,10 +102,12 @@ func TestHandlerSendAll(t *testing.T) {
 	var (
 		errc             = make(chan error, 1)
 		expected         = make([]*Alert, 0, maxBatchSize)
-		status1, status2 = int32(http.StatusOK), int32(http.StatusOK)
+		status1, status2 atomic.Int32
 	)
+	status1.Store(int32(http.StatusOK))
+	status2.Store(int32(http.StatusOK))
 
-	newHTTPServer := func(u, p string, status *int32) *httptest.Server {
+	newHTTPServer := func(u, p string, status *atomic.Int32) *httptest.Server {
 		return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 			var err error
 			defer func() {
@@ -128,7 +130,7 @@ func TestHandlerSendAll(t *testing.T) {
 			if err == nil {
 				err = alertsEqual(expected, alerts)
 			}
-			w.WriteHeader(int(atomic.LoadInt32(status)))
+			w.WriteHeader(int(status.Load()))
 		}))
 	}
 	server1 := newHTTPServer("prometheus", "testing_password", &status1)
@@ -194,11 +196,11 @@ func TestHandlerSendAll(t *testing.T) {
 	testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
 	checkNoErr()
 
-	atomic.StoreInt32(&status1, int32(http.StatusNotFound))
+	status1.Store(int32(http.StatusNotFound))
 	testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
 	checkNoErr()
 
-	atomic.StoreInt32(&status2, int32(http.StatusInternalServerError))
+	status2.Store(int32(http.StatusInternalServerError))
 	testutil.Assert(t, !h.sendAll(h.queue...), "all sends succeeded unexpectedly")
 	checkNoErr()
 }
diff --git a/storage/remote/ewma.go b/storage/remote/ewma.go
index 88cc79d756..c7fb0289b0 100644
--- a/storage/remote/ewma.go
+++ b/storage/remote/ewma.go
@@ -15,15 +15,14 @@ package remote
 
 import (
 	"sync"
-	"sync/atomic"
 	"time"
+
+	"go.uber.org/atomic"
 )
 
 // ewmaRate tracks an exponentially weighted moving average of a per-second rate.
 type ewmaRate struct {
-	// Keep all 64bit atomically accessed variables at the top of this struct.
-	// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
-	newEvents int64
+	newEvents atomic.Int64
 
 	alpha    float64
 	interval time.Duration
@@ -50,7 +49,7 @@ func (r *ewmaRate) rate() float64 {
 
 // tick assumes to be called every r.interval.
 func (r *ewmaRate) tick() {
-	newEvents := atomic.SwapInt64(&r.newEvents, 0)
+	newEvents := r.newEvents.Swap(0)
 	instantRate := float64(newEvents) / r.interval.Seconds()
 
 	r.mutex.Lock()
@@ -66,5 +65,5 @@ func (r *ewmaRate) tick() {
 
 // inc counts one event.
 func (r *ewmaRate) incr(incr int64) {
-	atomic.AddInt64(&r.newEvents, incr)
+	r.newEvents.Add(incr)
 }
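The comments deleted from ewmaRate (and from entry in the next file) guarded the documented sync/atomic constraint that 64-bit values must be 64-bit aligned on 32-bit platforms, which Go only guarantees for the first word of an allocated struct; note that the atomic fields still sit first in their structs after this change. What the typed wrapper adds is that a forgotten atomic access no longer compiles. A toy version of the tick/Swap idiom used by ewmaRate, with illustrative names:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// rateCounter is a stripped-down analogue of ewmaRate: writers call
// observe concurrently, and a single ticker drains the count.
type rateCounter struct {
	events atomic.Int64 // first field, as in the patched structs
}

func (r *rateCounter) observe(n int64) {
	r.events.Add(n)
}

// drain atomically takes the current count and resets it to zero —
// the same Swap(0) idiom ewmaRate.tick uses to compute an instant rate.
func (r *rateCounter) drain() int64 {
	return r.events.Swap(0)
}

func main() {
	var r rateCounter
	r.observe(3)
	r.observe(4)
	fmt.Println(r.drain()) // 7
	fmt.Println(r.drain()) // 0
}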
diff --git a/storage/remote/intern.go b/storage/remote/intern.go
index 9ae6a569c9..3dffaeacbd 100644
--- a/storage/remote/intern.go
+++ b/storage/remote/intern.go
@@ -20,7 +20,8 @@ package remote
 
 import (
 	"sync"
-	"sync/atomic"
+
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -40,13 +41,15 @@ type pool struct {
 }
 
 type entry struct {
-	// Keep all 64bit atomically accessed variables at the top of this struct.
-	// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
-	refs int64
+	refs atomic.Int64
 
 	s string
 }
 
+func newEntry(s string) *entry {
+	return &entry{s: s}
+}
+
 func newPool() *pool {
 	return &pool{
 		pool: map[string]*entry{},
@@ -62,20 +65,18 @@ func (p *pool) intern(s string) string {
 	interned, ok := p.pool[s]
 	p.mtx.RUnlock()
 	if ok {
-		atomic.AddInt64(&interned.refs, 1)
+		interned.refs.Inc()
 		return interned.s
 	}
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
 	if interned, ok := p.pool[s]; ok {
-		atomic.AddInt64(&interned.refs, 1)
+		interned.refs.Inc()
 		return interned.s
 	}
 
-	p.pool[s] = &entry{
-		s:    s,
-		refs: 1,
-	}
+	p.pool[s] = newEntry(s)
+	p.pool[s].refs.Store(1)
 	return s
 }
 
@@ -89,14 +90,14 @@ func (p *pool) release(s string) {
 		return
 	}
 
-	refs := atomic.AddInt64(&interned.refs, -1)
+	refs := interned.refs.Dec()
 	if refs > 0 {
 		return
 	}
 
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
-	if atomic.LoadInt64(&interned.refs) != 0 {
+	if interned.refs.Load() != 0 {
 		return
 	}
 	delete(p.pool, s)
diff --git a/storage/remote/intern_test.go b/storage/remote/intern_test.go
index d77a560602..1124ef0db2 100644
--- a/storage/remote/intern_test.go
+++ b/storage/remote/intern_test.go
@@ -20,7 +20,6 @@ package remote
 
 import (
 	"fmt"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -33,7 +32,7 @@ func TestIntern(t *testing.T) {
 	interned, ok := interner.pool[testString]
 
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 }
 
 func TestIntern_MultiRef(t *testing.T) {
@@ -43,13 +42,13 @@ func TestIntern_MultiRef(t *testing.T) {
 	interned, ok := interner.pool[testString]
 
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 
 	interner.intern(testString)
 	interned, ok = interner.pool[testString]
 
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 2, fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 2, fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs))
 }
 
 func TestIntern_DeleteRef(t *testing.T) {
@@ -59,7 +58,7 @@ func TestIntern_DeleteRef(t *testing.T) {
 	interned, ok := interner.pool[testString]
 
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 
 	interner.release(testString)
 	_, ok = interner.pool[testString]
@@ -72,7 +71,7 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
 	interner.intern(testString)
 	interned, ok := interner.pool[testString]
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 
 	go interner.release(testString)
 
@@ -84,5 +83,5 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
 	interned, ok = interner.pool[testString]
 	interner.mtx.RUnlock()
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, atomic.LoadInt64(&interned.refs) == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 }
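Worth noting in the intern pool above: uber's Inc and Dec return the new value, matching atomic.AddInt64(&x, 1) and atomic.AddInt64(&x, -1), so release keeps its shape of a lock-free decrement followed by a locked re-check. A sketch of the refcount semantics with a hypothetical refCounted type:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// refCounted mirrors the entry type: the count is adjusted lock-free,
// and only the transition to zero would take a slow (locked) path.
type refCounted struct {
	refs atomic.Int64
}

func (r *refCounted) acquire() {
	r.refs.Inc()
}

// release reports whether the caller dropped the last reference.
// Dec returns the decremented value, so no separate Load is needed.
func (r *refCounted) release() bool {
	return r.refs.Dec() == 0
}

func main() {
	var r refCounted
	r.acquire()
	r.acquire()
	fmt.Println(r.release()) // false: one reference remains
	fmt.Println(r.release()) // true: last reference dropped
}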
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 6c91e24104..cc6994eff3 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -18,7 +18,6 @@ import (
 	"math"
 	"strconv"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -27,6 +26,7 @@ import (
 	"github.com/golang/snappy"
 	"github.com/opentracing/opentracing-go"
 	"github.com/opentracing/opentracing-go/ext"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/config"
@@ -235,8 +235,7 @@ type WriteClient interface {
 // indicated by the provided WriteClient. Implements writeTo interface
 // used by WAL Watcher.
 type QueueManager struct {
-	// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
-	lastSendTimestamp int64
+	lastSendTimestamp atomic.Int64
 
 	logger        log.Logger
 	flushDeadline time.Duration
@@ -537,7 +536,7 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
 	// We shouldn't reshard if Prometheus hasn't been able to send to the
 	// remote endpoint successfully within some period of time.
 	minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
-	lsts := atomic.LoadInt64(&t.lastSendTimestamp)
+	lsts := t.lastSendTimestamp.Load()
 	if lsts < minSendTimestamp {
 		level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
 		return false
@@ -663,7 +662,7 @@ type shards struct {
 	// Emulate a wait group with a channel and an atomic int, as you
 	// cannot select on a wait group.
 	done    chan struct{}
-	running int32
+	running atomic.Int32
 
 	// Soft shutdown context will prevent new enqueues and deadlocks.
 	softShutdown chan struct{}
@@ -671,7 +670,7 @@ type shards struct {
 	// Hard shutdown context is used to terminate outgoing HTTP connections
 	// after giving them a chance to terminate.
 	hardShutdown          context.CancelFunc
-	droppedOnHardShutdown uint32
+	droppedOnHardShutdown atomic.Uint32
 }
 
 // start the shards; must be called before any call to enqueue.
@@ -692,9 +691,9 @@ func (s *shards) start(n int) {
 	var hardShutdownCtx context.Context
 	hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
 	s.softShutdown = make(chan struct{})
-	s.running = int32(n)
+	s.running.Store(int32(n))
 	s.done = make(chan struct{})
-	atomic.StoreUint32(&s.droppedOnHardShutdown, 0)
+	s.droppedOnHardShutdown.Store(0)
 	for i := 0; i < n; i++ {
 		go s.runShard(hardShutdownCtx, i, newQueues[i])
 	}
@@ -727,7 +726,7 @@ func (s *shards) stop() {
 	// Force an unclean shutdown.
 	s.hardShutdown()
 	<-s.done
-	if dropped := atomic.LoadUint32(&s.droppedOnHardShutdown); dropped > 0 {
+	if dropped := s.droppedOnHardShutdown.Load(); dropped > 0 {
 		level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped)
 	}
 }
@@ -756,7 +755,7 @@ func (s *shards) enqueue(ref uint64, sample sample) bool {
 
 func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 	defer func() {
-		if atomic.AddInt32(&s.running, -1) == 0 {
+		if s.running.Dec() == 0 {
 			close(s.done)
 		}
 	}()
@@ -792,7 +791,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 			droppedSamples := nPending + len(queue)
 			s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
 			s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
-			atomic.AddUint32(&s.droppedOnHardShutdown, uint32(droppedSamples))
+			s.droppedOnHardShutdown.Add(uint32(droppedSamples))
 			return
 
 		case sample, ok := <-queue:
@@ -847,7 +846,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
 	// should be maintained irrespective of success or failure.
 	s.qm.samplesOut.incr(int64(len(samples)))
 	s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
-	atomic.StoreInt64(&s.qm.lastSendTimestamp, time.Now().Unix())
+	s.qm.lastSendTimestamp.Store(time.Now().Unix())
 }
 
 // sendSamples to the remote storage with backoff for recoverable errors.
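As the retained comment in shards says, the channel-plus-counter construction emulates a wait group because the shutdown path needs to select on completion, which sync.WaitGroup cannot do; the last worker to decrement the counter closes the channel. A self-contained sketch of that idiom (worker bodies are illustrative):

package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic"
)

func main() {
	const n = 4
	var running atomic.Int32
	running.Store(n)
	done := make(chan struct{})

	for i := 0; i < n; i++ {
		go func(id int) {
			// Whoever decrements the counter to zero closes done,
			// mirroring shards.runShard's deferred cleanup.
			defer func() {
				if running.Dec() == 0 {
					close(done)
				}
			}()
			// ... shard work would go here ...
		}(i)
	}

	// Unlike wg.Wait(), a channel can sit in a select alongside a
	// hard-shutdown timeout, which is why shards emulates WaitGroup.
	select {
	case <-done:
		fmt.Println("all shards exited cleanly")
	case <-time.After(time.Second):
		fmt.Println("timed out; would force a hard shutdown")
	}
}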
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 59cf64e857..32941e9ed5 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -24,13 +24,13 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/client_golang/prometheus"
 	client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
@@ -336,7 +336,7 @@ func TestShouldReshard(t *testing.T) {
 		m.numShards = c.startingShards
 		m.samplesIn.incr(c.samplesIn)
 		m.samplesOut.incr(c.samplesOut)
-		m.lastSendTimestamp = c.lastSendTimestamp
+		m.lastSendTimestamp.Store(c.lastSendTimestamp)
 
 		m.Start()
 
@@ -497,7 +497,7 @@ func (c *TestWriteClient) Endpoint() string {
 // point the `numCalls` property will contain a count of how many times Store()
 // was called.
 type TestBlockingWriteClient struct {
-	numCalls uint64
+	numCalls atomic.Uint64
 }
 
 func NewTestBlockedWriteClient() *TestBlockingWriteClient {
@@ -505,13 +505,13 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient {
 }
 
 func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error {
-	atomic.AddUint64(&c.numCalls, 1)
+	c.numCalls.Inc()
 	<-ctx.Done()
 	return nil
 }
 
 func (c *TestBlockingWriteClient) NumCalls() uint64 {
-	return atomic.LoadUint64(&c.numCalls)
+	return c.numCalls.Load()
 }
 
 func (c *TestBlockingWriteClient) Name() string {
@@ -667,7 +667,7 @@ func TestCalculateDesiredShards(t *testing.T) {
 		highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
 		m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
 
-		atomic.StoreInt64(&m.lastSendTimestamp, time.Now().Unix())
+		m.lastSendTimestamp.Store(time.Now().Unix())
 	}
 
 	ts := time.Duration(0)
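A nice side effect shows up in the tests above: the plain assignment m.lastSendTimestamp = c.lastSendTimestamp no longer compiles, so even test setup must go through Store and the compiler enforces the discipline. The blocking-client pattern is also easy to reuse; a sketch with a hypothetical slowClient:

package main

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/atomic"
)

// slowClient mimics TestBlockingWriteClient: it records how many calls
// arrived, then parks until the context is cancelled.
type slowClient struct {
	numCalls atomic.Uint64
}

func (c *slowClient) Store(ctx context.Context, _ []byte) error {
	c.numCalls.Inc()
	<-ctx.Done()
	return nil
}

func (c *slowClient) NumCalls() uint64 {
	return c.numCalls.Load()
}

func main() {
	c := &slowClient{}
	ctx, cancel := context.WithCancel(context.Background())
	go c.Store(ctx, nil)

	time.Sleep(10 * time.Millisecond) // crude; real tests poll or synchronize
	cancel()
	fmt.Println(c.NumCalls()) // 1
}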
diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go
index 0dd30b55d6..82e8de48dd 100644
--- a/tsdb/head_bench_test.go
+++ b/tsdb/head_bench_test.go
@@ -17,9 +17,10 @@ import (
 	"io/ioutil"
 	"os"
 	"strconv"
-	"sync/atomic"
 	"testing"
 
+	"go.uber.org/atomic"
+
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/util/testutil"
 )
@@ -51,11 +52,11 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
 	testutil.Ok(b, err)
 	defer h.Close()
 
-	var count int64
+	var count atomic.Int64
 
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
-			i := atomic.AddInt64(&count, 1)
+			i := count.Inc()
 			h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))))
 		}
 	})
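In this benchmark, Inc doubles as a unique-ID generator across the goroutines that b.RunParallel spawns: each iteration claims a distinct series ID without a mutex. The same trick in isolation, with illustrative names:

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

func main() {
	var (
		id atomic.Int64
		wg sync.WaitGroup
	)

	// Stand-in for b.RunParallel: several goroutines each claim IDs.
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				_ = id.Inc() // each call returns a distinct value
			}
		}()
	}
	wg.Wait()

	fmt.Println(id.Load()) // 12: every increment was observed exactly once
}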
diff --git a/web/web.go b/web/web.go
index fbd4043692..9955facab8 100644
--- a/web/web.go
+++ b/web/web.go
@@ -34,7 +34,6 @@ import (
 	"sort"
 	"strings"
 	"sync"
-	"sync/atomic"
 	template_text "text/template"
 	"time"
 
@@ -51,9 +50,8 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/route"
 	"github.com/prometheus/common/server"
-	"github.com/prometheus/prometheus/tsdb"
-	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/soheilhy/cmux"
+	"go.uber.org/atomic"
 	"golang.org/x/net/netutil"
 	"google.golang.org/grpc"
 
@@ -64,6 +62,8 @@ import (
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/template"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/util/httputil"
 	api_v1 "github.com/prometheus/prometheus/web/api/v1"
 	api_v2 "github.com/prometheus/prometheus/web/api/v2"
@@ -202,7 +202,7 @@ type Handler struct {
 	mtx sync.RWMutex
 	now func() model.Time
 
-	ready uint32 // ready is uint32 rather than boolean to be able to use atomic functions.
+	ready atomic.Uint32 // ready is uint32 rather than boolean to be able to use atomic functions.
 }
 
 // ApplyConfig updates the config field of the Handler struct
@@ -293,9 +293,8 @@ func New(logger log.Logger, o *Options) *Handler {
 		notifier: o.Notifier,
 
 		now: model.Now,
-
-		ready: 0,
 	}
+	h.ready.Store(0)
 
 	factoryTr := func(_ context.Context) api_v1.TargetRetriever { return h.scrapeManager }
 	factoryAr := func(_ context.Context) api_v1.AlertmanagerRetriever { return h.notifier }
@@ -484,13 +483,12 @@ func serveDebug(w http.ResponseWriter, req *http.Request) {
 
 // Ready sets Handler to be ready.
 func (h *Handler) Ready() {
-	atomic.StoreUint32(&h.ready, 1)
+	h.ready.Store(1)
 }
 
 // Verifies whether the server is ready or not.
 func (h *Handler) isReady() bool {
-	ready := atomic.LoadUint32(&h.ready)
-	return ready > 0
+	return h.ready.Load() > 0
 }
 
 // Checks if server is ready, calls f if it is, returns 503 if it is not.
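One possible follow-up, not part of this patch: go.uber.org/atomic also ships an atomic.Bool, which would state the Handler.ready intent directly and make the uint32-as-boolean comment unnecessary; the diff keeps Uint32, presumably to minimize churn. A sketch of that variant:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// readiness shows the same flag expressed with atomic.Bool. This is a
// suggestion for how the field could read, not what the diff does.
type readiness struct {
	ready atomic.Bool
}

func (r *readiness) Ready()        { r.ready.Store(true) }
func (r *readiness) isReady() bool { return r.ready.Load() }

func main() {
	var r readiness // zero value: not ready
	fmt.Println(r.isReady()) // false
	r.Ready()
	fmt.Println(r.isReady()) // true
}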