From 0cf54d7819eb326fef7661e31aa8ea54f7a0ebf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Wed, 17 Sep 2025 08:41:06 +0200 Subject: [PATCH] perf(otlp): reduce logs from OTLP endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It's not possible to store created timestamp at the same timestamp as the current sample, so do not even try. In OpenTelemetry spec, if the start time is unknown, it will be set to the same timestamp as the first sample. https://opentelemetry.io/docs/specs/otel/metrics/data-model/#cumulative-streams-handling-unknown-start-time This means that we will get a lot of duplicate sample for timestamp errors and we should not log those. Signed-off-by: György Krajcsovits --- .../combined_appender.go | 8 +- .../combined_appender_test.go | 75 ++++++++++++++++++- 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go index de2c65962d..36f2213453 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go @@ -139,7 +139,7 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat ref = 0 } updateRefs := !exists || series.ct != ct - if updateRefs && ct != 0 && b.ingestCTZeroSample { + if updateRefs && ct != 0 && ct < t && b.ingestCTZeroSample { var newRef storage.SeriesRef if h != nil { newRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil) @@ -147,10 +147,14 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat newRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct) } if err != nil { - if !errors.Is(err, storage.ErrOutOfOrderCT) { + if !errors.Is(err, storage.ErrOutOfOrderCT) && !errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { // Even for the first sample OOO is a common scenario because // we can't tell if a CT was already ingested in a previous request. // We ignore the error. + // ErrDuplicateSampleForTimestamp is also a common scenario because + // unknown start times in Opentelemetry are indicated by setting + // the start time to the same as the first sample time. + // https://opentelemetry.io/docs/specs/otel/metrics/data-model/#cumulative-streams-handling-unknown-start-time b.logger.Warn("Error when appending CT from OTLP", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t, "sample_type", sampleType(h)) } } else { diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go index 5652d0b74a..669b75257e 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go @@ -14,6 +14,7 @@ package prometheusremotewrite import ( + "bytes" "context" "errors" "math" @@ -160,8 +161,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { testCases := map[string]struct { appendFunc func(*testing.T, CombinedAppender) + extraAppendFunc func(*testing.T, CombinedAppender) expectedSamples []sample expectedExemplars []exemplar.QueryResult + expectedLogsForCT []string }{ "single float sample, zero CT": { appendFunc: func(t *testing.T, app CombinedAppender) { @@ -185,6 +188,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { f: 42.0, }, }, + expectedLogsForCT: []string{ + "Error when appending CT from OTLP", + "out of bound", + }, }, "single float sample, normal CT": { appendFunc: func(t *testing.T, app CombinedAppender) { @@ -212,6 +219,24 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { }, }, }, + "two float samples in different messages, CT same time as first sample": { + appendFunc: func(t *testing.T, app CombinedAppender) { + require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), 42.0, nil)) + }, + extraAppendFunc: func(t *testing.T, app CombinedAppender) { + require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), 43.0, nil)) + }, + expectedSamples: []sample{ + { + t: now.UnixMilli(), + f: 42.0, + }, + { + t: now.Add(time.Second).UnixMilli(), + f: 43.0, + }, + }, + }, "single float sample, CT in the future of the sample": { appendFunc: func(t *testing.T, app CombinedAppender) { require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), 42.0, nil)) @@ -245,6 +270,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { h: tsdbutil.GenerateTestHistogram(42), }, }, + expectedLogsForCT: []string{ + "Error when appending CT from OTLP", + "out of bound", + }, }, "single histogram sample, normal CT": { appendFunc: func(t *testing.T, app CombinedAppender) { @@ -273,6 +302,24 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { }, }, }, + "two histogram samples in different messages, CT same time as first sample": { + appendFunc: func(t *testing.T, app CombinedAppender) { + require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) + }, + extraAppendFunc: func(t *testing.T, app CombinedAppender) { + require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), tsdbutil.GenerateTestHistogram(43), nil)) + }, + expectedSamples: []sample{ + { + t: now.UnixMilli(), + h: tsdbutil.GenerateTestHistogram(42), + }, + { + t: now.Add(time.Second).UnixMilli(), + h: tsdbutil.GenerateTestHistogram(43), + }, + }, + }, "single histogram sample, CT in the future of the sample": { appendFunc: func(t *testing.T, app CombinedAppender) { require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) @@ -344,6 +391,11 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { + var expectedLogs []string + if ingestCTZeroSample { + expectedLogs = append(expectedLogs, tc.expectedLogsForCT...) + } + dir := t.TempDir() opts := tsdb.DefaultOptions() opts.EnableExemplarStorage = true @@ -354,15 +406,32 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { t.Cleanup(func() { db.Close() }) + var output bytes.Buffer + logger := promslog.New(&promslog.Config{Writer: &output}) + ctx := context.Background() reg := prometheus.NewRegistry() + cappMetrics := NewCombinedAppenderMetrics(reg) app := db.Appender(ctx) - capp := NewCombinedAppender(app, promslog.NewNopLogger(), ingestCTZeroSample, NewCombinedAppenderMetrics(reg)) - + capp := NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics) tc.appendFunc(t, capp) - require.NoError(t, app.Commit()) + if tc.extraAppendFunc != nil { + app = db.Appender(ctx) + capp = NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics) + tc.extraAppendFunc(t, capp) + require.NoError(t, app.Commit()) + } + + if len(expectedLogs) > 0 { + for _, expectedLog := range expectedLogs { + require.Contains(t, output.String(), expectedLog) + } + } else { + require.Empty(t, output.String(), "unexpected log output") + } + q, err := db.Querier(int64(math.MinInt64), int64(math.MaxInt64)) require.NoError(t, err)