diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e3d6396a78..73a4896f19 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -645,63 +645,78 @@ func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) boo return sampleTs.Before(limitTs) } +// timeSeriesAgeChecker encapsulates the logic for checking if time series data is too old. +type timeSeriesAgeChecker struct { + metrics *queueManagerMetrics + baseTime time.Time + sampleAgeLimit time.Duration +} + +// checkAndRecordIfOld checks if a timestamp is too old and records the appropriate metric. +// Returns true if the data should be dropped. +func (c *timeSeriesAgeChecker) checkAndRecordIfOld(timestamp int64, dataType string) bool { + if c.sampleAgeLimit == 0 { + // If sampleAgeLimit is unset, then we never skip samples due to their age. + return false + } + + if !isSampleOld(c.baseTime, c.sampleAgeLimit, timestamp) { + return false + } + + // Record the drop in metrics. + switch dataType { + case "sample": + c.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() + case "histogram": + c.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() + case "exemplar": + c.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() + } + return true +} + func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool { + checker := &timeSeriesAgeChecker{ + metrics: metrics, + baseTime: baseTime, + sampleAgeLimit: sampleAgeLimit, + } + return func(ts prompb.TimeSeries) bool { - if sampleAgeLimit == 0 { - // If sampleAgeLimit is unset, then we never skip samples due to their age. - return false - } - switch { // Only the first element should be set in the series, therefore we only check the first element. + switch { case len(ts.Samples) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) { - metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Samples[0].Timestamp, "sample") case len(ts.Histograms) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) { - metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Histograms[0].Timestamp, "histogram") case len(ts.Exemplars) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) { - metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Exemplars[0].Timestamp, "exemplar") default: return false } - return false } } func isV2TimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts writev2.TimeSeries) bool { + checker := &timeSeriesAgeChecker{ + metrics: metrics, + baseTime: baseTime, + sampleAgeLimit: sampleAgeLimit, + } + return func(ts writev2.TimeSeries) bool { - if sampleAgeLimit == 0 { - // If sampleAgeLimit is unset, then we never skip samples due to their age. - return false - } - switch { // Only the first element should be set in the series, therefore we only check the first element. + switch { case len(ts.Samples) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) { - metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Samples[0].Timestamp, "sample") case len(ts.Histograms) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) { - metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Histograms[0].Timestamp, "histogram") case len(ts.Exemplars) > 0: - if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) { - metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() - return true - } + return checker.checkAndRecordIfOld(ts.Exemplars[0].Timestamp, "exemplar") default: return false } - return false } }