diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3ed27ccecd..dc2be46e9f 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1182,8 +1182,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { queue.ReturnForReuse(batch) n := nPendingSamples + nPendingExemplars s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) - s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) stop() timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1206,8 +1204,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { n := nPendingSamples + nPendingExemplars level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) - s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1253,6 +1249,12 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s s.qm.dataOut.incr(int64(len(samples))) s.qm.dataOutDuration.incr(int64(time.Since(begin))) s.qm.lastSendTimestamp.Store(time.Now().Unix()) + // Pending samples/exemplars also should be subtracted as an error means + // they will not be retried. + s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) + s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) + s.enqueuedSamples.Sub(int64(sampleCount)) + s.enqueuedExemplars.Sub(int64(exemplarCount)) } // sendSamples to the remote storage with backoff for recoverable errors.