add timeSeriesAgeChecker to refactor filter code

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
pipiland2612 2025-10-31 23:19:53 +02:00
parent 9e6a626dae
commit 704afd8529

View File

@ -645,63 +645,78 @@ func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) boo
return sampleTs.Before(limitTs)
}
// timeSeriesAgeChecker encapsulates the logic for checking if time series data is too old.
type timeSeriesAgeChecker struct {
metrics *queueManagerMetrics
baseTime time.Time
sampleAgeLimit time.Duration
}
// checkAndRecordIfOld checks if a timestamp is too old and records the appropriate metric.
// Returns true if the data should be dropped.
func (c *timeSeriesAgeChecker) checkAndRecordIfOld(timestamp int64, dataType string) bool {
if c.sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
if !isSampleOld(c.baseTime, c.sampleAgeLimit, timestamp) {
return false
}
// Record the drop in metrics.
switch dataType {
case "sample":
c.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
case "histogram":
c.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
case "exemplar":
c.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
}
return true
}
func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool {
checker := &timeSeriesAgeChecker{
metrics: metrics,
baseTime: baseTime,
sampleAgeLimit: sampleAgeLimit,
}
return func(ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
switch {
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Samples[0].Timestamp, "sample")
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Histograms[0].Timestamp, "histogram")
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Exemplars[0].Timestamp, "exemplar")
default:
return false
}
return false
}
}
func isV2TimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts writev2.TimeSeries) bool {
checker := &timeSeriesAgeChecker{
metrics: metrics,
baseTime: baseTime,
sampleAgeLimit: sampleAgeLimit,
}
return func(ts writev2.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
switch {
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Samples[0].Timestamp, "sample")
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Histograms[0].Timestamp, "histogram")
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Exemplars[0].Timestamp, "exemplar")
default:
return false
}
return false
}
}