create timeSeriesStats to reduce return variable

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
pipiland2612 2025-10-31 22:17:45 +02:00
parent e1cb29bf8a
commit 9e6a626dae
2 changed files with 81 additions and 85 deletions

View File

@ -30,8 +30,8 @@ import (
"github.com/prometheus/common/promslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/config"
@ -2088,48 +2088,27 @@ func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, upda
}
}
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) ([]prompb.TimeSeries, *timeSeriesStats) {
stats := newTimeSeriesStats()
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0)
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
if len(ts.Samples) > 0 {
stats.updateTimestamp(ts.Samples[0].Timestamp)
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
if len(ts.Exemplars) > 0 {
stats.updateTimestamp(ts.Exemplars[0].Timestamp)
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
if len(ts.Histograms) > 0 {
stats.updateTimestamp(ts.Histograms[0].Timestamp)
}
// Get lowest timestamp
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
if i != keepIdx {
// We have to swap the kept timeseries with the one which should be dropped.
// Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries).
@ -2138,16 +2117,14 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
return timeSeries[:keepIdx], stats
}
func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, filter func(prompb.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (_ []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
timeSeries, stats := buildTimeSeries(timeSeries, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
logger.Debug("dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
if stats.droppedSamples > 0 || stats.droppedExemplars > 0 || stats.droppedHistograms > 0 {
logger.Debug("dropped data due to their age", "droppedSamples", stats.droppedSamples, "droppedExemplars", stats.droppedExemplars, "droppedHistograms", stats.droppedHistograms)
}
req := &prompb.WriteRequest{
@ -2161,21 +2138,21 @@ func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, meta
pBuf.Reset()
}
if err := pBuf.Marshal(req); err != nil {
return nil, highest, lowest, err
return nil, stats.highest, stats.lowest, err
}
compressed, err := compression.Encode(compr, pBuf.Bytes(), buf)
if err != nil {
return nil, highest, lowest, err
return nil, stats.highest, stats.lowest, err
}
return compressed, highest, lowest, nil
return compressed, stats.highest, stats.lowest, nil
}
func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labels []string, pBuf *[]byte, filter func(writev2.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (compressed []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter)
timeSeries, stats := buildV2TimeSeries(samples, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
logger.Debug("dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
if stats.droppedSamples > 0 || stats.droppedExemplars > 0 || stats.droppedHistograms > 0 {
logger.Debug("dropped data due to their age", "droppedSamples", stats.droppedSamples, "droppedExemplars", stats.droppedExemplars, "droppedHistograms", stats.droppedHistograms)
}
req := &writev2.Request{
@ -2189,59 +2166,38 @@ func buildV2WriteRequest(logger *slog.Logger, samples []writev2.TimeSeries, labe
data, err := req.OptimizedMarshal(*pBuf)
if err != nil {
return nil, highest, lowest, err
return nil, stats.highest, stats.lowest, err
}
*pBuf = data
compressed, err = compression.Encode(compr, *pBuf, buf)
if err != nil {
return nil, highest, lowest, err
return nil, stats.highest, stats.lowest, err
}
return compressed, highest, lowest, nil
return compressed, stats.highest, stats.lowest, nil
}
func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) (int64, int64, []writev2.TimeSeries, int, int, int) {
var highest int64
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int
func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) ([]writev2.TimeSeries, *timeSeriesStats) {
stats := newTimeSeriesStats()
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0)
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
if len(ts.Samples) > 0 {
stats.updateTimestamp(ts.Samples[0].Timestamp)
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
highest = ts.Exemplars[0].Timestamp
if len(ts.Exemplars) > 0 {
stats.updateTimestamp(ts.Exemplars[0].Timestamp)
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
if len(ts.Histograms) > 0 {
stats.updateTimestamp(ts.Histograms[0].Timestamp)
}
// Get the lowest timestamp.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
if i != keepIdx {
// We have to swap the kept timeseries with the one which should be dropped.
// Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries).
@ -2250,8 +2206,48 @@ func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.Time
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
return timeSeries[:keepIdx], stats
}
// timeSeriesStats tracks statistics during time series processing.
type timeSeriesStats struct {
highest int64
lowest int64
droppedSamples int
droppedExemplars int
droppedHistograms int
}
// newTimeSeriesStats creates a new timeSeriesStats with lowest initialized to MaxInt64.
func newTimeSeriesStats() *timeSeriesStats {
return &timeSeriesStats{
lowest: math.MaxInt64,
}
}
// updateTimestamp updates highest and lowest timestamps if the given timestamp is valid.
func (s *timeSeriesStats) updateTimestamp(timestamp int64) {
if timestamp > 0 {
if timestamp > s.highest {
s.highest = timestamp
}
if timestamp < s.lowest {
s.lowest = timestamp
}
}
}
// recordDropped increments the dropped counters based on what data exists.
func (s *timeSeriesStats) recordDropped(hasSamples, hasExemplars, hasHistograms bool) {
if hasSamples {
s.droppedSamples++
}
if hasExemplars {
s.droppedExemplars++
}
if hasHistograms {
s.droppedHistograms++
}
}
// sendBatchContext encapsulates the common parameters for sending batches.

View File

@ -2323,12 +2323,12 @@ func TestBuildTimeSeries(t *testing.T) {
// Run the test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
result, stats := buildTimeSeries(tc.ts, tc.filter)
require.NotNil(t, result)
require.Len(t, result, tc.responseLen)
require.Equal(t, tc.highestTs, highest)
require.Equal(t, tc.lowestTs, lowest)
require.Equal(t, tc.droppedSamples, droppedSamples)
require.Equal(t, tc.highestTs, stats.highest)
require.Equal(t, tc.lowestTs, stats.lowest)
require.Equal(t, tc.droppedSamples, stats.droppedSamples)
})
}
}
@ -2339,7 +2339,7 @@ func BenchmarkBuildTimeSeries(b *testing.B) {
filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
for i := 0; i < b.N; i++ {
samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...)
_, _, result, _, _, _ := buildTimeSeries(samples, filter)
result, _ := buildTimeSeries(samples, filter)
require.NotNil(b, result)
}
}