diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 4b0c72cf60..98e4aa54c1 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -396,8 +396,10 @@ type WriteClient interface {
 // indicated by the provided WriteClient. Implements writeTo interface
 // used by WAL Watcher.
 type QueueManager struct {
-	lastSendTimestamp          atomic.Int64
-	buildRequestLimitTimestamp atomic.Int64
+	lastSendTimestamp            atomic.Int64
+	buildRequestLimitTimestamp   atomic.Int64
+	reshardDisableStartTimestamp atomic.Int64 // Time that reshard was disabled.
+	reshardDisableEndTimestamp   atomic.Int64 // Time that reshard is disabled until.
 
 	logger        log.Logger
 	flushDeadline time.Duration
@@ -574,7 +576,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
 	retry := func() {
 		t.metrics.retriedMetadataTotal.Add(float64(len(metadata)))
 	}
-	err = sendWriteRequestWithBackoff(ctx, t.cfg, t.logger, attemptStore, retry)
+	err = t.sendWriteRequestWithBackoff(ctx, attemptStore, retry)
 	if err != nil {
 		return err
 	}
@@ -1021,6 +1023,13 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
 		level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
 		return false
 	}
+	if disableTimestamp := t.reshardDisableEndTimestamp.Load(); time.Now().Unix() < disableTimestamp {
+		disabledAt := time.Unix(t.reshardDisableStartTimestamp.Load(), 0)
+		disabledFor := time.Until(time.Unix(disableTimestamp, 0))
+
+		level.Warn(t.logger).Log("msg", "Skipping resharding, resharding is disabled while waiting for recoverable errors", "disabled_at", disabledAt, "disabled_for", disabledFor)
+		return false
+	}
 
 	return true
 }
@@ -1622,7 +1631,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
 	}
 
-	err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
+	err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
 	if errors.Is(err, context.Canceled) {
 		// When there is resharding, we cancel the context for this queue, which means the data is not sent.
 		// So we exit early to not update the metrics.
@@ -1635,8 +1644,8 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 	return err
 }
 
-func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
-	backoff := cfg.MinBackoff
+func (t *QueueManager) sendWriteRequestWithBackoff(ctx context.Context, attempt func(int) error, onRetry func()) error {
+	backoff := t.cfg.MinBackoff
 	sleepDuration := model.Duration(0)
 	try := 0
 
@@ -1663,9 +1672,26 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
 		switch {
 		case backoffErr.retryAfter > 0:
 			sleepDuration = backoffErr.retryAfter
-			level.Info(l).Log("msg", "Retrying after duration specified by Retry-After header", "duration", sleepDuration)
+			level.Info(t.logger).Log("msg", "Retrying after duration specified by Retry-After header", "duration", sleepDuration)
 		case backoffErr.retryAfter < 0:
-			level.Debug(l).Log("msg", "retry-after cannot be in past, retrying using default backoff mechanism")
+			level.Debug(t.logger).Log("msg", "retry-after cannot be in past, retrying using default backoff mechanism")
+		}
+
+		// We should never reshard for a recoverable error; increasing shards could
+		// make the problem worse, particularly if we're getting rate limited.
+		//
+		// reshardDisableEndTimestamp holds the unix timestamp until which resharding
+		// is disabled. We'll update that timestamp if the wait period we were just
+		// told to sleep for extends past the existing disabled timestamp.
+		reshardWaitPeriod := time.Now().Add(time.Duration(sleepDuration) * 2)
+		if oldTS, updated := setAtomicToNewer(&t.reshardDisableEndTimestamp, reshardWaitPeriod.Unix()); updated {
+			// If the old timestamp was in the past, then resharding was previously
+			// enabled. We want to track the time when it initially got disabled for
+			// logging purposes.
+			disableTime := time.Now().Unix()
+			if oldTS < disableTime {
+				t.reshardDisableStartTimestamp.Store(disableTime)
+			}
 		}
 
 		select {
@@ -1675,18 +1701,38 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
 
 		// If we make it this far, we've encountered a recoverable error and will retry.
 		onRetry()
-		level.Warn(l).Log("msg", "Failed to send batch, retrying", "err", err)
+		level.Warn(t.logger).Log("msg", "Failed to send batch, retrying", "err", err)
 
 		backoff = sleepDuration * 2
 
-		if backoff > cfg.MaxBackoff {
-			backoff = cfg.MaxBackoff
+		if backoff > t.cfg.MaxBackoff {
+			backoff = t.cfg.MaxBackoff
 		}
 
 		try++
 	}
 }
 
+// setAtomicToNewer atomically sets a value to the newer int64 between itself
+// and the provided newValue argument. setAtomicToNewer returns whether the
+// atomic value was updated and what the previous value was.
+func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, updated bool) {
+	for {
+		current := value.Load()
+		if current >= newValue {
+			// If the current stored value is newer than newValue, abort.
+			return current, false
+		}
+
+		// Try to swap the value. If the atomic value has changed, we loop back to
+		// the beginning until we've successfully swapped out the value or the
+		// value stored in it is newer than newValue.
+		if value.CompareAndSwap(current, newValue) {
+			return current, true
+		}
+	}
+}
+
 func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
 	var highest int64
 	var lowest int64
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index e9de8beba4..028120c05c 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -520,6 +520,69 @@ func TestShouldReshard(t *testing.T) {
 	}
 }
 
+// TestDisableReshardOnRetry asserts that resharding should be disabled when a
+// recoverable error is returned from remote_write.
+func TestDisableReshardOnRetry(t *testing.T) {
+	onStoredContext, onStoreCalled := context.WithCancel(context.Background())
+	defer onStoreCalled()
+
+	var (
+		fakeSamples, fakeSeries = createTimeseries(100, 100)
+
+		cfg        = config.DefaultQueueConfig
+		mcfg       = config.DefaultMetadataConfig
+		retryAfter = time.Second
+
+		metrics = newQueueManagerMetrics(nil, "", "")
+
+		client = &MockWriteClient{
+			StoreFunc: func(ctx context.Context, b []byte, i int) error {
+				onStoreCalled()
+
+				return RecoverableError{
+					error:      fmt.Errorf("fake error"),
+					retryAfter: model.Duration(retryAfter),
+				}
+			},
+			NameFunc:     func() string { return "mock" },
+			EndpointFunc: func() string { return "http://fake:9090/api/v1/write" },
+		}
+	)
+
+	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false)
+	m.StoreSeries(fakeSeries, 0)
+
+	// Attempt to send samples while the manager is running. We immediately stop
+	// the manager after the recoverable error is generated to prevent the
+	// manager from resharding itself.
+	m.Start()
+	{
+		m.Append(fakeSamples)
+
+		select {
+		case <-onStoredContext.Done():
+		case <-time.After(time.Minute):
+			require.FailNow(t, "timed out waiting for client to be sent metrics")
+		}
+	}
+	m.Stop()
+
+	require.Eventually(t, func() bool {
+		// Force m.lastSendTimestamp to be current so the last send timestamp isn't
+		// the reason resharding is disabled.
+		m.lastSendTimestamp.Store(time.Now().Unix())
+		return m.shouldReshard(m.numShards+1) == false
+	}, time.Minute, 10*time.Millisecond, "shouldReshard was never disabled")
+
+	// After 2x retryAfter, resharding should be enabled again.
+	require.Eventually(t, func() bool {
+		// Force m.lastSendTimestamp to be current so the last send timestamp isn't
+		// the reason resharding is disabled.
+		m.lastSendTimestamp.Store(time.Now().Unix())
+		return m.shouldReshard(m.numShards+1) == true
+	}, time.Minute, retryAfter, "shouldReshard should have been re-enabled")
+}
+
 func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
 	samples := make([]record.RefSample, 0, numSamples)
 	series := make([]record.RefSeries, 0, numSeries)
@@ -844,6 +907,18 @@ func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil
 func (c *NopWriteClient) Name() string     { return "nopwriteclient" }
 func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
 
+type MockWriteClient struct {
+	StoreFunc    func(context.Context, []byte, int) error
+	NameFunc     func() string
+	EndpointFunc func() string
+}
+
+func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) error {
+	return c.StoreFunc(ctx, bb, n)
+}
+func (c *MockWriteClient) Name() string     { return c.NameFunc() }
+func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
+
 // Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
 var extraLabels []labels.Label = []labels.Label{
 	{Name: "kubernetes_io_arch", Value: "amd64"},
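
Illustrative sketch (not part of the patch): the snippet below isolates the compare-and-swap pattern that setAtomicToNewer implements, showing that concurrent retries can only push the "resharding disabled until" deadline forward, never backward. It is written against the standard library sync/atomic.Int64, which may differ from the atomic type the queue manager itself imports; the setToNewer helper, the reshardDisabledUntil variable, and the deadline values are hypothetical stand-ins for this example only.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// setToNewer mirrors the CAS loop used by setAtomicToNewer: it stores newValue
// only if it is greater than the current value, retrying until it either wins
// the swap or observes a value that is already newer.
func setToNewer(v *atomic.Int64, newValue int64) (previous int64, updated bool) {
	for {
		current := v.Load()
		if current >= newValue {
			return current, false
		}
		if v.CompareAndSwap(current, newValue) {
			return current, true
		}
	}
}

func main() {
	var reshardDisabledUntil atomic.Int64

	// Simulate concurrent retries, each proposing a different "resharding is
	// disabled until" deadline. Whatever the interleaving, only the latest
	// deadline survives.
	base := time.Now().Unix()
	var wg sync.WaitGroup
	for offset := int64(1); offset <= 5; offset++ {
		wg.Add(1)
		go func(o int64) {
			defer wg.Done()
			setToNewer(&reshardDisabledUntil, base+o)
		}(offset)
	}
	wg.Wait()

	fmt.Println(reshardDisabledUntil.Load() == base+5) // prints "true"
}

Because a loser of the CompareAndSwap re-reads the value and gives up once a newer deadline is already stored, the loop never shortens an existing disable window, which is why shouldReshard can safely trust reshardDisableEndTimestamp.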