diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 15f8fe1320..279d10e41b 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -36,48 +36,48 @@ import ( "github.com/prometheus/prometheus/util/annotations" ) -var testHistogram = histogram.Histogram{ - Schema: 2, - ZeroThreshold: 1e-128, - ZeroCount: 0, - Count: 0, - Sum: 20, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, - PositiveBuckets: []int64{1}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, - NegativeBuckets: []int64{-1}, -} - -var writeRequestFixture = &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar"}, - }, - Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}}, - Histograms: []prompb.Histogram{prompb.FromIntHistogram(0, &testHistogram), prompb.FromFloatHistogram(1, testHistogram.ToFloat(nil))}, - }, - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar"}, - }, - Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, - Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}}, - Histograms: []prompb.Histogram{prompb.FromIntHistogram(2, &testHistogram), prompb.FromFloatHistogram(3, testHistogram.ToFloat(nil))}, - }, - }, -} - var ( + testHistogram = histogram.Histogram{ + Schema: 2, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 0, + Sum: 20, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + PositiveBuckets: []int64{1}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{-1}, + } + + writeRequestFixture = &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 1, Timestamp: 1}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 1}}, + Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, &testHistogram), prompb.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 2, Timestamp: 2}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 2}}, + Histograms: []prompb.Histogram{prompb.FromIntHistogram(3, &testHistogram), prompb.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + }, + }, + } + writeV2RequestSeries1Metadata = metadata.Metadata{ Type: model.MetricTypeGauge, Help: "Test gauge for test purposes", @@ -88,43 +88,78 @@ var ( Help: "Test counter for test purposes", } - // writeV2RequestFixture represents the same request as writeRequestFixture, but using the v2 representation. - writeV2RequestFixture = func() *writev2.Request { - st := writev2.NewSymbolTable() - b := labels.NewScratchBuilder(0) - labelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].ToLabels(&b, nil), nil) - exemplar1LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil) - exemplar2LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil) - return &writev2.Request{ - Timeseries: []writev2.TimeSeries{ - { - LabelsRefs: labelRefs, - Metadata: writev2.Metadata{ - Type: writev2.Metadata_METRIC_TYPE_GAUGE, // Same as writeV2RequestSeries1Metadata.Type, but in writev2. - HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help), - UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit), - }, - Samples: []writev2.Sample{{Value: 1, Timestamp: 0}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 0}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram), writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))}, - }, - { - LabelsRefs: labelRefs, - Metadata: writev2.Metadata{ - Type: writev2.Metadata_METRIC_TYPE_COUNTER, // Same as writeV2RequestSeries2Metadata.Type, but in writev2. - HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help), - // No unit. - }, - Samples: []writev2.Sample{{Value: 2, Timestamp: 1}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 1}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(2, &testHistogram), writev2.FromFloatHistogram(3, testHistogram.ToFloat(nil))}, + // writeV2RequestFixture represents the same request as writeRequestFixture, + // but using the v2 representation, plus includes writeV2RequestSeries1Metadata and writeV2RequestSeries2Metadata. + // NOTE: Use TestWriteV2RequestFixture and copy the diff to regenerate if needed. + writeV2RequestFixture = &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, // writeV2RequestSeries1Metadata.Type. + + HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. + UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. }, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, }, - Symbols: st.Symbols(), - } - }() + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries2Metadata.Type. + + HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help. + // No unit. + }, + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + }, + }, + } ) +func TestWriteV2RequestFixture(t *testing.T) { + // Generate dynamically writeV2RequestFixture, reusing v1 fixture elements. + st := writev2.NewSymbolTable() + b := labels.NewScratchBuilder(0) + labelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].ToLabels(&b, nil), nil) + exemplar1LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[0].Exemplars[0].ToExemplar(&b, nil).Labels, nil) + exemplar2LabelRefs := st.SymbolizeLabels(writeRequestFixture.Timeseries[1].Exemplars[0].ToExemplar(&b, nil).Labels, nil) + expected := &writev2.Request{ + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: labelRefs, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, + HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help), + UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit), + }, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + }, + { + LabelsRefs: labelRefs, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_COUNTER, + HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help), + // No unit. + }, + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + }, + }, + Symbols: st.Symbols(), + } + // Check if it matches static writeV2RequestFixture. + require.Equal(t, expected, writeV2RequestFixture) +} + func TestValidateLabelsAndMetricName(t *testing.T) { tests := []struct { input []prompb.Label diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 4bef9909c7..5bafb9da20 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1468,6 +1468,8 @@ func (q *queue) FlushAndShutdown(done <-chan struct{}) { for q.tryEnqueueingBatch(done) { time.Sleep(time.Second) } + q.batchMtx.Lock() + defer q.batchMtx.Unlock() q.batch = nil close(q.batchQueue) } diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 9997811ab0..d822373717 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "time" @@ -27,6 +28,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/exemplar" @@ -43,7 +45,8 @@ type writeHandler struct { logger log.Logger appendable storage.Appendable - samplesWithInvalidLabelsTotal prometheus.Counter + samplesWithInvalidLabelsTotal prometheus.Counter + samplesAppendedWithoutMetadata prometheus.Counter acceptedProtoMsgs map[config.RemoteWriteProtoMsg]struct{} } @@ -52,6 +55,9 @@ const maxAheadTime = 10 * time.Minute // NewWriteHandler creates a http.Handler that accepts remote write requests with // the given message in acceptedProtoMsgs and writes them to the provided appendable. +// +// NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible +// as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write. func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg) http.Handler { protoMsgs := map[config.RemoteWriteProtoMsg]struct{}{} for _, acc := range acceptedProtoMsgs { @@ -61,15 +67,18 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st logger: logger, appendable: appendable, acceptedProtoMsgs: protoMsgs, - samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + samplesWithInvalidLabelsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "api", Name: "remote_write_invalid_labels_samples_total", - Help: "The total number of remote write samples which contains invalid labels.", + Help: "The total number of received remote write samples and histogram samples which were rejected due to invalid labels.", + }), + samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "api", + Name: "remote_write_without_metadata_appended_samples_total", + Help: "The total number of received remote write samples (and histogram samples) which were ingested without corresponding metadata.", }), - } - if reg != nil { - reg.MustRegister(h.samplesWithInvalidLabelsTotal) } return h } @@ -108,15 +117,15 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { contentType = appProtoContentType } - msg, err := h.parseProtoMsg(contentType) + msgType, err := h.parseProtoMsg(contentType) if err != nil { level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err) http.Error(w, err.Error(), http.StatusUnsupportedMediaType) return } - if _, ok := h.acceptedProtoMsgs[msg]; !ok { - err := fmt.Errorf("%v protobuf message is not accepted by this server; accepted %v", msg, func() (ret []string) { + if _, ok := h.acceptedProtoMsgs[msgType]; !ok { + err := fmt.Errorf("%v protobuf message is not accepted by this server; accepted %v", msgType, func() (ret []string) { for k := range h.acceptedProtoMsgs { ret = append(ret, string(k)) } @@ -154,100 +163,111 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Now we have a decompressed buffer we can unmarshal it. - switch msg { - case config.RemoteWriteProtoMsgV1: + + if msgType == config.RemoteWriteProtoMsgV1 { + // PRW 1.0 flow has different proto message and no partial write handling. var req prompb.WriteRequest if err := proto.Unmarshal(decompressed, &req); err != nil { // TODO(bwplotka): Add more context to responded error? - level.Error(h.logger).Log("msg", "Error decoding v1 remote write request", "protobuf_message", msg, "err", err.Error()) + level.Error(h.logger).Log("msg", "Error decoding v1 remote write request", "protobuf_message", msgType, "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } - err = h.write(r.Context(), &req) - case config.RemoteWriteProtoMsgV2: - var req writev2.Request - if err := proto.Unmarshal(decompressed, &req); err != nil { - // TODO(bwplotka): Add more context to responded error? - level.Error(h.logger).Log("msg", "Error decoding v2 remote write request", "protobuf_message", msg, "err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return + if err = h.write(r.Context(), &req); err != nil { + switch { + case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample): + // Indicated an out-of-order sample is a bad request to prevent retries. + http.Error(w, err.Error(), http.StatusBadRequest) + return + default: + level.Error(h.logger).Log("msg", "Error while remote writing the v1 request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } - err = h.writeV2(r.Context(), &req) + w.WriteHeader(http.StatusNoContent) + return } - switch { - case err == nil: - case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample): - // Indicated an out of order sample is a bad request to prevent retries. + // Remote Write 2.x proto message handling. + var req writev2.Request + if err := proto.Unmarshal(decompressed, &req); err != nil { + // TODO(bwplotka): Add more context to responded error? + level.Error(h.logger).Log("msg", "Error decoding v2 remote write request", "protobuf_message", msgType, "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return - default: - level.Error(h.logger).Log("msg", "Error appending remote write", "err", err.Error()) - http.Error(w, err.Error(), http.StatusInternalServerError) + } + + respStats, errHTTPCode, err := h.writeV2(r.Context(), &req) + + // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases. + respStats.SetResponseHeaders(w.Header()) + + if err != nil { + if errHTTPCode/5 == 100 { // 5xx + level.Error(h.logger).Log("msg", "Error while remote writing the v2 request", "err", err.Error()) + } + http.Error(w, err.Error(), errHTTPCode) return } - w.WriteHeader(http.StatusNoContent) } -// checkAppendExemplarError modifies the AppendExemplar's returned error based on the error cause. -func (h *writeHandler) checkAppendExemplarError(err error, e exemplar.Exemplar, outOfOrderErrs *int) error { - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - switch { - case errors.Is(unwrappedErr, storage.ErrNotFound): - return storage.ErrNotFound - case errors.Is(unwrappedErr, storage.ErrOutOfOrderExemplar): - *outOfOrderErrs++ - level.Debug(h.logger).Log("msg", "Out of order exemplar", "exemplar", fmt.Sprintf("%+v", e)) - return nil - default: - return err - } -} - func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) { outOfOrderExemplarErrs := 0 samplesWithInvalidLabels := 0 + samplesAppended := 0 - timeLimitApp := &timeLimitAppender{ + app := &timeLimitAppender{ Appender: h.appendable.Appender(ctx), maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } defer func() { if err != nil { - _ = timeLimitApp.Rollback() + _ = app.Rollback() return } - err = timeLimitApp.Commit() + err = app.Commit() + if err != nil { + h.samplesAppendedWithoutMetadata.Add(float64(samplesAppended)) + } }() b := labels.NewScratchBuilder(0) for _, ts := range req.Timeseries { ls := ts.ToLabels(&b, nil) - if !ls.IsValid() { + if !ls.Has(labels.MetricName) || !ls.IsValid() { level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", ls.String()) samplesWithInvalidLabels++ + // TODO(bwplotka): Even as per 1.0 spec, this should be a 400 error, while other samples are + // potentially written. Perhaps unify with fixed writeV2 implementation a bit. continue } - err := h.appendSamples(timeLimitApp, ts.Samples, ls) - if err != nil { + if err := h.appendV1Samples(app, ts.Samples, ls); err != nil { return err } + samplesAppended += len(ts.Samples) for _, ep := range ts.Exemplars { e := ep.ToExemplar(&b, nil) - h.appendExemplar(timeLimitApp, e, ls, &outOfOrderExemplarErrs) + if _, err := app.AppendExemplar(0, ls, e); err != nil { + switch { + case errors.Is(err, storage.ErrOutOfOrderExemplar): + outOfOrderExemplarErrs++ + level.Debug(h.logger).Log("msg", "Out of order exemplar", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) + default: + // Since exemplar storage is still experimental, we don't fail the request on ingestion errors + level.Debug(h.logger).Log("msg", "Error while adding exemplar in AppendExemplar", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e), "err", err) + } + } } - err = h.appendHistograms(timeLimitApp, ts.Histograms, ls) - if err != nil { + if err = h.appendV1Histograms(app, ts.Histograms, ls); err != nil { return err } + samplesAppended += len(ts.Histograms) } if outOfOrderExemplarErrs > 0 { @@ -256,151 +276,216 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err if samplesWithInvalidLabels > 0 { h.samplesWithInvalidLabelsTotal.Add(float64(samplesWithInvalidLabels)) } - return nil } -func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (err error) { - outOfOrderExemplarErrs := 0 +func (h *writeHandler) appendV1Samples(app storage.Appender, ss []prompb.Sample, labels labels.Labels) error { + var ref storage.SeriesRef + var err error + for _, s := range ss { + ref, err = app.Append(ref, labels, s.GetTimestamp(), s.GetValue()) + if err != nil { + if errors.Is(err, storage.ErrOutOfOrderSample) || + errors.Is(err, storage.ErrOutOfBounds) || + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) + } + return err + } + } + return nil +} - timeLimitApp := &timeLimitAppender{ +func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Histogram, labels labels.Labels) error { + var err error + for _, hp := range hh { + if hp.IsFloatHistogram() { + _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram()) + } else { + _, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil) + } + if err != nil { + // Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is + // a note indicating its inclusion in the future. + if errors.Is(err, storage.ErrOutOfOrderSample) || + errors.Is(err, storage.ErrOutOfBounds) || + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) + } + return err + } + } + return nil +} + +const ( + prw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Written-Samples" + rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Written-Histograms" + rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Written-Exemplars" +) + +type responseStats struct { + samples int + histograms int + exemplars int +} + +func (s responseStats) SetResponseHeaders(h http.Header) { + h.Set(prw20WrittenSamplesHeader, strconv.Itoa(s.samples)) + h.Set(rw20WrittenHistogramsHeader, strconv.Itoa(s.histograms)) + h.Set(rw20WrittenExemplarsHeader, strconv.Itoa(s.exemplars)) +} + +// writeV2 is similar to write, but it works with v2 proto message, +// allows partial 4xx writes and gathers statistics. +// +// writeV2 returns the statistics. +// In error cases, writeV2, also returns statistics, but also the error that +// should be propagated to the remote write sender and httpCode to use for status. +// +// NOTE(bwplotka): TSDB storage is NOT idempotent, so we don't allow "partial retry-able" errors. +// Once we have 5xx type of error, we immediately stop and rollback all appends. +func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ responseStats, errHTTPCode int, _ error) { + app := &timeLimitAppender{ Appender: h.appendable.Appender(ctx), maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } - defer func() { - if err != nil { - _ = timeLimitApp.Rollback() - return + rs := responseStats{} + samplesWithoutMetadata, errHTTPCode, err := h.appendV2(app, req, &rs) + if err != nil { + if errHTTPCode/5 == 100 { + // On 5xx, we always rollback, because we expect + // sender to retry and TSDB is not idempotent. + if rerr := app.Rollback(); rerr != nil { + level.Error(h.logger).Log("msg", "writev2 rollback failed on retry-able error", "err", rerr) + } + return responseStats{}, errHTTPCode, err } - err = timeLimitApp.Commit() - }() - b := labels.NewScratchBuilder(0) + // Non-retriable (e.g. bad request error case). Can be partially written. + commitErr := app.Commit() + if commitErr != nil { + // Bad requests does not matter as we have internal error (retryable). + return responseStats{}, http.StatusInternalServerError, commitErr + } + // Bad request error happened, but rest of data (if any) was written. + h.samplesAppendedWithoutMetadata.Add(float64(samplesWithoutMetadata)) + return rs, errHTTPCode, err + } + + // All good just commit. + if err := app.Commit(); err != nil { + return responseStats{}, http.StatusInternalServerError, err + } + h.samplesAppendedWithoutMetadata.Add(float64(samplesWithoutMetadata)) + return rs, 0, nil +} + +func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *responseStats) (samplesWithoutMetadata, errHTTPCode int, err error) { + var ( + badRequestErrs []error + outOfOrderExemplarErrs, samplesWithInvalidLabels int + + b = labels.NewScratchBuilder(0) + ) for _, ts := range req.Timeseries { ls := ts.ToLabels(&b, req.Symbols) - - err := h.appendSamplesV2(timeLimitApp, ts.Samples, ls) - if err != nil { - return err + // Validate series labels early. + // NOTE(bwplotka): While spec allows UTF-8, Prometheus Receiver may impose + // specific limits and follow https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples case. + if !ls.Has(labels.MetricName) || !ls.IsValid() { + badRequestErrs = append(badRequestErrs, fmt.Errorf("invalid metric name or labels, got %v", ls.String())) + samplesWithInvalidLabels += len(ts.Samples) + len(ts.Histograms) + continue } + allSamplesSoFar := rs.samples + rs.histograms + var ref storage.SeriesRef + + // Samples. + for _, s := range ts.Samples { + ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue()) + if err == nil { + rs.samples++ + continue + } + // Handle append error. + if errors.Is(err, storage.ErrOutOfOrderSample) || + errors.Is(err, storage.ErrOutOfBounds) || + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) || + errors.Is(err, storage.ErrTooOldSample) { + // TODO(bwplotka): Not too spammy log? + level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", ls.String(), "timestamp", s.Timestamp) + badRequestErrs = append(badRequestErrs, fmt.Errorf("%w for series %v", err, ls.String())) + continue + } + return 0, http.StatusInternalServerError, err + } + + // Native Histograms. + for _, hp := range ts.Histograms { + if hp.IsFloatHistogram() { + ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram()) + } else { + ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, hp.ToIntHistogram(), nil) + } + if err == nil { + rs.histograms++ + continue + } + // Handle append error. + // Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is + // a note indicating its inclusion in the future. + if errors.Is(err, storage.ErrOutOfOrderSample) || + errors.Is(err, storage.ErrOutOfBounds) || + errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { + // TODO(bwplotka): Not too spammy log? + level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", ls.String(), "timestamp", hp.Timestamp) + badRequestErrs = append(badRequestErrs, fmt.Errorf("%w for series %v", err, ls.String())) + continue + } + return 0, http.StatusInternalServerError, err + } + + // Exemplars. for _, ep := range ts.Exemplars { e := ep.ToExemplar(&b, req.Symbols) - h.appendExemplar(timeLimitApp, e, ls, &outOfOrderExemplarErrs) - } - - err = h.appendHistogramsV2(timeLimitApp, ts.Histograms, ls) - if err != nil { - return err + ref, err = app.AppendExemplar(ref, ls, e) + if err == nil { + rs.exemplars++ + continue + } + // Handle append error. + // TODO(bwplotka): I left the logic as in v1, but we might want to make it consistent with samples and histograms. + // Since exemplar storage is still experimental, we don't fail in anyway, the request on ingestion errors. + if errors.Is(err, storage.ErrOutOfOrderExemplar) { + outOfOrderExemplarErrs++ + level.Debug(h.logger).Log("msg", "Out of order exemplar", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) + continue + } + level.Debug(h.logger).Log("msg", "Error while adding exemplar in AppendExemplar", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e), "err", err) } m := ts.ToMetadata(req.Symbols) - if _, err = timeLimitApp.UpdateMetadata(0, ls, m); err != nil { + if _, err = app.UpdateMetadata(ref, ls, m); err != nil { level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err) + // Metadata is attached to each series, so since Prometheus does not reject sample without metadata information, + // we don't report remote write error either. We increment metric instead. + samplesWithoutMetadata += (rs.samples + rs.histograms) - allSamplesSoFar } } if outOfOrderExemplarErrs > 0 { - _ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs) + level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs) } + h.samplesWithInvalidLabelsTotal.Add(float64(samplesWithInvalidLabels)) - return nil -} - -func (h *writeHandler) appendExemplar(app storage.Appender, e exemplar.Exemplar, labels labels.Labels, outOfOrderExemplarErrs *int) { - _, err := app.AppendExemplar(0, labels, e) - err = h.checkAppendExemplarError(err, e, outOfOrderExemplarErrs) - if err != nil { - // Since exemplar storage is still experimental, we don't fail the request on ingestion errors - level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", err) + if len(badRequestErrs) == 0 { + return samplesWithoutMetadata, 0, nil } -} - -func (h *writeHandler) appendSamples(app storage.Appender, ss []prompb.Sample, labels labels.Labels) error { - var ref storage.SeriesRef - var err error - for _, s := range ss { - ref, err = app.Append(ref, labels, s.GetTimestamp(), s.GetValue()) - if err != nil { - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { - level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) - } - return err - } - } - return nil -} - -func (h *writeHandler) appendSamplesV2(app storage.Appender, ss []writev2.Sample, labels labels.Labels) error { - var ref storage.SeriesRef - var err error - for _, s := range ss { - ref, err = app.Append(ref, labels, s.GetTimestamp(), s.GetValue()) - if err != nil { - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { - level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) - } - return err - } - } - return nil -} - -func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histogram, labels labels.Labels) error { - var err error - for _, hp := range hh { - if hp.IsFloatHistogram() { - _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram()) - } else { - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil) - } - if err != nil { - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - // Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is - // a note indicating its inclusion in the future. - if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { - level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) - } - return err - } - } - return nil -} - -func (h *writeHandler) appendHistogramsV2(app storage.Appender, hh []writev2.Histogram, labels labels.Labels) error { - var err error - for _, hp := range hh { - if hp.IsFloatHistogram() { - _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, hp.ToFloatHistogram()) - } else { - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hp.ToIntHistogram(), nil) - } - if err != nil { - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - // Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is - // a note indicating its inclusion in the future. - if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { - level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) - } - return err - } - } - return nil + // TODO(bwplotka): Better concat formatting? Perhaps add size limit? + return samplesWithoutMetadata, http.StatusBadRequest, errors.Join(badRequestErrs...) } // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 24bd7059ae..9b5fb1a6ef 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -16,6 +16,7 @@ package remote import ( "bytes" "context" + "errors" "fmt" "io" "math" @@ -27,6 +28,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/gogo/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -290,64 +292,224 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { } } +func expectHeaderValue(t testing.TB, expected int, got string) { + t.Helper() + + require.NotEmpty(t, got) + i, err := strconv.Atoi(got) + require.NoError(t, err) + require.Equal(t, expected, i) +} + func TestRemoteWriteHandler_V2Message(t *testing.T) { - payload, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeV2RequestFixture.Timeseries, writeV2RequestFixture.Symbols, nil, nil, nil, "snappy") - require.NoError(t, err) + // V2 supports partial writes for non-retriable errors, so test them. + for _, tc := range []struct { + desc string + input []writev2.TimeSeries + expectedCode int + expectedRespBody string - req, err := http.NewRequest("", "", bytes.NewReader(payload)) - require.NoError(t, err) + commitErr error + appendSampleErr error + appendHistogramErr error + appendExemplarErr error + updateMetadataErr error + }{ + { + desc: "All timeseries accepted", + input: writeV2RequestFixture.Timeseries, + expectedCode: http.StatusNoContent, + }, + { + desc: "Partial write; first series with invalid labels (no metric name)", + input: append( + // Series with test_metric1="test_metric1" labels. + []writev2.TimeSeries{{LabelsRefs: []uint32{2, 2}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, + writeV2RequestFixture.Timeseries...), + expectedCode: http.StatusBadRequest, + expectedRespBody: "invalid metric name or labels, got {test_metric1=\"test_metric1\"}\n", + }, + { + desc: "Partial write; first series with invalid labels (empty metric name)", + input: append( + // Series with __name__="" labels. + []writev2.TimeSeries{{LabelsRefs: []uint32{1, 0}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, + writeV2RequestFixture.Timeseries...), + expectedCode: http.StatusBadRequest, + expectedRespBody: "invalid metric name or labels, got {__name__=\"\"}\n", + }, + { + desc: "Partial write; first series with one OOO sample", + input: func() []writev2.TimeSeries { + f := proto.Clone(writeV2RequestFixture).(*writev2.Request) + f.Timeseries[0].Samples = append(f.Timeseries[0].Samples, writev2.Sample{Value: 2, Timestamp: 0}) + return f.Timeseries + }(), + expectedCode: http.StatusBadRequest, + expectedRespBody: "out of order sample for series {__name__=\"test_metric1\", b=\"c\", baz=\"qux\", d=\"e\", foo=\"bar\"}\n", + }, + { + desc: "Partial write; first series with one dup sample", + input: func() []writev2.TimeSeries { + f := proto.Clone(writeV2RequestFixture).(*writev2.Request) + f.Timeseries[0].Samples = append(f.Timeseries[0].Samples, f.Timeseries[0].Samples[0]) + return f.Timeseries + }(), + expectedCode: http.StatusBadRequest, + expectedRespBody: "duplicate sample for timestamp for series {__name__=\"test_metric1\", b=\"c\", baz=\"qux\", d=\"e\", foo=\"bar\"}\n", + }, + { + desc: "Partial write; first series with one OOO histogram sample", + input: func() []writev2.TimeSeries { + f := proto.Clone(writeV2RequestFixture).(*writev2.Request) + f.Timeseries[0].Histograms = append(f.Timeseries[0].Histograms, writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))) + return f.Timeseries + }(), + expectedCode: http.StatusBadRequest, + expectedRespBody: "out of order sample for series {__name__=\"test_metric1\", b=\"c\", baz=\"qux\", d=\"e\", foo=\"bar\"}\n", + }, + { + desc: "Partial write; first series with one dup histogram sample", + input: func() []writev2.TimeSeries { + f := proto.Clone(writeV2RequestFixture).(*writev2.Request) + f.Timeseries[0].Histograms = append(f.Timeseries[0].Histograms, f.Timeseries[0].Histograms[1]) + return f.Timeseries + }(), + expectedCode: http.StatusBadRequest, + expectedRespBody: "duplicate sample for timestamp for series {__name__=\"test_metric1\", b=\"c\", baz=\"qux\", d=\"e\", foo=\"bar\"}\n", + }, + // Non retriable errors from various parts. + { + desc: "Internal sample append error; rollback triggered", + input: writeV2RequestFixture.Timeseries, + appendSampleErr: errors.New("some sample internal append error"), - req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) - req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) + expectedCode: http.StatusInternalServerError, + expectedRespBody: "some sample internal append error\n", + }, + { + desc: "Internal histogram sample append error; rollback triggered", + input: writeV2RequestFixture.Timeseries, + appendHistogramErr: errors.New("some histogram sample internal append error"), - appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + expectedCode: http.StatusInternalServerError, + expectedRespBody: "some histogram sample internal append error\n", + }, + { + desc: "Partial write; skipped exemplar; exemplar storage errs are noop", + input: writeV2RequestFixture.Timeseries, + appendExemplarErr: errors.New("some exemplar append error"), - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + expectedCode: http.StatusNoContent, + }, + { + desc: "Partial write; skipped metadata; metadata storage errs are noop", + input: writeV2RequestFixture.Timeseries, + updateMetadataErr: errors.New("some metadata update error"), - resp := recorder.Result() - require.Equal(t, http.StatusNoContent, resp.StatusCode) + expectedCode: http.StatusNoContent, + }, + { + desc: "Internal commit error; rollback triggered", + input: writeV2RequestFixture.Timeseries, + commitErr: errors.New("storage error"), - b := labels.NewScratchBuilder(0) - i := 0 - j := 0 - k := 0 - for _, ts := range writeV2RequestFixture.Timeseries { - ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) + expectedCode: http.StatusInternalServerError, + expectedRespBody: "storage error\n", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + payload, _, _, err := buildV2WriteRequest(log.NewNopLogger(), tc.input, writeV2RequestFixture.Symbols, nil, nil, nil, "snappy") + require.NoError(t, err) - for _, s := range ts.Samples { - requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) + req, err := http.NewRequest("", "", bytes.NewReader(payload)) + require.NoError(t, err) - switch i { - case 0: - requireEqual(t, mockMetadata{ls, writeV2RequestSeries1Metadata}, appendable.metadata[i]) - case 1: - requireEqual(t, mockMetadata{ls, writeV2RequestSeries2Metadata}, appendable.metadata[i]) - default: - t.Fatal("more series/samples then expected") + req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) + req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) + req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) + + appendable := &mockAppendable{ + commitErr: tc.commitErr, + appendSampleErr: tc.appendSampleErr, + appendHistogramErr: tc.appendHistogramErr, + appendExemplarErr: tc.appendExemplarErr, + updateMetadataErr: tc.updateMetadataErr, } - i++ - } - for _, e := range ts.Exemplars { - exemplarLabels := e.ToExemplar(&b, writeV2RequestFixture.Symbols).Labels - requireEqual(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) - j++ - } - for _, hp := range ts.Histograms { - if hp.IsFloatHistogram() { - fh := hp.ToFloatHistogram() - requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, tc.expectedCode, resp.StatusCode) + respBody, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedRespBody, string(respBody)) + + if tc.expectedCode == http.StatusInternalServerError { + // We don't expect writes for partial writes with retry-able code. + expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Samples")) + expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Histograms")) + expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars")) + + require.Empty(t, len(appendable.samples)) + require.Empty(t, len(appendable.histograms)) + require.Empty(t, len(appendable.exemplars)) + require.Empty(t, len(appendable.metadata)) + return + } + + // Double check mandatory 2.0 stats. + // writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each. + expectHeaderValue(t, 2, resp.Header.Get("X-Prometheus-Remote-Write-Written-Samples")) + expectHeaderValue(t, 4, resp.Header.Get("X-Prometheus-Remote-Write-Written-Histograms")) + if tc.appendExemplarErr != nil { + expectHeaderValue(t, 0, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars")) } else { - h := hp.ToIntHistogram() - requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) + expectHeaderValue(t, 2, resp.Header.Get("X-Prometheus-Remote-Write-Written-Exemplars")) } - k++ - } + + // Double check what was actually appended. + var ( + b = labels.NewScratchBuilder(0) + i, j, k, m int + ) + for _, ts := range writeV2RequestFixture.Timeseries { + ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) + + for _, s := range ts.Samples { + requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) + i++ + } + for _, hp := range ts.Histograms { + if hp.IsFloatHistogram() { + fh := hp.ToFloatHistogram() + requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) + } else { + h := hp.ToIntHistogram() + requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) + } + k++ + } + if tc.appendExemplarErr == nil { + for _, e := range ts.Exemplars { + exemplarLabels := e.ToExemplar(&b, writeV2RequestFixture.Symbols).Labels + requireEqual(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) + j++ + } + } + if tc.updateMetadataErr == nil { + expectedMeta := ts.ToMetadata(writeV2RequestFixture.Symbols) + requireEqual(t, mockMetadata{ls, expectedMeta}, appendable.metadata[m]) + m++ + } + } + }) } } +// NOTE: V2 Message is tested in TestRemoteWriteHandler_V2Message. func TestOutOfOrderSample_V1Message(t *testing.T) { for _, tc := range []struct { Name string @@ -372,7 +534,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { req, err := http.NewRequest("", "", bytes.NewReader(payload)) require.NoError(t, err) - appendable := &mockAppendable{latestSample: 100} + appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) recorder := httptest.NewRecorder() @@ -384,49 +546,10 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { } } -func TestOutOfOrderSample_V2Message(t *testing.T) { - for _, tc := range []struct { - Name string - Timestamp int64 - }{ - { - Name: "historic", - Timestamp: 0, - }, - { - Name: "future", - Timestamp: math.MaxInt64, - }, - } { - t.Run(tc.Name, func(t *testing.T) { - payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{ - LabelsRefs: []uint32{1, 2}, - Samples: []writev2.Sample{{Value: 1, Timestamp: tc.Timestamp}}, - }}, []string{"", "__name__", "metric1"}, nil, nil, nil, "snappy") - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(payload)) - require.NoError(t, err) - - req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) - req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) - - appendable := &mockAppendable{latestSample: 100} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) - }) - } -} - // This test case currently aims to verify that the WriteHandler endpoint // don't fail on exemplar ingestion errors since the exemplar storage is // still experimental. +// NOTE: V2 Message is tested in TestRemoteWriteHandler_V2Message. func TestOutOfOrderExemplar_V1Message(t *testing.T) { tests := []struct { Name string @@ -453,7 +576,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) { req, err := http.NewRequest("", "", bytes.NewReader(payload)) require.NoError(t, err) - appendable := &mockAppendable{latestExemplar: 100} + appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) recorder := httptest.NewRecorder() @@ -466,49 +589,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) { } } -func TestOutOfOrderExemplar_V2Message(t *testing.T) { - tests := []struct { - Name string - Timestamp int64 - }{ - { - Name: "historic", - Timestamp: 0, - }, - { - Name: "future", - Timestamp: math.MaxInt64, - }, - } - - for _, tc := range tests { - t.Run(tc.Name, func(t *testing.T) { - payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{ - LabelsRefs: []uint32{1, 2}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{3, 4}, Value: 1, Timestamp: tc.Timestamp}}, - }}, []string{"", "__name__", "metric1", "foo", "bar"}, nil, nil, nil, "snappy") - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(payload)) - require.NoError(t, err) - - req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) - req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) - - appendable := &mockAppendable{latestExemplar: 100} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. - require.Equal(t, http.StatusNoContent, resp.StatusCode) - }) - } -} - +// NOTE: V2 Message is tested in TestRemoteWriteHandler_V2Message. func TestOutOfOrderHistogram_V1Message(t *testing.T) { for _, tc := range []struct { Name string @@ -533,7 +614,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { req, err := http.NewRequest("", "", bytes.NewReader(payload)) require.NoError(t, err) - appendable := &mockAppendable{latestHistogram: 100} + appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) recorder := httptest.NewRecorder() @@ -545,46 +626,6 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { } } -func TestOutOfOrderHistogram_V2Message(t *testing.T) { - for _, tc := range []struct { - Name string - Timestamp int64 - }{ - { - Name: "historic", - Timestamp: 0, - }, - { - Name: "future", - Timestamp: math.MaxInt64, - }, - } { - t.Run(tc.Name, func(t *testing.T) { - payload, _, _, err := buildV2WriteRequest(nil, []writev2.TimeSeries{{ - LabelsRefs: []uint32{0, 1}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(0, &testHistogram), writev2.FromFloatHistogram(1, testHistogram.ToFloat(nil))}, - }}, []string{"__name__", "metric1"}, nil, nil, nil, "snappy") - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(payload)) - require.NoError(t, err) - - req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[config.RemoteWriteProtoMsgV2]) - req.Header.Set("Content-Encoding", string(SnappyBlockCompression)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) - - appendable := &mockAppendable{latestHistogram: 100} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) - }) - } -} - func BenchmarkRemoteWriteHandler(b *testing.B) { const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte" var reqs []*http.Request @@ -719,15 +760,20 @@ func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries { } type mockAppendable struct { - latestSample int64 + latestSample map[uint64]int64 samples []mockSample - latestExemplar int64 + latestExemplar map[uint64]int64 exemplars []mockExemplar - latestHistogram int64 + latestHistogram map[uint64]int64 histograms []mockHistogram metadata []mockMetadata - commitErr error + // optional errors to inject. + commitErr error + appendSampleErr error + appendHistogramErr error + appendExemplarErr error + updateMetadataErr error } type mockSample struct { @@ -765,48 +811,92 @@ func requireEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...inte } func (m *mockAppendable) Appender(_ context.Context) storage.Appender { + if m.latestSample == nil { + m.latestSample = map[uint64]int64{} + } + if m.latestHistogram == nil { + m.latestHistogram = map[uint64]int64{} + } + if m.latestExemplar == nil { + m.latestExemplar = map[uint64]int64{} + } return m } func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - if t < m.latestSample { - return 0, storage.ErrOutOfOrderSample + if m.appendSampleErr != nil { + return 0, m.appendSampleErr } - m.latestSample = t + latestTs := m.latestSample[l.Hash()] + if t < latestTs { + return 0, storage.ErrOutOfOrderSample + } + if t == latestTs { + return 0, storage.ErrDuplicateSampleForTimestamp + } + + m.latestSample[l.Hash()] = t m.samples = append(m.samples, mockSample{l, t, v}) return 0, nil } func (m *mockAppendable) Commit() error { + if m.commitErr != nil { + _ = m.Rollback() // As per Commit method contract. + } return m.commitErr } -func (*mockAppendable) Rollback() error { - return fmt.Errorf("not implemented") +func (m *mockAppendable) Rollback() error { + m.samples = m.samples[:0] + m.exemplars = m.exemplars[:0] + m.histograms = m.histograms[:0] + m.metadata = m.metadata[:0] + return nil } func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - if e.Ts < m.latestExemplar { - return 0, storage.ErrOutOfOrderExemplar + if m.appendExemplarErr != nil { + return 0, m.appendExemplarErr } - m.latestExemplar = e.Ts + latestTs := m.latestExemplar[l.Hash()] + if e.Ts < latestTs { + return 0, storage.ErrOutOfOrderExemplar + } + if e.Ts == latestTs { + return 0, storage.ErrDuplicateExemplar + } + + m.latestExemplar[l.Hash()] = e.Ts m.exemplars = append(m.exemplars, mockExemplar{l, e.Labels, e.Ts, e.Value}) return 0, nil } func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - if t < m.latestHistogram { - return 0, storage.ErrOutOfOrderSample + if m.appendHistogramErr != nil { + return 0, m.appendHistogramErr } - m.latestHistogram = t + latestTs := m.latestHistogram[l.Hash()] + if t < latestTs { + return 0, storage.ErrOutOfOrderSample + } + if t == latestTs { + return 0, storage.ErrDuplicateSampleForTimestamp + } + + m.latestHistogram[l.Hash()] = t m.histograms = append(m.histograms, mockHistogram{l, t, h, fh}) return 0, nil } func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) { + if m.updateMetadataErr != nil { + return 0, m.updateMetadataErr + } + m.metadata = append(m.metadata, mockMetadata{l: l, m: mp}) return 0, nil } diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index 648ec4b17f..6e7422a585 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -369,7 +369,7 @@ func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) { } func TestOTLPWriteHandler(t *testing.T) { - exportRequest := generateOTLPWriteRequest(t) + exportRequest := generateOTLPWriteRequest() buf, err := exportRequest.MarshalProto() require.NoError(t, err) @@ -392,7 +392,7 @@ func TestOTLPWriteHandler(t *testing.T) { require.Len(t, appendable.exemplars, 1) // 1 (exemplar) } -func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest { +func generateOTLPWriteRequest() pmetricotlp.ExportRequest { d := pmetric.NewMetrics() // Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram @@ -422,6 +422,7 @@ func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest { counterDataPoint.Attributes().PutStr("foo.bar", "baz") counterExemplar := counterDataPoint.Exemplars().AppendEmpty() + counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) counterExemplar.SetDoubleValue(10.0) counterExemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7})