diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go index de2c65962d..36f2213453 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go @@ -139,7 +139,7 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat ref = 0 } updateRefs := !exists || series.ct != ct - if updateRefs && ct != 0 && b.ingestCTZeroSample { + if updateRefs && ct != 0 && ct < t && b.ingestCTZeroSample { var newRef storage.SeriesRef if h != nil { newRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil) @@ -147,10 +147,14 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat newRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct) } if err != nil { - if !errors.Is(err, storage.ErrOutOfOrderCT) { + if !errors.Is(err, storage.ErrOutOfOrderCT) && !errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { // Even for the first sample OOO is a common scenario because // we can't tell if a CT was already ingested in a previous request. // We ignore the error. + // ErrDuplicateSampleForTimestamp is also a common scenario because + // unknown start times in Opentelemetry are indicated by setting + // the start time to the same as the first sample time. + // https://opentelemetry.io/docs/specs/otel/metrics/data-model/#cumulative-streams-handling-unknown-start-time b.logger.Warn("Error when appending CT from OTLP", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t, "sample_type", sampleType(h)) } } else { diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go index 5652d0b74a..669b75257e 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go @@ -14,6 +14,7 @@ package prometheusremotewrite import ( + "bytes" "context" "errors" "math" @@ -160,8 +161,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { testCases := map[string]struct { appendFunc func(*testing.T, CombinedAppender) + extraAppendFunc func(*testing.T, CombinedAppender) expectedSamples []sample expectedExemplars []exemplar.QueryResult + expectedLogsForCT []string }{ "single float sample, zero CT": { appendFunc: func(t *testing.T, app CombinedAppender) { @@ -185,6 +188,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { f: 42.0, }, }, + expectedLogsForCT: []string{ + "Error when appending CT from OTLP", + "out of bound", + }, }, "single float sample, normal CT": { appendFunc: func(t *testing.T, app CombinedAppender) { @@ -212,6 +219,24 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { }, }, }, + "two float samples in different messages, CT same time as first sample": { + appendFunc: func(t *testing.T, app CombinedAppender) { + require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), 42.0, nil)) + }, + extraAppendFunc: func(t *testing.T, app CombinedAppender) { + require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), 43.0, nil)) + }, + expectedSamples: []sample{ + { + t: now.UnixMilli(), + f: 42.0, + }, + { + t: now.Add(time.Second).UnixMilli(), + f: 43.0, + }, + }, + }, "single float sample, CT in the future of the sample": { appendFunc: func(t *testing.T, app CombinedAppender) { require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), 42.0, nil)) @@ -245,6 +270,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { h: tsdbutil.GenerateTestHistogram(42), }, }, + expectedLogsForCT: []string{ + "Error when appending CT from OTLP", + "out of bound", + }, }, "single histogram sample, normal CT": { appendFunc: func(t *testing.T, app CombinedAppender) { @@ -273,6 +302,24 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { }, }, }, + "two histogram samples in different messages, CT same time as first sample": { + appendFunc: func(t *testing.T, app CombinedAppender) { + require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) + }, + extraAppendFunc: func(t *testing.T, app CombinedAppender) { + require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), tsdbutil.GenerateTestHistogram(43), nil)) + }, + expectedSamples: []sample{ + { + t: now.UnixMilli(), + h: tsdbutil.GenerateTestHistogram(42), + }, + { + t: now.Add(time.Second).UnixMilli(), + h: tsdbutil.GenerateTestHistogram(43), + }, + }, + }, "single histogram sample, CT in the future of the sample": { appendFunc: func(t *testing.T, app CombinedAppender) { require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) @@ -344,6 +391,11 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { + var expectedLogs []string + if ingestCTZeroSample { + expectedLogs = append(expectedLogs, tc.expectedLogsForCT...) + } + dir := t.TempDir() opts := tsdb.DefaultOptions() opts.EnableExemplarStorage = true @@ -354,15 +406,32 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) { t.Cleanup(func() { db.Close() }) + var output bytes.Buffer + logger := promslog.New(&promslog.Config{Writer: &output}) + ctx := context.Background() reg := prometheus.NewRegistry() + cappMetrics := NewCombinedAppenderMetrics(reg) app := db.Appender(ctx) - capp := NewCombinedAppender(app, promslog.NewNopLogger(), ingestCTZeroSample, NewCombinedAppenderMetrics(reg)) - + capp := NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics) tc.appendFunc(t, capp) - require.NoError(t, app.Commit()) + if tc.extraAppendFunc != nil { + app = db.Appender(ctx) + capp = NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics) + tc.extraAppendFunc(t, capp) + require.NoError(t, app.Commit()) + } + + if len(expectedLogs) > 0 { + for _, expectedLog := range expectedLogs { + require.Contains(t, output.String(), expectedLog) + } + } else { + require.Empty(t, output.String(), "unexpected log output") + } + q, err := db.Querier(int64(math.MinInt64), int64(math.MaxInt64)) require.NoError(t, err)