prometheus/tsdb/agent/db_append_v2.go
Bryan Boreham f1719fa1d4
[BUGFIX] Agent: fix crash from invalid type in pool (#17802)
We have separate pools for Appender and AppenderV2 objects, and must not
put another kind of object into them.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
2026-01-07 14:01:02 +00:00

219 lines
7.1 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"context"
"errors"
"fmt"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)
// AppenderV2 implements storage.AppenderV2.
func (db *DB) AppenderV2(context.Context) storage.AppenderV2 {
return db.appenderV2Pool.Get().(storage.AppenderV2)
}
type appenderV2 struct {
appenderBase
}
// Append appends pending sample to agent's DB.
// TODO: Wire metadata in the Agent's appender.
func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
var (
// Avoid shadowing err variables for reliability.
valErr, partialErr error
sampleMetricType = sampleMetricTypeFloat
isStale bool
)
// Fail fast on incorrect histograms.
switch {
case fh != nil:
sampleMetricType = sampleMetricTypeHistogram
valErr = fh.Validate()
case h != nil:
sampleMetricType = sampleMetricTypeHistogram
valErr = h.Validate()
}
if valErr != nil {
return 0, valErr
}
// series references and chunk references are identical for agent mode.
s := a.series.GetByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, err = a.getOrCreate(ls)
if err != nil {
return 0, err
}
}
s.Lock()
lastTS := s.lastTs
s.Unlock()
// TODO(bwplotka): Handle ST natively (as per PROM-60).
if a.opts.EnableSTAsZeroSample && st != 0 {
a.bestEffortAppendSTZeroSample(s, ls, lastTS, st, t, h, fh)
}
if t <= a.minValidTime(lastTS) {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
switch {
case fh != nil:
isStale = value.IsStaleNaN(fh.Sum)
// NOTE: always modify pendingFloatHistograms and floatHistogramSeries together
a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: t,
FH: fh,
})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
case h != nil:
isStale = value.IsStaleNaN(h.Sum)
// NOTE: always modify pendingHistograms and histogramSeries together
a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{
Ref: s.ref,
T: t,
H: h,
})
a.histogramSeries = append(a.histogramSeries, s)
default:
isStale = value.IsStaleNaN(v)
// NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: s.ref,
T: t,
V: v,
})
a.sampleSeries = append(a.sampleSeries, s)
}
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricType).Inc()
if isStale {
// For stale values we never attempt to process metadata/exemplars, claim the success.
return storage.SeriesRef(s.ref), nil
}
// Append exemplars if any and if storage was configured for it.
// TODO(bwplotka): Agent does not have equivalent of a.head.opts.EnableExemplarStorage && a.head.opts.MaxExemplars.Load() > 0 ?
if len(opts.Exemplars) > 0 {
// Currently only exemplars can return partial errors.
partialErr = a.appendExemplars(s, opts.Exemplars)
}
return storage.SeriesRef(s.ref), partialErr
}
func (a *appenderV2) Commit() error {
defer a.appenderV2Pool.Put(a)
return a.commit()
}
func (a *appenderV2) Rollback() error {
defer a.appenderV2Pool.Put(a)
return a.rollback()
}
func (a *appenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemplar) error {
var errs []error
for _, e := range exemplar {
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()
if err := a.validateExemplar(s.ref, e); err != nil {
if !errors.Is(err, storage.ErrDuplicateExemplar) {
// Except duplicates, return partial errors.
errs = append(errs, err)
continue
}
if !errors.Is(err, storage.ErrOutOfOrderExemplar) {
a.logger.Debug("Error while adding an exemplar on AppendSample", "exemplars", fmt.Sprintf("%+v", e), "err", e)
}
continue
}
a.series.SetLatestExemplar(s.ref, &e)
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
Ref: s.ref,
T: e.Ts,
V: e.Value,
Labels: e.Labels,
})
a.metrics.totalAppendedExemplars.Inc()
}
if len(errs) > 0 {
return &storage.AppendPartialError{ExemplarErrors: errs}
}
return nil
}
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
//
// ST is an experimental feature, we don't fail the append on errors, just debug log.
func (a *appenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.Labels, lastTS, st, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) {
// NOTE: Use lset instead of s.lset to avoid locking memSeries. Using s.ref is acceptable without locking.
if st >= t {
a.logger.Debug("Error when appending ST", "series", ls.String(), "st", st, "t", t, "err", storage.ErrSTNewerThanSample)
return
}
if st <= lastTS {
a.logger.Debug("Error when appending ST", "series", ls.String(), "st", st, "t", t, "err", storage.ErrOutOfOrderST)
return
}
switch {
case fh != nil:
zeroFloatHistogram := &histogram.FloatHistogram{
// The STZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
CustomValues: fh.CustomValues,
}
a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{Ref: s.ref, T: st, FH: zeroFloatHistogram})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
case h != nil:
zeroHistogram := &histogram.Histogram{
// The STZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
CustomValues: h.CustomValues,
}
a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{Ref: s.ref, T: st, H: zeroHistogram})
a.histogramSeries = append(a.histogramSeries, s)
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
default:
a.pendingSamples = append(a.pendingSamples, record.RefSample{Ref: s.ref, T: st, V: 0})
a.sampleSeries = append(a.sampleSeries, s)
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
}
}