mirror of
https://github.com/prometheus/prometheus.git
synced 2025-12-04 00:51:02 +01:00
OTLP Receiver: Only update metadata to WAL when metadata-wal-records feature is enabled (#17472)
OTLP Receiver: Only update metadata to WAL when metadata-wal-records feature is enabled. --------- Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
parent
f5d1cb48ca
commit
7ebff91cfd
@ -82,11 +82,12 @@ func NewCombinedAppenderMetrics(reg prometheus.Registerer) CombinedAppenderMetri
|
|||||||
// NewCombinedAppender creates a combined appender that sets start times and
|
// NewCombinedAppender creates a combined appender that sets start times and
|
||||||
// updates metadata for each series only once, and appends samples and
|
// updates metadata for each series only once, and appends samples and
|
||||||
// exemplars for each call.
|
// exemplars for each call.
|
||||||
func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestCTZeroSample bool, metrics CombinedAppenderMetrics) CombinedAppender {
|
func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestCTZeroSample, appendMetadata bool, metrics CombinedAppenderMetrics) CombinedAppender {
|
||||||
return &combinedAppender{
|
return &combinedAppender{
|
||||||
app: app,
|
app: app,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
ingestCTZeroSample: ingestCTZeroSample,
|
ingestCTZeroSample: ingestCTZeroSample,
|
||||||
|
appendMetadata: appendMetadata,
|
||||||
refs: make(map[uint64]seriesRef),
|
refs: make(map[uint64]seriesRef),
|
||||||
samplesAppendedWithoutMetadata: metrics.samplesAppendedWithoutMetadata,
|
samplesAppendedWithoutMetadata: metrics.samplesAppendedWithoutMetadata,
|
||||||
outOfOrderExemplars: metrics.outOfOrderExemplars,
|
outOfOrderExemplars: metrics.outOfOrderExemplars,
|
||||||
@ -106,6 +107,7 @@ type combinedAppender struct {
|
|||||||
samplesAppendedWithoutMetadata prometheus.Counter
|
samplesAppendedWithoutMetadata prometheus.Counter
|
||||||
outOfOrderExemplars prometheus.Counter
|
outOfOrderExemplars prometheus.Counter
|
||||||
ingestCTZeroSample bool
|
ingestCTZeroSample bool
|
||||||
|
appendMetadata bool
|
||||||
// Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs.
|
// Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs.
|
||||||
// To detect hash collision it also stores the labels.
|
// To detect hash collision it also stores the labels.
|
||||||
// There is no overflow/conflict list, the TSDB will handle that part.
|
// There is no overflow/conflict list, the TSDB will handle that part.
|
||||||
@ -189,17 +191,10 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !exists || series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit {
|
metadataChanged := exists && (series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit)
|
||||||
updateRefs = true
|
|
||||||
// If this is the first time we see this series, set the metadata.
|
|
||||||
_, err := b.app.UpdateMetadata(ref, ls, meta)
|
|
||||||
if err != nil {
|
|
||||||
b.samplesAppendedWithoutMetadata.Add(1)
|
|
||||||
b.logger.Warn("Error while updating metadata from OTLP", "err", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if updateRefs {
|
// Update cache if references changed or metadata changed.
|
||||||
|
if updateRefs || metadataChanged {
|
||||||
b.refs[hash] = seriesRef{
|
b.refs[hash] = seriesRef{
|
||||||
ref: ref,
|
ref: ref,
|
||||||
ct: ct,
|
ct: ct,
|
||||||
@ -208,6 +203,17 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update metadata in storage if enabled and needed.
|
||||||
|
if b.appendMetadata && (!exists || metadataChanged) {
|
||||||
|
// Only update metadata in WAL if the metadata-wal-records feature is enabled.
|
||||||
|
// Without this feature, metadata is not persisted to WAL.
|
||||||
|
_, err := b.app.UpdateMetadata(ref, ls, meta)
|
||||||
|
if err != nil {
|
||||||
|
b.samplesAppendedWithoutMetadata.Add(1)
|
||||||
|
b.logger.Warn("Error while updating metadata from OTLP", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
b.appendExemplars(ref, ls, es)
|
b.appendExemplars(ref, ls, es)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
|||||||
@ -17,6 +17,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -412,13 +413,13 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
|
|||||||
reg := prometheus.NewRegistry()
|
reg := prometheus.NewRegistry()
|
||||||
cappMetrics := NewCombinedAppenderMetrics(reg)
|
cappMetrics := NewCombinedAppenderMetrics(reg)
|
||||||
app := db.Appender(ctx)
|
app := db.Appender(ctx)
|
||||||
capp := NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics)
|
capp := NewCombinedAppender(app, logger, ingestCTZeroSample, false, cappMetrics)
|
||||||
tc.appendFunc(t, capp)
|
tc.appendFunc(t, capp)
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
if tc.extraAppendFunc != nil {
|
if tc.extraAppendFunc != nil {
|
||||||
app = db.Appender(ctx)
|
app = db.Appender(ctx)
|
||||||
capp = NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics)
|
capp = NewCombinedAppender(app, logger, ingestCTZeroSample, false, cappMetrics)
|
||||||
tc.extraAppendFunc(t, capp)
|
tc.extraAppendFunc(t, capp)
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
}
|
}
|
||||||
@ -501,7 +502,7 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("happy case with CT zero, reference is passed and reused", func(t *testing.T) {
|
t.Run("happy case with CT zero, reference is passed and reused", func(t *testing.T) {
|
||||||
app := &appenderRecorder{}
|
app := &appenderRecorder{}
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil))
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil))
|
||||||
|
|
||||||
@ -512,109 +513,38 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
require.Len(t, app.records, 6)
|
require.Len(t, app.records, 5)
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
||||||
ref := app.records[0].outRef
|
ref := app.records[0].outRef
|
||||||
require.NotZero(t, ref)
|
require.NotZero(t, ref)
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2])
|
requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[2])
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3])
|
requireEqualOpAndRef(t, "Append", ref, app.records[3])
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[4])
|
requireEqualOpAndRef(t, "AppendExemplar", ref, app.records[4])
|
||||||
requireEqualOpAndRef(t, "AppendExemplar", ref, app.records[5])
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("error on second CT ingest doesn't update the reference", func(t *testing.T) {
|
t.Run("error on second CT ingest doesn't update the reference", func(t *testing.T) {
|
||||||
app := &appenderRecorder{}
|
app := &appenderRecorder{}
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil))
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil))
|
||||||
|
|
||||||
app.appendCTZeroSampleError = errors.New("test error")
|
app.appendCTZeroSampleError = errors.New("test error")
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 3, 4, 62.0, nil))
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 3, 4, 62.0, nil))
|
||||||
|
|
||||||
require.Len(t, app.records, 5)
|
require.Len(t, app.records, 4)
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
||||||
ref := app.records[0].outRef
|
ref := app.records[0].outRef
|
||||||
require.NotZero(t, ref)
|
require.NotZero(t, ref)
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2])
|
requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[2])
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3])
|
require.Zero(t, app.records[2].outRef, "the second AppendCTZeroSample returned 0")
|
||||||
require.Zero(t, app.records[3].outRef, "the second AppendCTZeroSample returned 0")
|
requireEqualOpAndRef(t, "Append", ref, app.records[3])
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[4])
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("updateMetadata called when meta help changes", func(t *testing.T) {
|
|
||||||
app := &appenderRecorder{}
|
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
|
||||||
|
|
||||||
newMetadata := floatMetadata
|
|
||||||
newMetadata.Help = "some other help"
|
|
||||||
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil))
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil))
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil))
|
|
||||||
|
|
||||||
require.Len(t, app.records, 7)
|
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
|
||||||
ref := app.records[0].outRef
|
|
||||||
require.NotZero(t, ref)
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2])
|
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3])
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[4])
|
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5])
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[6])
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("updateMetadata called when meta unit changes", func(t *testing.T) {
|
|
||||||
app := &appenderRecorder{}
|
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
|
||||||
|
|
||||||
newMetadata := floatMetadata
|
|
||||||
newMetadata.Unit = "seconds"
|
|
||||||
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil))
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil))
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil))
|
|
||||||
|
|
||||||
require.Len(t, app.records, 7)
|
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
|
||||||
ref := app.records[0].outRef
|
|
||||||
require.NotZero(t, ref)
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2])
|
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3])
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[4])
|
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5])
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[6])
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("updateMetadata called when meta type changes", func(t *testing.T) {
|
|
||||||
app := &appenderRecorder{}
|
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
|
||||||
|
|
||||||
newMetadata := floatMetadata
|
|
||||||
newMetadata.Type = model.MetricTypeGauge
|
|
||||||
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil))
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil))
|
|
||||||
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil))
|
|
||||||
|
|
||||||
require.Len(t, app.records, 7)
|
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
|
||||||
ref := app.records[0].outRef
|
|
||||||
require.NotZero(t, ref)
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2])
|
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3])
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[4])
|
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5])
|
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[6])
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("metadata, exemplars are not updated if append failed", func(t *testing.T) {
|
t.Run("metadata, exemplars are not updated if append failed", func(t *testing.T) {
|
||||||
app := &appenderRecorder{}
|
app := &appenderRecorder{}
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
app.appendError = errors.New("test error")
|
app.appendError = errors.New("test error")
|
||||||
require.Error(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 0, 1, 42.0, []exemplar.Exemplar{
|
require.Error(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 0, 1, 42.0, []exemplar.Exemplar{
|
||||||
{
|
{
|
||||||
@ -632,7 +562,7 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("metadata, exemplars are updated if append failed but reference is valid", func(t *testing.T) {
|
t.Run("metadata, exemplars are updated if append failed but reference is valid", func(t *testing.T) {
|
||||||
app := &appenderRecorder{}
|
app := &appenderRecorder{}
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
|
|
||||||
newMetadata := floatMetadata
|
newMetadata := floatMetadata
|
||||||
newMetadata.Help = "some other help"
|
newMetadata.Help = "some other help"
|
||||||
@ -661,7 +591,7 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("simulate conflict with existing series", func(t *testing.T) {
|
t.Run("simulate conflict with existing series", func(t *testing.T) {
|
||||||
app := &appenderRecorder{}
|
app := &appenderRecorder{}
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
|
|
||||||
ls := labels.FromStrings(
|
ls := labels.FromStrings(
|
||||||
model.MetricNameLabel, "test_bytes_total",
|
model.MetricNameLabel, "test_bytes_total",
|
||||||
@ -688,23 +618,21 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
require.Len(t, app.records, 7)
|
require.Len(t, app.records, 5)
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
||||||
ref := app.records[0].outRef
|
ref := app.records[0].outRef
|
||||||
require.NotZero(t, ref)
|
require.NotZero(t, ref)
|
||||||
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2])
|
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[2])
|
||||||
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[3])
|
newRef := app.records[2].outRef
|
||||||
newRef := app.records[3].outRef
|
|
||||||
require.NotEqual(t, ref, newRef, "the second AppendCTZeroSample returned a different reference")
|
require.NotEqual(t, ref, newRef, "the second AppendCTZeroSample returned a different reference")
|
||||||
requireEqualOpAndRef(t, "Append", newRef, app.records[4])
|
requireEqualOpAndRef(t, "Append", newRef, app.records[3])
|
||||||
requireEqualOpAndRef(t, "UpdateMetadata", newRef, app.records[5])
|
requireEqualOpAndRef(t, "AppendExemplar", newRef, app.records[4])
|
||||||
requireEqualOpAndRef(t, "AppendExemplar", newRef, app.records[6])
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("check that invoking AppendHistogram returns an error for nil histogram", func(t *testing.T) {
|
t.Run("check that invoking AppendHistogram returns an error for nil histogram", func(t *testing.T) {
|
||||||
app := &appenderRecorder{}
|
app := &appenderRecorder{}
|
||||||
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
|
|
||||||
ls := labels.FromStrings(
|
ls := labels.FromStrings(
|
||||||
model.MetricNameLabel, "test_bytes_total",
|
model.MetricNameLabel, "test_bytes_total",
|
||||||
@ -713,6 +641,101 @@ func TestCombinedAppenderSeriesRefs(t *testing.T) {
|
|||||||
err := capp.AppendHistogram(ls, Metadata{}, 4, 2, nil, nil)
|
err := capp.AppendHistogram(ls, Metadata{}, 4, 2, nil, nil)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
for _, appendMetadata := range []bool{false, true} {
|
||||||
|
t.Run(fmt.Sprintf("appendMetadata=%t", appendMetadata), func(t *testing.T) {
|
||||||
|
app := &appenderRecorder{}
|
||||||
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, appendMetadata, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
|
|
||||||
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil))
|
||||||
|
|
||||||
|
if appendMetadata {
|
||||||
|
require.Len(t, app.records, 3)
|
||||||
|
requireEqualOp(t, "AppendCTZeroSample", app.records[0])
|
||||||
|
requireEqualOp(t, "Append", app.records[1])
|
||||||
|
requireEqualOp(t, "UpdateMetadata", app.records[2])
|
||||||
|
} else {
|
||||||
|
require.Len(t, app.records, 2)
|
||||||
|
requireEqualOp(t, "AppendCTZeroSample", app.records[0])
|
||||||
|
requireEqualOp(t, "Append", app.records[1])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCombinedAppenderMetadataChanges verifies that UpdateMetadata is called
|
||||||
|
// when metadata fields change (help, unit, or type).
|
||||||
|
func TestCombinedAppenderMetadataChanges(t *testing.T) {
|
||||||
|
seriesLabels := labels.FromStrings(
|
||||||
|
model.MetricNameLabel, "test_metric",
|
||||||
|
"foo", "bar",
|
||||||
|
)
|
||||||
|
|
||||||
|
baseMetadata := Metadata{
|
||||||
|
Metadata: metadata.Metadata{
|
||||||
|
Type: model.MetricTypeCounter,
|
||||||
|
Unit: "bytes",
|
||||||
|
Help: "original help",
|
||||||
|
},
|
||||||
|
MetricFamilyName: "test_metric",
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
modifyMetadata func(Metadata) Metadata
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "help changes",
|
||||||
|
modifyMetadata: func(m Metadata) Metadata {
|
||||||
|
m.Help = "new help text"
|
||||||
|
return m
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unit changes",
|
||||||
|
modifyMetadata: func(m Metadata) Metadata {
|
||||||
|
m.Unit = "seconds"
|
||||||
|
return m
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "type changes",
|
||||||
|
modifyMetadata: func(m Metadata) Metadata {
|
||||||
|
m.Type = model.MetricTypeGauge
|
||||||
|
return m
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
app := &appenderRecorder{}
|
||||||
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, true, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
|
|
||||||
|
newMetadata := tt.modifyMetadata(baseMetadata)
|
||||||
|
|
||||||
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), baseMetadata, 1, 2, 42.0, nil))
|
||||||
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil))
|
||||||
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil))
|
||||||
|
|
||||||
|
// Verify expected operations.
|
||||||
|
require.Len(t, app.records, 7)
|
||||||
|
requireEqualOpAndRef(t, "AppendCTZeroSample", 0, app.records[0])
|
||||||
|
ref := app.records[0].outRef
|
||||||
|
require.NotZero(t, ref)
|
||||||
|
requireEqualOpAndRef(t, "Append", ref, app.records[1])
|
||||||
|
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2])
|
||||||
|
requireEqualOpAndRef(t, "AppendCTZeroSample", ref, app.records[3])
|
||||||
|
requireEqualOpAndRef(t, "Append", ref, app.records[4])
|
||||||
|
requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5])
|
||||||
|
requireEqualOpAndRef(t, "Append", ref, app.records[6])
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func requireEqualOp(t *testing.T, expectedOp string, actual appenderRecord) {
|
||||||
|
t.Helper()
|
||||||
|
require.Equal(t, expectedOp, actual.op)
|
||||||
}
|
}
|
||||||
|
|
||||||
func requireEqualOpAndRef(t *testing.T, expectedOp string, expectedRef storage.SeriesRef, actual appenderRecord) {
|
func requireEqualOpAndRef(t *testing.T, expectedOp string, expectedRef storage.SeriesRef, actual appenderRecord) {
|
||||||
@ -833,3 +856,82 @@ func (a *appenderRecorder) Rollback() error {
|
|||||||
func (*appenderRecorder) SetOptions(_ *storage.AppendOptions) {
|
func (*appenderRecorder) SetOptions(_ *storage.AppendOptions) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMetadataChangedLogic(t *testing.T) {
|
||||||
|
seriesLabels := labels.FromStrings(model.MetricNameLabel, "test_metric", "foo", "bar")
|
||||||
|
baseMetadata := Metadata{
|
||||||
|
Metadata: metadata.Metadata{Type: model.MetricTypeCounter, Unit: "bytes", Help: "original"},
|
||||||
|
MetricFamilyName: "test_metric",
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
appendMetadata bool
|
||||||
|
modifyMetadata func(Metadata) Metadata
|
||||||
|
expectWALCall bool
|
||||||
|
verifyCached func(*testing.T, metadata.Metadata)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "appendMetadata=false, no change",
|
||||||
|
appendMetadata: false,
|
||||||
|
modifyMetadata: func(m Metadata) Metadata { return m },
|
||||||
|
expectWALCall: false,
|
||||||
|
verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "original", m.Help) },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "appendMetadata=false, help changes - cache updated, no WAL",
|
||||||
|
appendMetadata: false,
|
||||||
|
modifyMetadata: func(m Metadata) Metadata { m.Help = "changed"; return m },
|
||||||
|
expectWALCall: false,
|
||||||
|
verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "changed", m.Help) },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "appendMetadata=true, help changes - cache and WAL updated",
|
||||||
|
appendMetadata: true,
|
||||||
|
modifyMetadata: func(m Metadata) Metadata { m.Help = "changed"; return m },
|
||||||
|
expectWALCall: true,
|
||||||
|
verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "changed", m.Help) },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "appendMetadata=true, unit changes",
|
||||||
|
appendMetadata: true,
|
||||||
|
modifyMetadata: func(m Metadata) Metadata { m.Unit = "seconds"; return m },
|
||||||
|
expectWALCall: true,
|
||||||
|
verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "seconds", m.Unit) },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "appendMetadata=true, type changes",
|
||||||
|
appendMetadata: true,
|
||||||
|
modifyMetadata: func(m Metadata) Metadata { m.Type = model.MetricTypeGauge; return m },
|
||||||
|
expectWALCall: true,
|
||||||
|
verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, model.MetricTypeGauge, m.Type) },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
app := &appenderRecorder{}
|
||||||
|
capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, tt.appendMetadata, NewCombinedAppenderMetrics(prometheus.NewRegistry()))
|
||||||
|
|
||||||
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), baseMetadata, 1, 2, 42.0, nil))
|
||||||
|
|
||||||
|
modifiedMetadata := tt.modifyMetadata(baseMetadata)
|
||||||
|
app.records = nil
|
||||||
|
require.NoError(t, capp.AppendSample(seriesLabels.Copy(), modifiedMetadata, 1, 3, 43.0, nil))
|
||||||
|
|
||||||
|
hash := seriesLabels.Hash()
|
||||||
|
cached, exists := capp.(*combinedAppender).refs[hash]
|
||||||
|
require.True(t, exists)
|
||||||
|
tt.verifyCached(t, cached.meta)
|
||||||
|
|
||||||
|
updateMetadataCalled := false
|
||||||
|
for _, record := range app.records {
|
||||||
|
if record.op == "UpdateMetadata" {
|
||||||
|
updateMetadataCalled = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.Equal(t, tt.expectWALCall, updateMetadataCalled)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -1067,7 +1067,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
|
|||||||
|
|
||||||
for b.Loop() {
|
for b.Loop() {
|
||||||
app := &noOpAppender{}
|
app := &noOpAppender{}
|
||||||
mockAppender := NewCombinedAppender(app, noOpLogger, false, appMetrics)
|
mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics)
|
||||||
converter := NewPrometheusConverter(mockAppender)
|
converter := NewPrometheusConverter(mockAppender)
|
||||||
annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings)
|
annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
|
|||||||
@ -501,6 +501,8 @@ type OTLPOptions struct {
|
|||||||
// IngestCTZeroSample enables writing zero samples based on the start time
|
// IngestCTZeroSample enables writing zero samples based on the start time
|
||||||
// of metrics.
|
// of metrics.
|
||||||
IngestCTZeroSample bool
|
IngestCTZeroSample bool
|
||||||
|
// AppendMetadata enables writing metadata to WAL when metadata-wal-records feature is enabled.
|
||||||
|
AppendMetadata bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
|
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
|
||||||
@ -519,6 +521,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appenda
|
|||||||
lookbackDelta: opts.LookbackDelta,
|
lookbackDelta: opts.LookbackDelta,
|
||||||
ingestCTZeroSample: opts.IngestCTZeroSample,
|
ingestCTZeroSample: opts.IngestCTZeroSample,
|
||||||
enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels,
|
enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels,
|
||||||
|
appendMetadata: opts.AppendMetadata,
|
||||||
// Register metrics.
|
// Register metrics.
|
||||||
metrics: otlptranslator.NewCombinedAppenderMetrics(reg),
|
metrics: otlptranslator.NewCombinedAppenderMetrics(reg),
|
||||||
}
|
}
|
||||||
@ -561,6 +564,7 @@ type rwExporter struct {
|
|||||||
lookbackDelta time.Duration
|
lookbackDelta time.Duration
|
||||||
ingestCTZeroSample bool
|
ingestCTZeroSample bool
|
||||||
enableTypeAndUnitLabels bool
|
enableTypeAndUnitLabels bool
|
||||||
|
appendMetadata bool
|
||||||
|
|
||||||
// Metrics.
|
// Metrics.
|
||||||
metrics otlptranslator.CombinedAppenderMetrics
|
metrics otlptranslator.CombinedAppenderMetrics
|
||||||
@ -572,7 +576,7 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er
|
|||||||
Appender: rw.appendable.Appender(ctx),
|
Appender: rw.appendable.Appender(ctx),
|
||||||
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||||
}
|
}
|
||||||
combinedAppender := otlptranslator.NewCombinedAppender(app, rw.logger, rw.ingestCTZeroSample, rw.metrics)
|
combinedAppender := otlptranslator.NewCombinedAppender(app, rw.logger, rw.ingestCTZeroSample, rw.appendMetadata, rw.metrics)
|
||||||
converter := otlptranslator.NewPrometheusConverter(combinedAppender)
|
converter := otlptranslator.NewPrometheusConverter(combinedAppender)
|
||||||
annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{
|
annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{
|
||||||
AddMetricSuffixes: otlpCfg.TranslationStrategy.ShouldAddSuffixes(),
|
AddMetricSuffixes: otlpCfg.TranslationStrategy.ShouldAddSuffixes(),
|
||||||
|
|||||||
@ -921,6 +921,7 @@ func TestOTLPWriteHandler(t *testing.T) {
|
|||||||
t.Run(testCase.name, func(t *testing.T) {
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
otlpOpts := OTLPOptions{
|
otlpOpts := OTLPOptions{
|
||||||
EnableTypeAndUnitLabels: testCase.typeAndUnitLabels,
|
EnableTypeAndUnitLabels: testCase.typeAndUnitLabels,
|
||||||
|
AppendMetadata: true,
|
||||||
}
|
}
|
||||||
appendable := handleOTLP(t, exportRequest, testCase.otlpCfg, otlpOpts)
|
appendable := handleOTLP(t, exportRequest, testCase.otlpCfg, otlpOpts)
|
||||||
for _, sample := range testCase.expectedSamples {
|
for _, sample := range testCase.expectedSamples {
|
||||||
|
|||||||
@ -348,6 +348,7 @@ func NewAPI(
|
|||||||
LookbackDelta: lookbackDelta,
|
LookbackDelta: lookbackDelta,
|
||||||
IngestCTZeroSample: ctZeroIngestionEnabled,
|
IngestCTZeroSample: ctZeroIngestionEnabled,
|
||||||
EnableTypeAndUnitLabels: enableTypeAndUnitLabels,
|
EnableTypeAndUnitLabels: enableTypeAndUnitLabels,
|
||||||
|
AppendMetadata: appendMetadata,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user