create common struct and function to DRY

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
pipiland2612 2025-10-31 21:55:14 +02:00
parent 9f93c2d2e1
commit e1cb29bf8a


@@ -30,6 +30,7 @@ import (
"github.com/prometheus/common/promslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.uber.org/atomic"
@@ -1737,6 +1738,20 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
}
reqSize := len(req)
sc := sendBatchContext{
    ctx:            ctx,
    sampleCount:    sampleCount,
    exemplarCount:  exemplarCount,
    histogramCount: histogramCount,
    metadataCount:  metadataCount,
    reqSize:        reqSize,
}
metricsUpdater := batchMetricsUpdater{
    metrics:      s.qm.metrics,
    storeClient:  s.qm.storeClient,
    sentDuration: s.qm.metrics.sentBatchDuration,
}
// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
// to track the total amount of accepted data across the various attempts.
@@ -1772,33 +1787,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
req = req2
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
ctx, span := createBatchSpan(sc.ctx, sc, s.qm.storeClient.Name(), s.qm.storeClient.Endpoint(), try)
defer span.End()
span.SetAttributes(
    attribute.Int("request_size", reqSize),
    attribute.Int("samples", sampleCount),
    attribute.Int("try", try),
    attribute.String("remote_name", s.qm.storeClient.Name()),
    attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
    span.SetAttributes(attribute.Int("exemplars", exemplarCount))
}
if histogramCount > 0 {
    span.SetAttributes(attribute.Int("histograms", histogramCount))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
metricsUpdater.recordBatchAttempt(sc, begin)
// Technically for v1 we will likely get empty response stats, but newer
// Receivers may return them, so use them on a best-effort basis.
rs, err := s.qm.client().Store(ctx, req, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
// so far we don't have those, so it's ok to potentially skew statistics.
addStats(rs)
@@ -1811,9 +1807,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
metricsUpdater.recordRetry(sc)
}
err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
@@ -1850,6 +1844,20 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
}
reqSize := len(req)
sc := sendBatchContext{
    ctx:            ctx,
    sampleCount:    sampleCount,
    exemplarCount:  exemplarCount,
    histogramCount: histogramCount,
    metadataCount:  metadataCount,
    reqSize:        reqSize,
}
metricsUpdater := batchMetricsUpdater{
    metrics:      s.qm.metrics,
    storeClient:  s.qm.storeClient,
    sentDuration: s.qm.metrics.sentBatchDuration,
}
// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
// to track the total amount of accepted data across the various attempts.
@@ -1885,31 +1893,12 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
req = req2
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
ctx, span := createBatchSpan(sc.ctx, sc, s.qm.storeClient.Name(), s.qm.storeClient.Endpoint(), try)
defer span.End()
span.SetAttributes(
    attribute.Int("request_size", reqSize),
    attribute.Int("samples", sampleCount),
    attribute.Int("try", try),
    attribute.String("remote_name", s.qm.storeClient.Name()),
    attribute.String("remote_url", s.qm.storeClient.Endpoint()),
)
if exemplarCount > 0 {
    span.SetAttributes(attribute.Int("exemplars", exemplarCount))
}
if histogramCount > 0 {
    span.SetAttributes(attribute.Int("histograms", histogramCount))
}
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
metricsUpdater.recordBatchAttempt(sc, begin)
rs, err := s.qm.client().Store(ctx, req, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
// so far we don't have those, so it's ok to potentially skew statistics.
addStats(rs)
@@ -1933,9 +1922,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
metricsUpdater.recordRetry(sc)
}
err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
@@ -2266,3 +2253,56 @@ func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.Time
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
}

// sendBatchContext encapsulates the common parameters for sending batches.
// This reduces the number of function arguments (addresses the "too many arguments" issue).
type sendBatchContext struct {
    ctx            context.Context
    sampleCount    int
    exemplarCount  int
    histogramCount int
    metadataCount  int
    reqSize        int
}

// batchMetricsUpdater encapsulates metrics update operations for batch sending.
type batchMetricsUpdater struct {
    metrics      *queueManagerMetrics
    storeClient  WriteClient
    sentDuration prometheus.Observer
}

// recordBatchAttempt records metrics for a batch send attempt.
func (b *batchMetricsUpdater) recordBatchAttempt(sc sendBatchContext, begin time.Time) {
    b.metrics.samplesTotal.Add(float64(sc.sampleCount))
    b.metrics.exemplarsTotal.Add(float64(sc.exemplarCount))
    b.metrics.histogramsTotal.Add(float64(sc.histogramCount))
    b.metrics.metadataTotal.Add(float64(sc.metadataCount))
    b.sentDuration.Observe(time.Since(begin).Seconds())
}

// recordRetry records retry metrics for a batch.
func (b *batchMetricsUpdater) recordRetry(sc sendBatchContext) {
    b.metrics.retriedSamplesTotal.Add(float64(sc.sampleCount))
    b.metrics.retriedExemplarsTotal.Add(float64(sc.exemplarCount))
    b.metrics.retriedHistogramsTotal.Add(float64(sc.histogramCount))
}

// createBatchSpan creates and configures an OpenTelemetry span for batch sending.
func createBatchSpan(ctx context.Context, sc sendBatchContext, remoteName, remoteURL string, try int) (context.Context, trace.Span) {
    ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
    span.SetAttributes(
        attribute.Int("request_size", sc.reqSize),
        attribute.Int("samples", sc.sampleCount),
        attribute.Int("try", try),
        attribute.String("remote_name", remoteName),
        attribute.String("remote_url", remoteURL),
    )
    if sc.exemplarCount > 0 {
        span.SetAttributes(attribute.Int("exemplars", sc.exemplarCount))
    }
    if sc.histogramCount > 0 {
        span.SetAttributes(attribute.Int("histograms", sc.histogramCount))
    }
    return ctx, span
}
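
For readers outside the Prometheus codebase, here is a minimal, self-contained sketch of the pattern this commit introduces: gather the per-batch numbers once in a context struct, then reuse that struct for the span attributes, the attempt metrics, and the retry metrics. The batchContext, metricsSink, and startBatchSpan names and the main() scaffolding below are illustrative stand-ins (assumptions), not the real sendBatchContext, queueManagerMetrics, createBatchSpan, or shards code; only the shape of the composition mirrors the diff above.

// Hypothetical stand-alone sketch; not part of the commit.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

// batchContext plays the role of sendBatchContext: batch-wide numbers gathered once.
type batchContext struct {
    ctx           context.Context
    sampleCount   int
    exemplarCount int
    reqSize       int
}

// metricsSink is an illustrative stand-in for queueManagerMetrics.
type metricsSink struct {
    samplesTotal      prometheus.Counter
    exemplarsTotal    prometheus.Counter
    retriedSamples    prometheus.Counter
    sentBatchDuration prometheus.Histogram
}

// recordBatchAttempt mirrors batchMetricsUpdater.recordBatchAttempt.
func (m *metricsSink) recordBatchAttempt(bc batchContext, begin time.Time) {
    m.samplesTotal.Add(float64(bc.sampleCount))
    m.exemplarsTotal.Add(float64(bc.exemplarCount))
    m.sentBatchDuration.Observe(time.Since(begin).Seconds())
}

// recordRetry mirrors batchMetricsUpdater.recordRetry.
func (m *metricsSink) recordRetry(bc batchContext) {
    m.retriedSamples.Add(float64(bc.sampleCount))
}

// startBatchSpan mirrors createBatchSpan: one place that sets the shared span attributes.
func startBatchSpan(ctx context.Context, bc batchContext, remoteName string, try int) (context.Context, trace.Span) {
    ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
    span.SetAttributes(
        attribute.Int("request_size", bc.reqSize),
        attribute.Int("samples", bc.sampleCount),
        attribute.Int("try", try),
        attribute.String("remote_name", remoteName),
    )
    if bc.exemplarCount > 0 {
        span.SetAttributes(attribute.Int("exemplars", bc.exemplarCount))
    }
    return ctx, span
}

func main() {
    m := &metricsSink{
        samplesTotal:      prometheus.NewCounter(prometheus.CounterOpts{Name: "samples_total"}),
        exemplarsTotal:    prometheus.NewCounter(prometheus.CounterOpts{Name: "exemplars_total"}),
        retriedSamples:    prometheus.NewCounter(prometheus.CounterOpts{Name: "retried_samples_total"}),
        sentBatchDuration: prometheus.NewHistogram(prometheus.HistogramOpts{Name: "sent_batch_duration_seconds"}),
    }

    // Build the batch context once, then pass it around instead of five loose ints.
    bc := batchContext{ctx: context.Background(), sampleCount: 100, exemplarCount: 2, reqSize: 4096}

    attempt := func(try int) {
        _, span := startBatchSpan(bc.ctx, bc, "example-remote", try)
        defer span.End()

        begin := time.Now()
        // ... the real code would call client().Store(ctx, req, try) here ...
        m.recordBatchAttempt(bc, begin)
    }

    attempt(0)
    m.recordRetry(bc) // what the onRetry closure now delegates to
    fmt.Println("sketch done")
}

Passing one small struct instead of repeating the counter updates and span setup keeps the v1 and v2 send paths in sync, which is the point of the refactor.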