mirror of
https://github.com/prometheus/prometheus.git
synced 2026-03-04 21:21:02 +01:00
The OTLP write handler and the PRW v2 histogram append path were missing ErrTooOldSample from their error type checks, causing these errors to fall through to the default case and return HTTP 500 Internal Server Error. This triggered unnecessary retries in OTLP clients like the Python SDK. The PRW v1 write handler (line 115) and the PRW v2 sample append path (line 377) already correctly handle ErrTooOldSample as a 400, and this change makes the remaining paths consistent. Also adds ErrTooOldSample to the v1 sample/histogram log checks so these errors are properly logged instead of silently returned. Fixes #16645 Signed-off-by: Varun Chawla <varun_6april@hotmail.com>
277 lines
10 KiB
Go
277 lines
10 KiB
Go
// Copyright The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package remote
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"time"
|
|
|
|
deltatocumulative "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"go.opentelemetry.io/collector/component"
|
|
"go.opentelemetry.io/collector/consumer"
|
|
"go.opentelemetry.io/collector/pdata/pmetric"
|
|
"go.opentelemetry.io/collector/processor"
|
|
"go.opentelemetry.io/otel/metric/noop"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/storage"
|
|
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
|
|
)
|
|
|
|
// OTLPOptions configures the OTLP write handler's delta handling and
// translation behavior.
type OTLPOptions struct {
	// Convert delta samples to their cumulative equivalent by aggregating in-memory.
	// Mutually exclusive with NativeDelta (NewOTLPWriteHandler panics if both are set).
	ConvertDelta bool
	// Store the raw delta samples as metrics with unknown type (we don't have a proper type for delta yet, therefore
	// marking the metric type as unknown for now).
	// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/)
	NativeDelta bool
	// LookbackDelta is the query lookback delta.
	// Used to calculate the target_info sample timestamp interval.
	LookbackDelta time.Duration
	// Add type and unit labels to the metrics.
	EnableTypeAndUnitLabels bool
}
|
|
|
|
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
|
|
// writes them to the provided appendable.
|
|
func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.AppendableV2, configFunc func() config.Config, opts OTLPOptions) http.Handler {
|
|
if opts.NativeDelta && opts.ConvertDelta {
|
|
// This should be validated when iterating through feature flags, so not expected to fail here.
|
|
panic("cannot enable native delta ingestion and delta2cumulative conversion at the same time")
|
|
}
|
|
|
|
ex := &rwExporter{
|
|
logger: logger,
|
|
appendable: newOTLPInstrumentedAppendable(reg, appendable),
|
|
config: configFunc,
|
|
allowDeltaTemporality: opts.NativeDelta,
|
|
lookbackDelta: opts.LookbackDelta,
|
|
enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels,
|
|
}
|
|
|
|
wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex}
|
|
|
|
if opts.ConvertDelta {
|
|
fac := deltatocumulative.NewFactory()
|
|
set := processor.Settings{
|
|
ID: component.NewID(fac.Type()),
|
|
TelemetrySettings: component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()},
|
|
}
|
|
d2c, err := fac.CreateMetrics(context.Background(), set, fac.CreateDefaultConfig(), wh.defaultConsumer)
|
|
if err != nil {
|
|
// fac.CreateMetrics directly calls [deltatocumulativeprocessor.createMetricsProcessor],
|
|
// which only errors if:
|
|
// - cfg.(type) != *Config
|
|
// - telemetry.New fails due to bad set.TelemetrySettings
|
|
//
|
|
// both cannot be the case, as we pass a valid *Config and valid TelemetrySettings.
|
|
// as such, we assume this error to never occur.
|
|
// if it is, our assumptions are broken in which case a panic seems acceptable.
|
|
panic(fmt.Errorf("failed to create metrics processor: %w", err))
|
|
}
|
|
if err := d2c.Start(context.Background(), nil); err != nil {
|
|
// deltatocumulative does not error on start. see above for panic reasoning
|
|
panic(err)
|
|
}
|
|
wh.d2cConsumer = d2c
|
|
}
|
|
|
|
return wh
|
|
}
|
|
|
|
// rwExporter is a consumer.Metrics implementation that translates OTLP
// metrics to Prometheus format and appends them to the wrapped appendable.
type rwExporter struct {
	logger     *slog.Logger
	appendable storage.AppendableV2
	// config returns the current Prometheus configuration; its OTLPConfig
	// drives the translation settings on every consume.
	config func() config.Config
	// allowDeltaTemporality enables writing delta samples as-is (native delta support).
	allowDeltaTemporality bool
	// lookbackDelta is the query lookback delta, used to calculate the
	// target_info sample timestamp interval.
	lookbackDelta time.Duration
	// enableTypeAndUnitLabels adds type and unit labels to translated metrics.
	enableTypeAndUnitLabels bool
}
|
|
|
|
func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
|
|
otlpCfg := rw.config().OTLPConfig
|
|
app := &remoteWriteAppenderV2{
|
|
AppenderV2: rw.appendable.AppenderV2(ctx),
|
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
|
}
|
|
converter := otlptranslator.NewPrometheusConverter(app)
|
|
annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{
|
|
AddMetricSuffixes: otlpCfg.TranslationStrategy.ShouldAddSuffixes(),
|
|
AllowUTF8: !otlpCfg.TranslationStrategy.ShouldEscape(),
|
|
PromoteResourceAttributes: otlptranslator.NewPromoteResourceAttributes(otlpCfg),
|
|
KeepIdentifyingResourceAttributes: otlpCfg.KeepIdentifyingResourceAttributes,
|
|
ConvertHistogramsToNHCB: otlpCfg.ConvertHistogramsToNHCB,
|
|
PromoteScopeMetadata: otlpCfg.PromoteScopeMetadata,
|
|
AllowDeltaTemporality: rw.allowDeltaTemporality,
|
|
LookbackDelta: rw.lookbackDelta,
|
|
EnableTypeAndUnitLabels: rw.enableTypeAndUnitLabels,
|
|
LabelNameUnderscoreSanitization: otlpCfg.LabelNameUnderscoreSanitization,
|
|
LabelNamePreserveMultipleUnderscores: otlpCfg.LabelNamePreserveMultipleUnderscores,
|
|
})
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
_ = app.Rollback()
|
|
return
|
|
}
|
|
err = app.Commit()
|
|
}()
|
|
ws, _ := annots.AsStrings("", 0, 0)
|
|
if len(ws) > 0 {
|
|
rw.logger.Warn("Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (*rwExporter) Capabilities() consumer.Capabilities {
|
|
return consumer.Capabilities{MutatesData: false}
|
|
}
|
|
|
|
// otlpWriteHandler serves OTLP write requests over HTTP, routing each request
// either to the default consumer or to the delta-to-cumulative conversion
// pipeline when one is configured and the payload contains delta samples.
type otlpWriteHandler struct {
	logger *slog.Logger

	defaultConsumer consumer.Metrics // stores deltas as-is
	d2cConsumer     consumer.Metrics // converts deltas to cumulative; nil unless conversion is enabled
}
|
|
|
|
func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
req, err := DecodeOTLPWriteRequest(r)
|
|
if err != nil {
|
|
h.logger.Error("Error decoding OTLP write request", "err", err.Error())
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
md := req.Metrics()
|
|
// If deltatocumulative conversion enabled AND delta samples exist, use slower conversion path.
|
|
// While deltatocumulative can also accept cumulative metrics (and then just forwards them as-is), it currently
|
|
// holds a sync.Mutex when entering ConsumeMetrics. This is slow and not necessary when ingesting cumulative metrics.
|
|
if h.d2cConsumer != nil && hasDelta(md) {
|
|
err = h.d2cConsumer.ConsumeMetrics(r.Context(), md)
|
|
} else {
|
|
// Otherwise use default consumer (alongside cumulative samples, this will accept delta samples and write as-is
|
|
// if native-delta-support is enabled).
|
|
err = h.defaultConsumer.ConsumeMetrics(r.Context(), md)
|
|
}
|
|
|
|
switch {
|
|
case err == nil:
|
|
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample):
|
|
// Indicated an out of order sample is a bad request to prevent retries.
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
default:
|
|
h.logger.Error("Error appending remote write", "err", err.Error())
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
func hasDelta(md pmetric.Metrics) bool {
|
|
for i := range md.ResourceMetrics().Len() {
|
|
sms := md.ResourceMetrics().At(i).ScopeMetrics()
|
|
for i := range sms.Len() {
|
|
ms := sms.At(i).Metrics()
|
|
for i := range ms.Len() {
|
|
temporality := pmetric.AggregationTemporalityUnspecified
|
|
m := ms.At(i)
|
|
switch ms.At(i).Type() {
|
|
case pmetric.MetricTypeSum:
|
|
temporality = m.Sum().AggregationTemporality()
|
|
case pmetric.MetricTypeExponentialHistogram:
|
|
temporality = m.ExponentialHistogram().AggregationTemporality()
|
|
case pmetric.MetricTypeHistogram:
|
|
temporality = m.Histogram().AggregationTemporality()
|
|
}
|
|
if temporality == pmetric.AggregationTemporalityDelta {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// otlpInstrumentedAppendable wraps an AppendableV2 so that the appenders it
// hands out record OTLP-specific metrics (see otlpInstrumentedAppender).
type otlpInstrumentedAppendable struct {
	storage.AppendableV2

	// samplesAppendedWithoutMetadata counts samples ingested from OTLP
	// without corresponding metadata.
	samplesAppendedWithoutMetadata prometheus.Counter
	// outOfOrderExemplars counts OTLP exemplars rejected for being out of order.
	outOfOrderExemplars prometheus.Counter
}
|
|
|
|
// newOTLPInstrumentedAppendable instruments some OTLP metrics per append and
|
|
// handles partial errors, so the caller does not need to.
|
|
func newOTLPInstrumentedAppendable(reg prometheus.Registerer, app storage.AppendableV2) *otlpInstrumentedAppendable {
|
|
return &otlpInstrumentedAppendable{
|
|
AppendableV2: app,
|
|
samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "api",
|
|
Name: "otlp_appended_samples_without_metadata_total",
|
|
Help: "The total number of samples ingested from OTLP without corresponding metadata.",
|
|
}),
|
|
outOfOrderExemplars: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "api",
|
|
Name: "otlp_out_of_order_exemplars_total",
|
|
Help: "The total number of received OTLP exemplars which were rejected because they were out of order.",
|
|
}),
|
|
}
|
|
}
|
|
|
|
func (a *otlpInstrumentedAppendable) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
|
return &otlpInstrumentedAppender{
|
|
AppenderV2: a.AppendableV2.AppenderV2(ctx),
|
|
|
|
samplesAppendedWithoutMetadata: a.samplesAppendedWithoutMetadata,
|
|
outOfOrderExemplars: a.outOfOrderExemplars,
|
|
}
|
|
}
|
|
|
|
// otlpInstrumentedAppender wraps an AppenderV2: it counts samples appended
// without metadata and out-of-order exemplar errors, and hides partial
// append errors from the caller (see Append).
type otlpInstrumentedAppender struct {
	storage.AppenderV2

	samplesAppendedWithoutMetadata prometheus.Counter
	outOfOrderExemplars            prometheus.Counter
}
|
|
|
|
// Append forwards to the wrapped AppenderV2 and instruments the result:
// exemplar errors carried in a partial error are counted and the partial
// error is then suppressed, and every sample appended without metadata is
// counted. Non-partial errors are returned unchanged with a zero ref.
func (app *otlpInstrumentedAppender) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
	ref, err := app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
	if err != nil {
		var partialErr *storage.AppendPartialError
		// NOTE(review): Handle is invoked on a nil *AppendPartialError receiver;
		// presumably it unwraps err into the typed partial error and reports via
		// hErr when err is not a partial error — confirm against storage package.
		partialErr, hErr := partialErr.Handle(err)
		if hErr != nil {
			// Not a partial error, return err.
			return 0, err
		}
		app.outOfOrderExemplars.Add(float64(len(partialErr.ExemplarErrors)))
		// Hide the partial error as otlp converter does not handle it.
	}
	if opts.Metadata.IsEmpty() {
		app.samplesAppendedWithoutMetadata.Inc()
	}
	return ref, nil
}
|