mirror of
https://github.com/prometheus/prometheus.git
synced 2025-09-21 05:41:01 +02:00
222 lines
8.6 KiB
Go
222 lines
8.6 KiB
Go
// Copyright 2025 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/metadata"
|
|
)
|
|
|
|
// AsAppender exposes new V2 appender implementation as old appender.
|
|
//
|
|
// Because old appender interface allow exemplar, metadata and CT updates to
|
|
// be disjointed from sample and histogram updates, you need extra appender pieces
|
|
// to handle the old behaviour.
|
|
func AsAppender(appender AppenderV2, exemplar ExemplarAppender, metadata MetadataUpdater, ct combinedCTAppender) Appender {
|
|
return &asAppender{app: appender, exemplar: exemplar, metadata: metadata, ct: ct}
|
|
}
|
|
|
|
// For historical reason this interface is not consistent
|
|
// (HistogramAppender has CTZero but Appender does not have for Sample).
|
|
// No need to fix this elsewhere, given we plan to deprecate old interface and those methods.
|
|
type combinedCTAppender interface {
|
|
CreatedTimestampAppender
|
|
AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
|
|
}
|
|
|
|
type asAppender struct {
|
|
opts AppendV2Options
|
|
app AppenderV2
|
|
|
|
exemplar ExemplarAppender
|
|
metadata MetadataUpdater
|
|
ct combinedCTAppender
|
|
}
|
|
|
|
func (a *asAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
|
|
return a.app.AppendSample(ref, l, Metadata{}, 0, t, v, nil)
|
|
}
|
|
|
|
func (a *asAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
|
|
return a.app.AppendHistogram(ref, l, Metadata{}, 0, t, h, fh, nil)
|
|
}
|
|
|
|
func (a *asAppender) Commit() error {
|
|
return a.app.Commit()
|
|
}
|
|
|
|
func (a *asAppender) Rollback() error {
|
|
return a.app.Rollback()
|
|
}
|
|
|
|
func (a *asAppender) SetOptions(opts *AppendOptions) {
|
|
if opts == nil {
|
|
return
|
|
}
|
|
|
|
a.opts.DiscardOutOfOrder = opts.DiscardOutOfOrder
|
|
a.app.SetOptions(&a.opts)
|
|
}
|
|
|
|
func (a *asAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64) (SeriesRef, error) {
|
|
// We could consider using AppendSample with AppendCTAsZero true, but this option is stateful,
|
|
// so use compatibility ct interface.
|
|
return a.ct.AppendCTZeroSample(ref, l, t, ct)
|
|
}
|
|
|
|
func (a *asAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
|
|
// We could consider using AppendHistogram with AppendCTAsZero true, but this option is stateful,
|
|
// so use compatibility ct interface.
|
|
return a.ct.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
|
|
}
|
|
|
|
func (a *asAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) {
|
|
return a.exemplar.AppendExemplar(ref, l, e)
|
|
}
|
|
|
|
func (a *asAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
|
|
return a.metadata.UpdateMetadata(ref, l, m)
|
|
}
|
|
|
|
// AsAppenderV2 exposes old appender implementation as a new v2 appender.
|
|
func AsAppenderV2(log *slog.Logger, appender Appender) AppenderV2 {
|
|
if log == nil {
|
|
log = slog.Default()
|
|
}
|
|
return &asAppenderV2{app: appender, log: log}
|
|
}
|
|
|
|
type asAppenderV2 struct {
|
|
app Appender
|
|
opts AppendV2Options
|
|
|
|
log *slog.Logger
|
|
}
|
|
|
|
func (a *asAppenderV2) AppendSample(ref SeriesRef, ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (_ SeriesRef, err error) {
|
|
if ct != 0 && a.opts.AppendCTAsZero {
|
|
ref, err = a.app.AppendCTZeroSample(ref, ls, t, ct)
|
|
// Skip the error in general (mimic the behaviour of the scrape loop).
|
|
// https://github.com/prometheus/prometheus/blob/913cc8f72b8a4f6ae4beb1d168c16a88ca4705ab/scrape/scrape.go#L1787C5-L1791C7
|
|
if err != nil && !errors.Is(err, ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
|
|
// CT is an experimental feature. For now, we don't need to fail the
|
|
// scrape on errors updating the created timestamp, log debug.
|
|
a.log.Debug("error when appending CT in scrape loop", "series", meta, "ct", ct, "t", t, "err", err)
|
|
}
|
|
}
|
|
ref, err = a.app.Append(ref, ls, t, v)
|
|
if err != nil {
|
|
return ref, err
|
|
}
|
|
|
|
outOfOrderExemplars := 0
|
|
for _, e := range es {
|
|
_, exemplarErr := a.app.AppendExemplar(ref, ls, e)
|
|
// Skip the error in general (mimic the behaviour of the scrape loop).
|
|
// https://github.com/prometheus/prometheus/blob/913cc8f72b8a4f6ae4beb1d168c16a88ca4705ab/scrape/scrape.go#L1862C3-L1881C4
|
|
switch {
|
|
case exemplarErr == nil:
|
|
// Do nothing.
|
|
case errors.Is(exemplarErr, ErrOutOfOrderExemplar):
|
|
outOfOrderExemplars++
|
|
default:
|
|
// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
|
|
a.log.Debug("error while adding exemplar in AppendExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
|
|
}
|
|
}
|
|
if outOfOrderExemplars > 0 && outOfOrderExemplars == len(es) {
|
|
// Only report out of order exemplars if all are out of order, otherwise this was a partial update
|
|
// to some existing set of exemplars.
|
|
// TODO: Decide what to do with scrape logic for exemplar error handling.
|
|
// appErrs.numExemplarOutOfOrder += outOfOrderExemplars
|
|
a.log.Debug("Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", es[len(es)-1]))
|
|
// sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars))
|
|
}
|
|
|
|
if !meta.IsEmpty() {
|
|
ref, err = a.app.UpdateMetadata(ref, ls, meta.Metadata)
|
|
if err != nil {
|
|
return ref, err
|
|
}
|
|
}
|
|
return ref, err
|
|
}
|
|
|
|
func (a *asAppenderV2) AppendHistogram(ref SeriesRef, ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (_ SeriesRef, err error) {
|
|
if ct != 0 && a.opts.AppendCTAsZero {
|
|
ref, err = a.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, fh)
|
|
// Skip the error in general (mimic the behaviour of the scrape loop).
|
|
// https://github.com/prometheus/prometheus/blob/913cc8f72b8a4f6ae4beb1d168c16a88ca4705ab/scrape/scrape.go#L1787C5-L1791C7
|
|
if err != nil && !errors.Is(err, ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
|
|
// CT is an experimental feature. For now, we don't need to fail the
|
|
// scrape on errors updating the created timestamp, log debug.
|
|
a.log.Debug("error when appending CT in scrape loop", "series", meta, "ct", ct, "t", t, "err", err)
|
|
}
|
|
}
|
|
ref, err = a.app.AppendHistogram(ref, ls, t, h, fh)
|
|
if err != nil {
|
|
return ref, err
|
|
}
|
|
|
|
outOfOrderExemplars := 0
|
|
for _, e := range es {
|
|
_, exemplarErr := a.app.AppendExemplar(ref, ls, e)
|
|
// Skip the error in general (mimic the behaviour of the scrape loop).
|
|
// https://github.com/prometheus/prometheus/blob/913cc8f72b8a4f6ae4beb1d168c16a88ca4705ab/scrape/scrape.go#L1862C3-L1881C4
|
|
switch {
|
|
case exemplarErr == nil:
|
|
// Do nothing.
|
|
case errors.Is(exemplarErr, ErrOutOfOrderExemplar):
|
|
outOfOrderExemplars++
|
|
default:
|
|
// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
|
|
a.log.Debug("error while adding exemplar in AppendExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
|
|
}
|
|
}
|
|
if outOfOrderExemplars > 0 && outOfOrderExemplars == len(es) {
|
|
// Only report out of order exemplars if all are out of order, otherwise this was a partial update
|
|
// to some existing set of exemplars.
|
|
// TODO: Decide what to do with scrape logic for exemplar error handling.
|
|
// appErrs.numExemplarOutOfOrder += outOfOrderExemplars
|
|
a.log.Debug("Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", es[len(es)-1]))
|
|
// sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars))
|
|
}
|
|
|
|
if !meta.IsEmpty() {
|
|
ref, err = a.app.UpdateMetadata(ref, ls, meta.Metadata)
|
|
if err != nil {
|
|
return ref, err
|
|
}
|
|
}
|
|
return ref, err
|
|
}
|
|
|
|
func (a *asAppenderV2) Commit() error { return a.app.Commit() }
|
|
|
|
func (a *asAppenderV2) Rollback() error { return a.app.Rollback() }
|
|
|
|
func (a *asAppenderV2) SetOptions(opts *AppendV2Options) {
|
|
if opts == nil {
|
|
return
|
|
}
|
|
a.opts = *opts
|
|
a.app.SetOptions(&AppendOptions{DiscardOutOfOrder: opts.DiscardOutOfOrder})
|
|
}
|