diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 25d3a94b6a..73a4896f19 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" "github.com/prometheus/prometheus/config" @@ -644,63 +645,78 @@ func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) boo return sampleTs.Before(limitTs) } +// timeSeriesAgeChecker encapsulates the logic for checking if time series data is too old. +type timeSeriesAgeChecker struct { + metrics *queueManagerMetrics + baseTime time.Time + sampleAgeLimit time.Duration +} + +// checkAndRecordIfOld checks if a timestamp is too old and records the appropriate metric. +// Returns true if the data should be dropped. +func (c *timeSeriesAgeChecker) checkAndRecordIfOld(timestamp int64, dataType string) bool { + if c.sampleAgeLimit == 0 { + // If sampleAgeLimit is unset, then we never skip samples due to their age. + return false + } + + if !isSampleOld(c.baseTime, c.sampleAgeLimit, timestamp) { + return false + } + + // Record the drop in metrics. + switch dataType { + case "sample": + c.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() + case "histogram": + c.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() + case "exemplar": + c.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() + } + return true +} + func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool { + checker := &timeSeriesAgeChecker{ + metrics: metrics, + baseTime: baseTime, + sampleAgeLimit: sampleAgeLimit, + } + return func(ts prompb.TimeSeries) bool { - if sampleAgeLimit == 0 { - // If sampleAgeLimit is unset, then we never skip samples due to their age. - return false - } - switch { // Only the first element should be set in the series, therefore we only check the first element. + switch { case len(ts.Samples) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) { - metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Samples[0].Timestamp, "sample") case len(ts.Histograms) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) { - metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Histograms[0].Timestamp, "histogram") case len(ts.Exemplars) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) { - metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Exemplars[0].Timestamp, "exemplar") default: return false } - return false } } func isV2TimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts writev2.TimeSeries) bool { + checker := &timeSeriesAgeChecker{ + metrics: metrics, + baseTime: baseTime, + sampleAgeLimit: sampleAgeLimit, + } + return func(ts writev2.TimeSeries) bool { - if sampleAgeLimit == 0 { - // If sampleAgeLimit is unset, then we never skip samples due to their age. - return false - } - switch { // Only the first element should be set in the series, therefore we only check the first element. 
+		switch {
 		case len(ts.Samples) > 0:
-			if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
-				metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
-				return true
-			}
+			return checker.checkAndRecordIfOld(ts.Samples[0].Timestamp, "sample")
 		case len(ts.Histograms) > 0:
-			if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
-				metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
-				return true
-			}
+			return checker.checkAndRecordIfOld(ts.Histograms[0].Timestamp, "histogram")
 		case len(ts.Exemplars) > 0:
-			if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
-				metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
-				return true
-			}
+			return checker.checkAndRecordIfOld(ts.Exemplars[0].Timestamp, "exemplar")
 		default:
 			return false
 		}
-		return false
 	}
 }

@@ -1737,6 +1753,20 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 	}
 	reqSize := len(req)
+	sc := sendBatchContext{
+		ctx:            ctx,
+		sampleCount:    sampleCount,
+		exemplarCount:  exemplarCount,
+		histogramCount: histogramCount,
+		metadataCount:  metadataCount,
+		reqSize:        reqSize,
+	}
+
+	metricsUpdater := batchMetricsUpdater{
+		metrics:      s.qm.metrics,
+		sentDuration: s.qm.metrics.sentBatchDuration,
+	}

 	// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
 	// to track the total amount of accepted data across the various attempts.
@@ -1772,33 +1802,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 			req = req2
 		}

-		ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
+		ctx, span := createBatchSpan(sc.ctx, sc, s.qm.storeClient.Name(), s.qm.storeClient.Endpoint(), try)
 		defer span.End()
-		span.SetAttributes(
-			attribute.Int("request_size", reqSize),
-			attribute.Int("samples", sampleCount),
-			attribute.Int("try", try),
-			attribute.String("remote_name", s.qm.storeClient.Name()),
-			attribute.String("remote_url", s.qm.storeClient.Endpoint()),
-		)
-
-		if exemplarCount > 0 {
-			span.SetAttributes(attribute.Int("exemplars", exemplarCount))
-		}
-		if histogramCount > 0 {
-			span.SetAttributes(attribute.Int("histograms", histogramCount))
-		}
-
 		begin := time.Now()
-		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
-		s.qm.metrics.metadataTotal.Add(float64(metadataCount))
+		metricsUpdater.recordBatchAttempt(sc)

 		// Technically for v1, we will likely have empty response stats, but for
 		// newer Receivers this might be not, so used it in a best effort.
 		rs, err := s.qm.client().Store(ctx, req, try)
-		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+		metricsUpdater.observeSentDuration(begin)

 		// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
 		// so far we don't have those, so it's ok to potentially skew statistics.
 		addStats(rs)
@@ -1811,9 +1822,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 	}

 	onRetry := func() {
-		s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
+		metricsUpdater.recordRetry(sc)
 	}

 	err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
@@ -1850,6 +1859,20 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
 	}
 	reqSize := len(req)
+	sc := sendBatchContext{
+		ctx:            ctx,
+		sampleCount:    sampleCount,
+		exemplarCount:  exemplarCount,
+		histogramCount: histogramCount,
+		metadataCount:  metadataCount,
+		reqSize:        reqSize,
+	}
+
+	metricsUpdater := batchMetricsUpdater{
+		metrics:      s.qm.metrics,
+		sentDuration: s.qm.metrics.sentBatchDuration,
+	}

 	// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
 	// to track the total amount of accepted data across the various attempts.
@@ -1885,31 +1908,12 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
 			req = req2
 		}

-		ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
+		ctx, span := createBatchSpan(sc.ctx, sc, s.qm.storeClient.Name(), s.qm.storeClient.Endpoint(), try)
 		defer span.End()
-		span.SetAttributes(
-			attribute.Int("request_size", reqSize),
-			attribute.Int("samples", sampleCount),
-			attribute.Int("try", try),
-			attribute.String("remote_name", s.qm.storeClient.Name()),
-			attribute.String("remote_url", s.qm.storeClient.Endpoint()),
-		)
-
-		if exemplarCount > 0 {
-			span.SetAttributes(attribute.Int("exemplars", exemplarCount))
-		}
-		if histogramCount > 0 {
-			span.SetAttributes(attribute.Int("histograms", histogramCount))
-		}
-
 		begin := time.Now()
-		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
-		s.qm.metrics.metadataTotal.Add(float64(metadataCount))
+		metricsUpdater.recordBatchAttempt(sc)

 		rs, err := s.qm.client().Store(ctx, req, try)
-		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+		metricsUpdater.observeSentDuration(begin)

 		// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
 		// so far we don't have those, so it's ok to potentially skew statistics.
addStats(rs) @@ -1933,9 +1937,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 } onRetry := func() { - s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) - s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount)) - s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount)) + metricsUpdater.recordRetry(sc) } err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry) @@ -2101,48 +2103,27 @@ func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, upda } } -func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) { - var highest int64 - var lowest int64 - var droppedSamples, droppedExemplars, droppedHistograms int - +func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) ([]prompb.TimeSeries, *timeSeriesStats) { + stats := newTimeSeriesStats() keepIdx := 0 - lowest = math.MaxInt64 + for i, ts := range timeSeries { if filter != nil && filter(ts) { - if len(ts.Samples) > 0 { - droppedSamples++ - } - if len(ts.Exemplars) > 0 { - droppedExemplars++ - } - if len(ts.Histograms) > 0 { - droppedHistograms++ - } + stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0) continue } // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. - if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { - highest = ts.Samples[0].Timestamp + if len(ts.Samples) > 0 { + stats.updateTimestamp(ts.Samples[0].Timestamp) } - if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { - highest = ts.Exemplars[0].Timestamp + if len(ts.Exemplars) > 0 { + stats.updateTimestamp(ts.Exemplars[0].Timestamp) } - if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { - highest = ts.Histograms[0].Timestamp + if len(ts.Histograms) > 0 { + stats.updateTimestamp(ts.Histograms[0].Timestamp) } - // Get lowest timestamp - if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest { - lowest = ts.Samples[0].Timestamp - } - if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest { - lowest = ts.Exemplars[0].Timestamp - } - if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest { - lowest = ts.Histograms[0].Timestamp - } if i != keepIdx { // We have to swap the kept timeseries with the one which should be dropped. // Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries). 
@@ -2151,16 +2132,14 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
 		keepIdx++
 	}

-	timeSeries = timeSeries[:keepIdx]
-	return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
+	return timeSeries[:keepIdx], stats
 }

 func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, filter func(prompb.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (_ []byte, highest, lowest int64, _ error) {
-	highest, lowest, timeSeries,
-		droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
+	timeSeries, stats := buildTimeSeries(timeSeries, filter)

-	if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
-		logger.Debug("dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
+	if stats.droppedSamples > 0 || stats.droppedExemplars > 0 || stats.droppedHistograms > 0 {
+		logger.Debug("dropped data due to their age", "droppedSamples", stats.droppedSamples, "droppedExemplars", stats.droppedExemplars, "droppedHistograms", stats.droppedHistograms)
 	}

 	req := &prompb.WriteRequest{
@@ -2174,21 +2153,21 @@ func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, meta
 		pBuf.Reset()
 	}
 	if err := pBuf.Marshal(req); err != nil {
-		return nil, highest, lowest, err
+		return nil, stats.highest, stats.lowest, err
 	}

 	compressed, err := compression.Encode(compr, pBuf.Bytes(), buf)
 	if err != nil {
-		return nil, highest, lowest, err
+		return nil, stats.highest, stats.lowest, err
 	}
-	return compressed, highest, lowest, nil
+	return compressed, stats.highest, stats.lowest, nil
 }

 func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labels []string, pBuf *[]byte, filter func(writev2.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (compressed []byte, highest, lowest int64, _ error) {
-	highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter)
+	timeSeries, stats := buildV2TimeSeries(samples, filter)

-	if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
-		logger.Debug("dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
+	if stats.droppedSamples > 0 || stats.droppedExemplars > 0 || stats.droppedHistograms > 0 {
+		logger.Debug("dropped data due to their age", "droppedSamples", stats.droppedSamples, "droppedExemplars", stats.droppedExemplars, "droppedHistograms", stats.droppedHistograms)
 	}

 	req := &writev2.Request{
@@ -2202,59 +2181,38 @@ func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labe
 	data, err := req.OptimizedMarshal(*pBuf)
 	if err != nil {
-		return nil, highest, lowest, err
+		return nil, stats.highest, stats.lowest, err
 	}
 	*pBuf = data

 	compressed, err = compression.Encode(compr, *pBuf, buf)
 	if err != nil {
-		return nil, highest, lowest, err
+		return nil, stats.highest, stats.lowest, err
 	}
-	return compressed, highest, lowest, nil
+	return compressed, stats.highest, stats.lowest, nil
 }

-func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) (int64, int64, []writev2.TimeSeries, int, int, int) {
-	var highest int64
-	var lowest int64
-	var droppedSamples, droppedExemplars, droppedHistograms int
-
+func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) ([]writev2.TimeSeries, *timeSeriesStats) {
+	stats := newTimeSeriesStats()
 	keepIdx := 0
-	lowest = math.MaxInt64
+
 	for i, ts := range timeSeries {
 		if filter != nil && filter(ts) {
-			if len(ts.Samples) > 0 {
-				droppedSamples++
-			}
-			if len(ts.Exemplars) > 0 {
-				droppedExemplars++
-			}
-			if len(ts.Histograms) > 0 {
-				droppedHistograms++
-			}
+			stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0)
 			continue
 		}

 		// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
-		if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
-			highest = ts.Samples[0].Timestamp
+		if len(ts.Samples) > 0 {
+			stats.updateTimestamp(ts.Samples[0].Timestamp)
 		}
-		if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
-			highest = ts.Exemplars[0].Timestamp
+		if len(ts.Exemplars) > 0 {
+			stats.updateTimestamp(ts.Exemplars[0].Timestamp)
 		}
-		if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
-			highest = ts.Histograms[0].Timestamp
+		if len(ts.Histograms) > 0 {
+			stats.updateTimestamp(ts.Histograms[0].Timestamp)
 		}
-		// Get the lowest timestamp.
-		if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
-			lowest = ts.Samples[0].Timestamp
-		}
-		if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
-			lowest = ts.Exemplars[0].Timestamp
-		}
-		if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
-			lowest = ts.Histograms[0].Timestamp
-		}
 		if i != keepIdx {
 			// We have to swap the kept timeseries with the one which should be dropped.
 			// Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries).
@@ -2263,6 +2221,99 @@ func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.Time
 		keepIdx++
 	}

-	timeSeries = timeSeries[:keepIdx]
-	return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
+	return timeSeries[:keepIdx], stats
+}
+
+// timeSeriesStats tracks statistics during time series processing.
+type timeSeriesStats struct {
+	highest           int64
+	lowest            int64
+	droppedSamples    int
+	droppedExemplars  int
+	droppedHistograms int
+}
+
+// newTimeSeriesStats creates a new timeSeriesStats with lowest initialized to MaxInt64.
+func newTimeSeriesStats() *timeSeriesStats {
+	return &timeSeriesStats{
+		lowest: math.MaxInt64,
+	}
+}
+
+// updateTimestamp updates the highest and lowest timestamps seen so far.
+func (s *timeSeriesStats) updateTimestamp(timestamp int64) {
+	if timestamp > s.highest {
+		s.highest = timestamp
+	}
+	if timestamp < s.lowest {
+		s.lowest = timestamp
+	}
+}
+
+// recordDropped increments the dropped counters based on what data exists.
+func (s *timeSeriesStats) recordDropped(hasSamples, hasExemplars, hasHistograms bool) {
+	if hasSamples {
+		s.droppedSamples++
+	}
+	if hasExemplars {
+		s.droppedExemplars++
+	}
+	if hasHistograms {
+		s.droppedHistograms++
+	}
+}
+
+// sendBatchContext encapsulates the common parameters for sending a batch.
+// Bundling them reduces the number of arguments threaded through the send path.
+type sendBatchContext struct {
+	ctx            context.Context
+	sampleCount    int
+	exemplarCount  int
+	histogramCount int
+	metadataCount  int
+	reqSize        int
+}
+
+// batchMetricsUpdater encapsulates metrics update operations for batch sending.
+type batchMetricsUpdater struct {
+	metrics      *queueManagerMetrics
+	sentDuration prometheus.Observer
+}
+
+// recordBatchAttempt records the per-type totals for a batch send attempt.
+func (b *batchMetricsUpdater) recordBatchAttempt(sc sendBatchContext) {
+	b.metrics.samplesTotal.Add(float64(sc.sampleCount))
+	b.metrics.exemplarsTotal.Add(float64(sc.exemplarCount))
+	b.metrics.histogramsTotal.Add(float64(sc.histogramCount))
+	b.metrics.metadataTotal.Add(float64(sc.metadataCount))
+}
+
+// observeSentDuration records how long a send attempt took. It must be called
+// after the attempt completes so the observation covers the whole round trip.
+func (b *batchMetricsUpdater) observeSentDuration(begin time.Time) {
+	b.sentDuration.Observe(time.Since(begin).Seconds())
+}
+
+// recordRetry records retry metrics for a batch.
+func (b *batchMetricsUpdater) recordRetry(sc sendBatchContext) {
+	b.metrics.retriedSamplesTotal.Add(float64(sc.sampleCount))
+	b.metrics.retriedExemplarsTotal.Add(float64(sc.exemplarCount))
+	b.metrics.retriedHistogramsTotal.Add(float64(sc.histogramCount))
+}
+
+// createBatchSpan creates and configures an OpenTelemetry span for batch sending.
+func createBatchSpan(ctx context.Context, sc sendBatchContext, remoteName, remoteURL string, try int) (context.Context, trace.Span) {
+	ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
+	span.SetAttributes(
+		attribute.Int("request_size", sc.reqSize),
+		attribute.Int("samples", sc.sampleCount),
+		attribute.Int("try", try),
+		attribute.String("remote_name", remoteName),
+		attribute.String("remote_url", remoteURL),
+	)
+	if sc.exemplarCount > 0 {
+		span.SetAttributes(attribute.Int("exemplars", sc.exemplarCount))
+	}
+	if sc.histogramCount > 0 {
+		span.SetAttributes(attribute.Int("histograms", sc.histogramCount))
+	}
+	return ctx, span
 }
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index ce9cc6f1b6..704a5628d3 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -2351,12 +2351,12 @@ func TestBuildTimeSeries(t *testing.T) {
 	// Run the test cases
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
+			result, stats := buildTimeSeries(tc.ts, tc.filter)
 			require.NotNil(t, result)
 			require.Len(t, result, tc.responseLen)
-			require.Equal(t, tc.highestTs, highest)
-			require.Equal(t, tc.lowestTs, lowest)
-			require.Equal(t, tc.droppedSamples, droppedSamples)
+			require.Equal(t, tc.highestTs, stats.highest)
+			require.Equal(t, tc.lowestTs, stats.lowest)
+			require.Equal(t, tc.droppedSamples, stats.droppedSamples)
 		})
 	}
 }
@@ -2367,7 +2367,7 @@ func BenchmarkBuildTimeSeries(b *testing.B) {
 	filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
 	for b.Loop() {
 		samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...)
-		_, _, result, _, _, _ := buildTimeSeries(samples, filter)
+		result, _ := buildTimeSeries(samples, filter)
 		require.NotNil(b, result)
 	}
 }
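Note on the refactored bookkeeping: buildTimeSeries and buildV2TimeSeries now return a *timeSeriesStats alongside the compacted slice instead of six positional results. The standalone program below is a minimal sketch of that single-pass pattern, not code from this patch: it substitutes bare int64 timestamps for prompb.TimeSeries and a single dropped counter for the per-type counters (the names stats, filterSeries, and tooOld are illustrative only), but the swap-based compaction and the highest/lowest tracking mirror the logic above.

package main

import (
	"fmt"
	"math"
)

// stats mirrors the timeSeriesStats bookkeeping: one pass tracks the
// highest and lowest timestamps of kept entries and counts dropped ones.
type stats struct {
	highest, lowest int64
	dropped         int
}

func newStats() *stats { return &stats{lowest: math.MaxInt64} }

func (s *stats) update(ts int64) {
	if ts > s.highest {
		s.highest = ts
	}
	if ts < s.lowest {
		s.lowest = ts
	}
}

// filterSeries mimics the compaction in buildTimeSeries: entries failing
// the filter are counted as dropped; kept entries are swapped forward so
// the backing array can be reused for the next batch.
func filterSeries(ts []int64, tooOld func(int64) bool) ([]int64, *stats) {
	st := newStats()
	keep := 0
	for i, t := range ts {
		if tooOld(t) {
			st.dropped++
			continue
		}
		st.update(t)
		if i != keep {
			ts[keep], ts[i] = ts[i], ts[keep]
		}
		keep++
	}
	return ts[:keep], st
}

func main() {
	series := []int64{100, 40, 250, 10, 180} // timestamps in ms
	kept, st := filterSeries(series, func(t int64) bool { return t < 50 })
	fmt.Println(kept, st.highest, st.lowest, st.dropped)
	// Output: [100 250 180] 250 100 2
}

Swapping kept entries forward rather than copying matters here because, as the in-tree comment notes, the caller reuses the backing array for the next batch (shards.populateTimeSeries); a plain copy could alias entries that a later batch then overwrites.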