From d77c985f8c122cf5947b0eb621999fcc8dcca9bf Mon Sep 17 00:00:00 2001 From: Serge Catudal Date: Tue, 21 Sep 2021 16:53:27 -0400 Subject: [PATCH] Add initial support for exemplar to the remote write receiver endpoint (#9319) * Add initial support for exemplar to the remote write receiver endpoint Signed-off-by: Serge Catudal * Update storage remote write handler tests with exemplars Signed-off-by: Serge Catudal * Update remote write handler in order to have a distinct checkAppendExemplarError function from scrape Signed-off-by: Serge Catudal --- storage/remote/codec.go | 12 +++++ storage/remote/codec_test.go | 6 ++- storage/remote/write_handler.go | 41 ++++++++++++++++- storage/remote/write_handler_test.go | 68 ++++++++++++++++++++++++---- 4 files changed, 114 insertions(+), 13 deletions(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 0bd1b97622..545138da56 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/prompb" @@ -450,6 +451,17 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro return result, nil } +func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar { + timestamp := ep.Timestamp + + return exemplar.Exemplar{ + Labels: labelProtosToLabels(ep.Labels), + Value: ep.Value, + Ts: timestamp, + HasTs: timestamp != 0, + } +} + // LabelProtosToMetric unpack a []*prompb.Label to a model.Metric func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { metric := make(model.Metric, len(labelPairs)) diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 0f6a56bdad..68cfe9899f 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -36,7 +36,8 @@ var writeRequestFixture = &prompb.WriteRequest{ {Name: "d", Value: "e"}, {Name: "foo", Value: "bar"}, }, - Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}}, }, { Labels: []prompb.Label{ @@ -46,7 +47,8 @@ var writeRequestFixture = &prompb.WriteRequest{ {Name: "d", Value: "e"}, {Name: "foo", Value: "bar"}, }, - Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}}, }, }, } diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index a3dee6136a..92637cf471 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -15,10 +15,14 @@ package remote import ( "context" + "fmt" "net/http" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/pkg/errors" + + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" ) @@ -62,16 +66,35 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +// checkAppendExemplarError modifies the AppendExamplar's returned error based on the error cause. +func (h *writeHandler) checkAppendExemplarError(err error, e exemplar.Exemplar, outOfOrderErrs *int) error { + switch errors.Cause(err) { + case storage.ErrNotFound: + return storage.ErrNotFound + case storage.ErrOutOfOrderExemplar: + *outOfOrderErrs++ + level.Debug(h.logger).Log("msg", "Out of order exemplar", "exemplar", fmt.Sprintf("%+v", e)) + return nil + default: + return err + } +} + func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) { + var ( + outOfOrderExemplarErrs = 0 + ) + app := h.appendable.Appender(ctx) defer func() { if err != nil { - app.Rollback() + _ = app.Rollback() return } err = app.Commit() }() + var exemplarErr error for _, ts := range req.Timeseries { labels := labelProtosToLabels(ts.Labels) for _, s := range ts.Samples { @@ -79,7 +102,23 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err if err != nil { return err } + } + + for _, ep := range ts.Exemplars { + e := exemplarProtoToExemplar(ep) + + _, exemplarErr = app.AppendExemplar(0, labels, e) + exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs) + if exemplarErr != nil { + // Since exemplar storage is still experimental, we don't fail the request on ingestion errors. + level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) + } + } + } + + if outOfOrderExemplarErrs > 0 { + _ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs) } return nil diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 5e086fe2fa..d4ba5bfd8b 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -23,11 +23,12 @@ import ( "testing" "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" ) func TestRemoteWriteHandler(t *testing.T) { @@ -47,16 +48,23 @@ func TestRemoteWriteHandler(t *testing.T) { require.Equal(t, http.StatusNoContent, resp.StatusCode) i := 0 + j := 0 for _, ts := range writeRequestFixture.Timeseries { labels := labelProtosToLabels(ts.Labels) for _, s := range ts.Samples { require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) i++ } + + for _, e := range ts.Exemplars { + exemplarLabels := labelProtosToLabels(e.Labels) + require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) + j++ + } } } -func TestOutOfOrder(t *testing.T) { +func TestOutOfOrderSample(t *testing.T) { buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, @@ -67,7 +75,7 @@ func TestOutOfOrder(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{ - latest: 100, + latestSample: 100, } handler := NewWriteHandler(log.NewNopLogger(), appendable) @@ -78,6 +86,32 @@ func TestOutOfOrder(t *testing.T) { require.Equal(t, http.StatusBadRequest, resp.StatusCode) } +// This test case currently aims to verify that the WriteHandler endpoint +// don't fail on ingestion errors since the exemplar storage is +// still experimental. +func TestOutOfOrderExemplar(t *testing.T) { + buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, + }}, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + latestExemplar: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. + require.Equal(t, http.StatusNoContent, resp.StatusCode) +} + func TestCommitErr(t *testing.T) { buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) require.NoError(t, err) @@ -101,9 +135,11 @@ func TestCommitErr(t *testing.T) { } type mockAppendable struct { - latest int64 - samples []mockSample - commitErr error + latestSample int64 + samples []mockSample + latestExemplar int64 + exemplars []mockExemplar + commitErr error } type mockSample struct { @@ -112,16 +148,23 @@ type mockSample struct { v float64 } +type mockExemplar struct { + l labels.Labels + el labels.Labels + t int64 + v float64 +} + func (m *mockAppendable) Appender(_ context.Context) storage.Appender { return m } func (m *mockAppendable) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) { - if t < m.latest { + if t < m.latestSample { return 0, storage.ErrOutOfOrderSample } - m.latest = t + m.latestSample = t m.samples = append(m.samples, mockSample{l, t, v}) return 0, nil } @@ -134,7 +177,12 @@ func (*mockAppendable) Rollback() error { return fmt.Errorf("not implemented") } -func (*mockAppendable) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { - // noop until we implement exemplars over remote write +func (m *mockAppendable) AppendExemplar(_ uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + if e.Ts < m.latestExemplar { + return 0, storage.ErrOutOfOrderExemplar + } + + m.latestExemplar = e.Ts + m.exemplars = append(m.exemplars, mockExemplar{l, e.Labels, e.Ts, e.Value}) return 0, nil }