From 9e6a626dae6d0477b875fdd94c8eaac37cd00797 Mon Sep 17 00:00:00 2001 From: pipiland2612 Date: Fri, 31 Oct 2025 22:17:45 +0200 Subject: [PATCH] create timeSeriesStats to reduce return variable Signed-off-by: pipiland2612 --- storage/remote/queue_manager.go | 156 +++++++++++++-------------- storage/remote/queue_manager_test.go | 10 +- 2 files changed, 81 insertions(+), 85 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a5c215ec2e..e3d6396a78 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -30,8 +30,8 @@ import ( "github.com/prometheus/common/promslog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" "github.com/prometheus/prometheus/config" @@ -2088,48 +2088,27 @@ func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, upda } } -func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) { - var highest int64 - var lowest int64 - var droppedSamples, droppedExemplars, droppedHistograms int - +func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) ([]prompb.TimeSeries, *timeSeriesStats) { + stats := newTimeSeriesStats() keepIdx := 0 - lowest = math.MaxInt64 + for i, ts := range timeSeries { if filter != nil && filter(ts) { - if len(ts.Samples) > 0 { - droppedSamples++ - } - if len(ts.Exemplars) > 0 { - droppedExemplars++ - } - if len(ts.Histograms) > 0 { - droppedHistograms++ - } + stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0) continue } // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. - if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { - highest = ts.Samples[0].Timestamp + if len(ts.Samples) > 0 { + stats.updateTimestamp(ts.Samples[0].Timestamp) } - if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { - highest = ts.Exemplars[0].Timestamp + if len(ts.Exemplars) > 0 { + stats.updateTimestamp(ts.Exemplars[0].Timestamp) } - if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { - highest = ts.Histograms[0].Timestamp + if len(ts.Histograms) > 0 { + stats.updateTimestamp(ts.Histograms[0].Timestamp) } - // Get lowest timestamp - if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest { - lowest = ts.Samples[0].Timestamp - } - if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest { - lowest = ts.Exemplars[0].Timestamp - } - if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest { - lowest = ts.Histograms[0].Timestamp - } if i != keepIdx { // We have to swap the kept timeseries with the one which should be dropped. // Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries). @@ -2138,16 +2117,14 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri keepIdx++ } - timeSeries = timeSeries[:keepIdx] - return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms + return timeSeries[:keepIdx], stats } func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, filter func(prompb.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (_ []byte, highest, lowest int64, _ error) { - highest, lowest, timeSeries, - droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter) + timeSeries, stats := buildTimeSeries(timeSeries, filter) - if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 { - logger.Debug("dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms) + if stats.droppedSamples > 0 || stats.droppedExemplars > 0 || stats.droppedHistograms > 0 { + logger.Debug("dropped data due to their age", "droppedSamples", stats.droppedSamples, "droppedExemplars", stats.droppedExemplars, "droppedHistograms", stats.droppedHistograms) } req := &prompb.WriteRequest{ @@ -2161,21 +2138,21 @@ func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, meta pBuf.Reset() } if err := pBuf.Marshal(req); err != nil { - return nil, highest, lowest, err + return nil, stats.highest, stats.lowest, err } compressed, err := compression.Encode(compr, pBuf.Bytes(), buf) if err != nil { - return nil, highest, lowest, err + return nil, stats.highest, stats.lowest, err } - return compressed, highest, lowest, nil + return compressed, stats.highest, stats.lowest, nil } func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labels []string, pBuf *[]byte, filter func(writev2.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (compressed []byte, highest, lowest int64, _ error) { - highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter) + timeSeries, stats := buildV2TimeSeries(samples, filter) - if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 { - logger.Debug("dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms) + if stats.droppedSamples > 0 || stats.droppedExemplars > 0 || stats.droppedHistograms > 0 { + logger.Debug("dropped data due to their age", "droppedSamples", stats.droppedSamples, "droppedExemplars", stats.droppedExemplars, "droppedHistograms", stats.droppedHistograms) } req := &writev2.Request{ @@ -2189,59 +2166,38 @@ func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labe data, err := req.OptimizedMarshal(*pBuf) if err != nil { - return nil, highest, lowest, err + return nil, stats.highest, stats.lowest, err } *pBuf = data compressed, err = compression.Encode(compr, *pBuf, buf) if err != nil { - return nil, highest, lowest, err + return nil, stats.highest, stats.lowest, err } - return compressed, highest, lowest, nil + return compressed, stats.highest, stats.lowest, nil } -func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) (int64, int64, []writev2.TimeSeries, int, int, int) { - var highest int64 - var lowest int64 - var droppedSamples, droppedExemplars, droppedHistograms int - +func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) ([]writev2.TimeSeries, *timeSeriesStats) { + stats := newTimeSeriesStats() keepIdx := 0 - lowest = math.MaxInt64 + for i, ts := range timeSeries { if filter != nil && filter(ts) { - if len(ts.Samples) > 0 { - droppedSamples++ - } - if len(ts.Exemplars) > 0 { - droppedExemplars++ - } - if len(ts.Histograms) > 0 { - droppedHistograms++ - } + stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0) continue } // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. - if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { - highest = ts.Samples[0].Timestamp + if len(ts.Samples) > 0 { + stats.updateTimestamp(ts.Samples[0].Timestamp) } - if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { - highest = ts.Exemplars[0].Timestamp + if len(ts.Exemplars) > 0 { + stats.updateTimestamp(ts.Exemplars[0].Timestamp) } - if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { - highest = ts.Histograms[0].Timestamp + if len(ts.Histograms) > 0 { + stats.updateTimestamp(ts.Histograms[0].Timestamp) } - // Get the lowest timestamp. - if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest { - lowest = ts.Samples[0].Timestamp - } - if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest { - lowest = ts.Exemplars[0].Timestamp - } - if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest { - lowest = ts.Histograms[0].Timestamp - } if i != keepIdx { // We have to swap the kept timeseries with the one which should be dropped. // Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries). @@ -2250,8 +2206,48 @@ func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.Time keepIdx++ } - timeSeries = timeSeries[:keepIdx] - return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms + return timeSeries[:keepIdx], stats +} + +// timeSeriesStats tracks statistics during time series processing. +type timeSeriesStats struct { + highest int64 + lowest int64 + droppedSamples int + droppedExemplars int + droppedHistograms int +} + +// newTimeSeriesStats creates a new timeSeriesStats with lowest initialized to MaxInt64. +func newTimeSeriesStats() *timeSeriesStats { + return &timeSeriesStats{ + lowest: math.MaxInt64, + } +} + +// updateTimestamp updates highest and lowest timestamps if the given timestamp is valid. +func (s *timeSeriesStats) updateTimestamp(timestamp int64) { + if timestamp > 0 { + if timestamp > s.highest { + s.highest = timestamp + } + if timestamp < s.lowest { + s.lowest = timestamp + } + } +} + +// recordDropped increments the dropped counters based on what data exists. +func (s *timeSeriesStats) recordDropped(hasSamples, hasExemplars, hasHistograms bool) { + if hasSamples { + s.droppedSamples++ + } + if hasExemplars { + s.droppedExemplars++ + } + if hasHistograms { + s.droppedHistograms++ + } } // sendBatchContext encapsulates the common parameters for sending batches. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index bb72b7f998..08f1a141c7 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -2323,12 +2323,12 @@ func TestBuildTimeSeries(t *testing.T) { // Run the test cases for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter) + result, stats := buildTimeSeries(tc.ts, tc.filter) require.NotNil(t, result) require.Len(t, result, tc.responseLen) - require.Equal(t, tc.highestTs, highest) - require.Equal(t, tc.lowestTs, lowest) - require.Equal(t, tc.droppedSamples, droppedSamples) + require.Equal(t, tc.highestTs, stats.highest) + require.Equal(t, tc.lowestTs, stats.lowest) + require.Equal(t, tc.droppedSamples, stats.droppedSamples) }) } } @@ -2339,7 +2339,7 @@ func BenchmarkBuildTimeSeries(b *testing.B) { filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) } for i := 0; i < b.N; i++ { samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...) - _, _, result, _, _, _ := buildTimeSeries(samples, filter) + result, _ := buildTimeSeries(samples, filter) require.NotNil(b, result) } }