Merge pull request #17441 from pipiland2612/refactor_queue_manger

Refactor part of queue_manager.go by creating structs to reuse some common functions
This commit is contained in:
Bartlomiej Plotka 2025-11-13 15:07:11 +01:00 committed by GitHub
commit 675bafe2fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 219 additions and 168 deletions

View File

@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/config"
@ -644,63 +645,78 @@ func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) boo
return sampleTs.Before(limitTs)
}
// timeSeriesAgeChecker encapsulates the logic for checking if time series data is too old.
type timeSeriesAgeChecker struct {
	metrics        *queueManagerMetrics
	baseTime       time.Time
	sampleAgeLimit time.Duration
}

// checkAndRecordIfOld reports whether timestamp falls outside the configured
// sample age limit, incrementing the matching drop counter when it does.
// dataType selects the counter and must be one of "sample", "histogram", or
// "exemplar"; any other value still drops the data but records no metric.
func (c *timeSeriesAgeChecker) checkAndRecordIfOld(timestamp int64, dataType string) bool {
	// An unset (zero) limit disables age-based dropping entirely.
	if c.sampleAgeLimit == 0 || !isSampleOld(c.baseTime, c.sampleAgeLimit, timestamp) {
		return false
	}
	// Record the drop in the per-type metric.
	switch dataType {
	case "sample":
		c.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
	case "histogram":
		c.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
	case "exemplar":
		c.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
	}
	return true
}
func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool {
checker := &timeSeriesAgeChecker{
metrics: metrics,
baseTime: baseTime,
sampleAgeLimit: sampleAgeLimit,
}
return func(ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
switch {
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Samples[0].Timestamp, "sample")
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Histograms[0].Timestamp, "histogram")
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Exemplars[0].Timestamp, "exemplar")
default:
return false
}
return false
}
}
func isV2TimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts writev2.TimeSeries) bool {
checker := &timeSeriesAgeChecker{
metrics: metrics,
baseTime: baseTime,
sampleAgeLimit: sampleAgeLimit,
}
return func(ts writev2.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
switch {
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Samples[0].Timestamp, "sample")
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Histograms[0].Timestamp, "histogram")
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
return checker.checkAndRecordIfOld(ts.Exemplars[0].Timestamp, "exemplar")
default:
return false
}
return false
}
}
@ -1737,6 +1753,20 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
}
reqSize := len(req)
sc := sendBatchContext{
ctx: ctx,
sampleCount: sampleCount,
exemplarCount: exemplarCount,
histogramCount: histogramCount,
metadataCount: metadataCount,
reqSize: reqSize,
}
metricsUpdater := batchMetricsUpdater{
metrics: s.qm.metrics,
storeClient: s.qm.storeClient,
sentDuration: s.qm.metrics.sentBatchDuration,
}
// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
// to track the total amount of accepted data across the various attempts.
@ -1772,33 +1802,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
req = req2
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
ctx, span := createBatchSpan(sc.ctx, sc, s.qm.storeClient.Name(), s.qm.storeClient.Endpoint(), try)
defer span.End()
span.SetAttributes(
attribute.Int("request_size", reqSize),
attribute.Int("samples", sampleCount),
attribute.Int("try", try),
attribute.String("remote_name", s.qm.storeClient.Name()),
attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
span.SetAttributes(attribute.Int("exemplars", exemplarCount))
}
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
metricsUpdater.recordBatchAttempt(sc, begin)
// Technically for v1, we will likely have empty response stats, but for
// newer Receivers this might be not, so used it in a best effort.
rs, err := s.qm.client().Store(ctx, req, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
// so far we don't have those, so it's ok to potentially skew statistics.
addStats(rs)
@ -1811,9 +1822,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
metricsUpdater.recordRetry(sc)
}
err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
@ -1850,6 +1859,20 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
}
reqSize := len(req)
sc := sendBatchContext{
ctx: ctx,
sampleCount: sampleCount,
exemplarCount: exemplarCount,
histogramCount: histogramCount,
metadataCount: metadataCount,
reqSize: reqSize,
}
metricsUpdater := batchMetricsUpdater{
metrics: s.qm.metrics,
storeClient: s.qm.storeClient,
sentDuration: s.qm.metrics.sentBatchDuration,
}
// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
// to track the total amount of accepted data across the various attempts.
@ -1885,31 +1908,12 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
req = req2
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
ctx, span := createBatchSpan(sc.ctx, sc, s.qm.storeClient.Name(), s.qm.storeClient.Endpoint(), try)
defer span.End()
span.SetAttributes(
attribute.Int("request_size", reqSize),
attribute.Int("samples", sampleCount),
attribute.Int("try", try),
attribute.String("remote_name", s.qm.storeClient.Name()),
attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
span.SetAttributes(attribute.Int("exemplars", exemplarCount))
}
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
metricsUpdater.recordBatchAttempt(sc, begin)
rs, err := s.qm.client().Store(ctx, req, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
// so far we don't have those, so it's ok to potentially skew statistics.
addStats(rs)
@ -1933,9 +1937,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
metricsUpdater.recordRetry(sc)
}
err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
@ -2101,48 +2103,27 @@ func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, upda
}
}
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) ([]prompb.TimeSeries, *timeSeriesStats) {
stats := newTimeSeriesStats()
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0)
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
if len(ts.Samples) > 0 {
stats.updateTimestamp(ts.Samples[0].Timestamp)
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
if len(ts.Exemplars) > 0 {
stats.updateTimestamp(ts.Exemplars[0].Timestamp)
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
if len(ts.Histograms) > 0 {
stats.updateTimestamp(ts.Histograms[0].Timestamp)
}
// Get lowest timestamp
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
if i != keepIdx {
// We have to swap the kept timeseries with the one which should be dropped.
// Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries).
@ -2151,16 +2132,14 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
return timeSeries[:keepIdx], stats
}
func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, filter func(prompb.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (_ []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
timeSeries, stats := buildTimeSeries(timeSeries, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
logger.Debug("dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
if stats.droppedSamples > 0 || stats.droppedExemplars > 0 || stats.droppedHistograms > 0 {
logger.Debug("dropped data due to their age", "droppedSamples", stats.droppedSamples, "droppedExemplars", stats.droppedExemplars, "droppedHistograms", stats.droppedHistograms)
}
req := &prompb.WriteRequest{
@ -2174,21 +2153,21 @@ func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, meta
pBuf.Reset()
}
if err := pBuf.Marshal(req); err != nil {
return nil, highest, lowest, err
return nil, stats.highest, stats.lowest, err
}
compressed, err := compression.Encode(compr, pBuf.Bytes(), buf)
if err != nil {
return nil, highest, lowest, err
return nil, stats.highest, stats.lowest, err
}
return compressed, highest, lowest, nil
return compressed, stats.highest, stats.lowest, nil
}
func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labels []string, pBuf *[]byte, filter func(writev2.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (compressed []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter)
timeSeries, stats := buildV2TimeSeries(samples, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
logger.Debug("dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
if stats.droppedSamples > 0 || stats.droppedExemplars > 0 || stats.droppedHistograms > 0 {
logger.Debug("dropped data due to their age", "droppedSamples", stats.droppedSamples, "droppedExemplars", stats.droppedExemplars, "droppedHistograms", stats.droppedHistograms)
}
req := &writev2.Request{
@ -2202,59 +2181,38 @@ func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labe
data, err := req.OptimizedMarshal(*pBuf)
if err != nil {
return nil, highest, lowest, err
return nil, stats.highest, stats.lowest, err
}
*pBuf = data
compressed, err = compression.Encode(compr, *pBuf, buf)
if err != nil {
return nil, highest, lowest, err
return nil, stats.highest, stats.lowest, err
}
return compressed, highest, lowest, nil
return compressed, stats.highest, stats.lowest, nil
}
func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) (int64, int64, []writev2.TimeSeries, int, int, int) {
var highest int64
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int
func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) ([]writev2.TimeSeries, *timeSeriesStats) {
stats := newTimeSeriesStats()
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0)
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
if len(ts.Samples) > 0 {
stats.updateTimestamp(ts.Samples[0].Timestamp)
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
if len(ts.Exemplars) > 0 {
stats.updateTimestamp(ts.Exemplars[0].Timestamp)
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
if len(ts.Histograms) > 0 {
stats.updateTimestamp(ts.Histograms[0].Timestamp)
}
// Get the lowest timestamp.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
if i != keepIdx {
// We have to swap the kept timeseries with the one which should be dropped.
// Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries).
@ -2263,6 +2221,99 @@ func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.Time
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
return timeSeries[:keepIdx], stats
}
// timeSeriesStats tracks statistics during time series processing.
type timeSeriesStats struct {
highest int64
lowest int64
droppedSamples int
droppedExemplars int
droppedHistograms int
}
// newTimeSeriesStats creates a new timeSeriesStats with lowest initialized to MaxInt64.
func newTimeSeriesStats() *timeSeriesStats {
return &timeSeriesStats{
lowest: math.MaxInt64,
}
}
// updateTimestamp updates highest and lowest timestamps if the given timestamp is valid.
func (s *timeSeriesStats) updateTimestamp(timestamp int64) {
if timestamp > 0 {
if timestamp > s.highest {
s.highest = timestamp
}
if timestamp < s.lowest {
s.lowest = timestamp
}
}
}
// recordDropped increments the dropped counters based on what data exists.
func (s *timeSeriesStats) recordDropped(hasSamples, hasExemplars, hasHistograms bool) {
if hasSamples {
s.droppedSamples++
}
if hasExemplars {
s.droppedExemplars++
}
if hasHistograms {
s.droppedHistograms++
}
}
// sendBatchContext encapsulates the common parameters for sending batches.
// This reduces the number of function arguments (addresses "too many arguments" issue).
type sendBatchContext struct {
	// ctx is the context under which this batch send runs.
	// NOTE(review): storing a context in a struct is discouraged by Go
	// convention; confirm the struct never outlives the single
	// send*SamplesWithBackoff call that constructs it.
	ctx context.Context
	// Per-type counts of entries in the encoded request; used for span
	// attributes and metrics totals.
	sampleCount    int
	exemplarCount  int
	histogramCount int
	metadataCount  int
	// reqSize is len(req) of the encoded request payload in bytes.
	reqSize int
}
// batchMetricsUpdater encapsulates metrics update operations for batch sending.
type batchMetricsUpdater struct {
	metrics *queueManagerMetrics
	// storeClient is not referenced by the methods defined here.
	storeClient WriteClient
	// sentDuration receives the observed attempt duration; at both call
	// sites it is set to metrics.sentBatchDuration.
	sentDuration prometheus.Observer
}

// recordBatchAttempt records the per-type totals for a batch send attempt and
// observes the time elapsed since begin on sentDuration.
// NOTE(review): both call sites invoke this before client().Store, whereas the
// pre-refactor code observed sentBatchDuration after Store returned — confirm
// the duration metric still measures the remote write itself.
func (u *batchMetricsUpdater) recordBatchAttempt(sc sendBatchContext, begin time.Time) {
	m := u.metrics
	m.samplesTotal.Add(float64(sc.sampleCount))
	m.exemplarsTotal.Add(float64(sc.exemplarCount))
	m.histogramsTotal.Add(float64(sc.histogramCount))
	m.metadataTotal.Add(float64(sc.metadataCount))
	u.sentDuration.Observe(time.Since(begin).Seconds())
}

// recordRetry records retry metrics for a batch.
func (u *batchMetricsUpdater) recordRetry(sc sendBatchContext) {
	m := u.metrics
	m.retriedSamplesTotal.Add(float64(sc.sampleCount))
	m.retriedExemplarsTotal.Add(float64(sc.exemplarCount))
	m.retriedHistogramsTotal.Add(float64(sc.histogramCount))
}
// createBatchSpan starts and configures an OpenTelemetry span named
// "Remote Send Batch" for a batch send attempt, attaching the request size,
// sample count, attempt number, and remote endpoint identity as attributes.
// Exemplar and histogram counts are attached only when non-zero.
func createBatchSpan(ctx context.Context, sc sendBatchContext, remoteName, remoteURL string, try int) (context.Context, trace.Span) {
	ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
	attrs := []attribute.KeyValue{
		attribute.Int("request_size", sc.reqSize),
		attribute.Int("samples", sc.sampleCount),
		attribute.Int("try", try),
		attribute.String("remote_name", remoteName),
		attribute.String("remote_url", remoteURL),
	}
	if sc.exemplarCount > 0 {
		attrs = append(attrs, attribute.Int("exemplars", sc.exemplarCount))
	}
	if sc.histogramCount > 0 {
		attrs = append(attrs, attribute.Int("histograms", sc.histogramCount))
	}
	span.SetAttributes(attrs...)
	return ctx, span
}

View File

@ -2351,12 +2351,12 @@ func TestBuildTimeSeries(t *testing.T) {
// Run the test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
result, stats := buildTimeSeries(tc.ts, tc.filter)
require.NotNil(t, result)
require.Len(t, result, tc.responseLen)
require.Equal(t, tc.highestTs, highest)
require.Equal(t, tc.lowestTs, lowest)
require.Equal(t, tc.droppedSamples, droppedSamples)
require.Equal(t, tc.highestTs, stats.highest)
require.Equal(t, tc.lowestTs, stats.lowest)
require.Equal(t, tc.droppedSamples, stats.droppedSamples)
})
}
}
@ -2367,7 +2367,7 @@ func BenchmarkBuildTimeSeries(b *testing.B) {
filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
for b.Loop() {
samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...)
_, _, result, _, _, _ := buildTimeSeries(samples, filter)
result, _ := buildTimeSeries(samples, filter)
require.NotNil(b, result)
}
}