mirror of
https://github.com/prometheus/prometheus.git
synced 2025-09-21 05:41:01 +02:00
Merge pull request #17201 from prometheus/krajo/ignore-duplicate-ct
perf(otlp): reduce logs from OTLP endpoint
This commit is contained in:
commit
95b0d75fbc
@ -139,7 +139,7 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
|
|||||||
ref = 0
|
ref = 0
|
||||||
}
|
}
|
||||||
updateRefs := !exists || series.ct != ct
|
updateRefs := !exists || series.ct != ct
|
||||||
if updateRefs && ct != 0 && b.ingestCTZeroSample {
|
if updateRefs && ct != 0 && ct < t && b.ingestCTZeroSample {
|
||||||
var newRef storage.SeriesRef
|
var newRef storage.SeriesRef
|
||||||
if h != nil {
|
if h != nil {
|
||||||
newRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil)
|
newRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil)
|
||||||
@ -147,10 +147,14 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
|
|||||||
newRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct)
|
newRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, storage.ErrOutOfOrderCT) {
|
if !errors.Is(err, storage.ErrOutOfOrderCT) && !errors.Is(err, storage.ErrDuplicateSampleForTimestamp) {
|
||||||
// Even for the first sample OOO is a common scenario because
|
// Even for the first sample OOO is a common scenario because
|
||||||
// we can't tell if a CT was already ingested in a previous request.
|
// we can't tell if a CT was already ingested in a previous request.
|
||||||
// We ignore the error.
|
// We ignore the error.
|
||||||
|
// ErrDuplicateSampleForTimestamp is also a common scenario because
|
||||||
|
// unknown start times in Opentelemetry are indicated by setting
|
||||||
|
// the start time to the same as the first sample time.
|
||||||
|
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#cumulative-streams-handling-unknown-start-time
|
||||||
b.logger.Warn("Error when appending CT from OTLP", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t, "sample_type", sampleType(h))
|
b.logger.Warn("Error when appending CT from OTLP", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t, "sample_type", sampleType(h))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
package prometheusremotewrite
|
package prometheusremotewrite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"math"
|
"math"
|
||||||
@ -160,8 +161,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
|
|||||||
|
|
||||||
testCases := map[string]struct {
|
testCases := map[string]struct {
|
||||||
appendFunc func(*testing.T, CombinedAppender)
|
appendFunc func(*testing.T, CombinedAppender)
|
||||||
|
extraAppendFunc func(*testing.T, CombinedAppender)
|
||||||
expectedSamples []sample
|
expectedSamples []sample
|
||||||
expectedExemplars []exemplar.QueryResult
|
expectedExemplars []exemplar.QueryResult
|
||||||
|
expectedLogsForCT []string
|
||||||
}{
|
}{
|
||||||
"single float sample, zero CT": {
|
"single float sample, zero CT": {
|
||||||
appendFunc: func(t *testing.T, app CombinedAppender) {
|
appendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
@ -185,6 +188,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
|
|||||||
f: 42.0,
|
f: 42.0,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedLogsForCT: []string{
|
||||||
|
"Error when appending CT from OTLP",
|
||||||
|
"out of bound",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"single float sample, normal CT": {
|
"single float sample, normal CT": {
|
||||||
appendFunc: func(t *testing.T, app CombinedAppender) {
|
appendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
@ -212,6 +219,24 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"two float samples in different messages, CT same time as first sample": {
|
||||||
|
appendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
|
require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), 42.0, nil))
|
||||||
|
},
|
||||||
|
extraAppendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
|
require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), 43.0, nil))
|
||||||
|
},
|
||||||
|
expectedSamples: []sample{
|
||||||
|
{
|
||||||
|
t: now.UnixMilli(),
|
||||||
|
f: 42.0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
t: now.Add(time.Second).UnixMilli(),
|
||||||
|
f: 43.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
"single float sample, CT in the future of the sample": {
|
"single float sample, CT in the future of the sample": {
|
||||||
appendFunc: func(t *testing.T, app CombinedAppender) {
|
appendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), 42.0, nil))
|
require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), 42.0, nil))
|
||||||
@ -245,6 +270,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
|
|||||||
h: tsdbutil.GenerateTestHistogram(42),
|
h: tsdbutil.GenerateTestHistogram(42),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
expectedLogsForCT: []string{
|
||||||
|
"Error when appending CT from OTLP",
|
||||||
|
"out of bound",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"single histogram sample, normal CT": {
|
"single histogram sample, normal CT": {
|
||||||
appendFunc: func(t *testing.T, app CombinedAppender) {
|
appendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
@ -273,6 +302,24 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"two histogram samples in different messages, CT same time as first sample": {
|
||||||
|
appendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
|
require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil))
|
||||||
|
},
|
||||||
|
extraAppendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
|
require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), tsdbutil.GenerateTestHistogram(43), nil))
|
||||||
|
},
|
||||||
|
expectedSamples: []sample{
|
||||||
|
{
|
||||||
|
t: now.UnixMilli(),
|
||||||
|
h: tsdbutil.GenerateTestHistogram(42),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
t: now.Add(time.Second).UnixMilli(),
|
||||||
|
h: tsdbutil.GenerateTestHistogram(43),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
"single histogram sample, CT in the future of the sample": {
|
"single histogram sample, CT in the future of the sample": {
|
||||||
appendFunc: func(t *testing.T, app CombinedAppender) {
|
appendFunc: func(t *testing.T, app CombinedAppender) {
|
||||||
require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil))
|
require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil))
|
||||||
@ -344,6 +391,11 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
|
|||||||
|
|
||||||
for name, tc := range testCases {
|
for name, tc := range testCases {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
|
var expectedLogs []string
|
||||||
|
if ingestCTZeroSample {
|
||||||
|
expectedLogs = append(expectedLogs, tc.expectedLogsForCT...)
|
||||||
|
}
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
opts := tsdb.DefaultOptions()
|
opts := tsdb.DefaultOptions()
|
||||||
opts.EnableExemplarStorage = true
|
opts.EnableExemplarStorage = true
|
||||||
@ -354,15 +406,32 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
|
|||||||
|
|
||||||
t.Cleanup(func() { db.Close() })
|
t.Cleanup(func() { db.Close() })
|
||||||
|
|
||||||
|
var output bytes.Buffer
|
||||||
|
logger := promslog.New(&promslog.Config{Writer: &output})
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
reg := prometheus.NewRegistry()
|
reg := prometheus.NewRegistry()
|
||||||
|
cappMetrics := NewCombinedAppenderMetrics(reg)
|
||||||
app := db.Appender(ctx)
|
app := db.Appender(ctx)
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), ingestCTZeroSample, NewCombinedAppenderMetrics(reg))
|
capp := NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics)
|
||||||
|
|
||||||
tc.appendFunc(t, capp)
|
tc.appendFunc(t, capp)
|
||||||
|
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
if tc.extraAppendFunc != nil {
|
||||||
|
app = db.Appender(ctx)
|
||||||
|
capp = NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics)
|
||||||
|
tc.extraAppendFunc(t, capp)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(expectedLogs) > 0 {
|
||||||
|
for _, expectedLog := range expectedLogs {
|
||||||
|
require.Contains(t, output.String(), expectedLog)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
require.Empty(t, output.String(), "unexpected log output")
|
||||||
|
}
|
||||||
|
|
||||||
q, err := db.Querier(int64(math.MinInt64), int64(math.MaxInt64))
|
q, err := db.Querier(int64(math.MinInt64), int64(math.MaxInt64))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user