mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-11 02:41:03 +01:00
We have separate pools for Appender and AppenderV2 objects, and must not put another kind of object into them. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
219 lines
7.1 KiB
Go
219 lines
7.1 KiB
Go
// Copyright The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/value"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
)
|
|
|
|
// AppenderV2 implements storage.AppenderV2.
|
|
func (db *DB) AppenderV2(context.Context) storage.AppenderV2 {
|
|
return db.appenderV2Pool.Get().(storage.AppenderV2)
|
|
}
|
|
|
|
type appenderV2 struct {
|
|
appenderBase
|
|
}
|
|
|
|
// Append appends pending sample to agent's DB.
|
|
// TODO: Wire metadata in the Agent's appender.
|
|
func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
|
|
var (
|
|
// Avoid shadowing err variables for reliability.
|
|
valErr, partialErr error
|
|
sampleMetricType = sampleMetricTypeFloat
|
|
isStale bool
|
|
)
|
|
// Fail fast on incorrect histograms.
|
|
switch {
|
|
case fh != nil:
|
|
sampleMetricType = sampleMetricTypeHistogram
|
|
valErr = fh.Validate()
|
|
case h != nil:
|
|
sampleMetricType = sampleMetricTypeHistogram
|
|
valErr = h.Validate()
|
|
}
|
|
if valErr != nil {
|
|
return 0, valErr
|
|
}
|
|
|
|
// series references and chunk references are identical for agent mode.
|
|
s := a.series.GetByID(chunks.HeadSeriesRef(ref))
|
|
if s == nil {
|
|
var err error
|
|
s, err = a.getOrCreate(ls)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
s.Lock()
|
|
lastTS := s.lastTs
|
|
s.Unlock()
|
|
|
|
// TODO(bwplotka): Handle ST natively (as per PROM-60).
|
|
if a.opts.EnableSTAsZeroSample && st != 0 {
|
|
a.bestEffortAppendSTZeroSample(s, ls, lastTS, st, t, h, fh)
|
|
}
|
|
|
|
if t <= a.minValidTime(lastTS) {
|
|
a.metrics.totalOutOfOrderSamples.Inc()
|
|
return 0, storage.ErrOutOfOrderSample
|
|
}
|
|
|
|
switch {
|
|
case fh != nil:
|
|
isStale = value.IsStaleNaN(fh.Sum)
|
|
// NOTE: always modify pendingFloatHistograms and floatHistogramSeries together
|
|
a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{
|
|
Ref: s.ref,
|
|
T: t,
|
|
FH: fh,
|
|
})
|
|
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
|
|
case h != nil:
|
|
isStale = value.IsStaleNaN(h.Sum)
|
|
// NOTE: always modify pendingHistograms and histogramSeries together
|
|
a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{
|
|
Ref: s.ref,
|
|
T: t,
|
|
H: h,
|
|
})
|
|
a.histogramSeries = append(a.histogramSeries, s)
|
|
default:
|
|
isStale = value.IsStaleNaN(v)
|
|
|
|
// NOTE: always modify pendingSamples and sampleSeries together.
|
|
a.pendingSamples = append(a.pendingSamples, record.RefSample{
|
|
Ref: s.ref,
|
|
T: t,
|
|
V: v,
|
|
})
|
|
a.sampleSeries = append(a.sampleSeries, s)
|
|
}
|
|
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricType).Inc()
|
|
if isStale {
|
|
// For stale values we never attempt to process metadata/exemplars, claim the success.
|
|
return storage.SeriesRef(s.ref), nil
|
|
}
|
|
|
|
// Append exemplars if any and if storage was configured for it.
|
|
// TODO(bwplotka): Agent does not have equivalent of a.head.opts.EnableExemplarStorage && a.head.opts.MaxExemplars.Load() > 0 ?
|
|
if len(opts.Exemplars) > 0 {
|
|
// Currently only exemplars can return partial errors.
|
|
partialErr = a.appendExemplars(s, opts.Exemplars)
|
|
}
|
|
return storage.SeriesRef(s.ref), partialErr
|
|
}
|
|
|
|
func (a *appenderV2) Commit() error {
|
|
defer a.appenderV2Pool.Put(a)
|
|
return a.commit()
|
|
}
|
|
|
|
func (a *appenderV2) Rollback() error {
|
|
defer a.appenderV2Pool.Put(a)
|
|
return a.rollback()
|
|
}
|
|
|
|
func (a *appenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemplar) error {
|
|
var errs []error
|
|
for _, e := range exemplar {
|
|
// Ensure no empty labels have gotten through.
|
|
e.Labels = e.Labels.WithoutEmpty()
|
|
|
|
if err := a.validateExemplar(s.ref, e); err != nil {
|
|
if !errors.Is(err, storage.ErrDuplicateExemplar) {
|
|
// Except duplicates, return partial errors.
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
if !errors.Is(err, storage.ErrOutOfOrderExemplar) {
|
|
a.logger.Debug("Error while adding an exemplar on AppendSample", "exemplars", fmt.Sprintf("%+v", e), "err", e)
|
|
}
|
|
continue
|
|
}
|
|
|
|
a.series.SetLatestExemplar(s.ref, &e)
|
|
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
|
|
Ref: s.ref,
|
|
T: e.Ts,
|
|
V: e.Value,
|
|
Labels: e.Labels,
|
|
})
|
|
a.metrics.totalAppendedExemplars.Inc()
|
|
}
|
|
if len(errs) > 0 {
|
|
return &storage.AppendPartialError{ExemplarErrors: errs}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
|
|
// is implemented.
|
|
//
|
|
// ST is an experimental feature, we don't fail the append on errors, just debug log.
|
|
func (a *appenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.Labels, lastTS, st, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) {
|
|
// NOTE: Use lset instead of s.lset to avoid locking memSeries. Using s.ref is acceptable without locking.
|
|
if st >= t {
|
|
a.logger.Debug("Error when appending ST", "series", ls.String(), "st", st, "t", t, "err", storage.ErrSTNewerThanSample)
|
|
return
|
|
}
|
|
if st <= lastTS {
|
|
a.logger.Debug("Error when appending ST", "series", ls.String(), "st", st, "t", t, "err", storage.ErrOutOfOrderST)
|
|
return
|
|
}
|
|
|
|
switch {
|
|
case fh != nil:
|
|
zeroFloatHistogram := &histogram.FloatHistogram{
|
|
// The STZeroSample represents a counter reset by definition.
|
|
CounterResetHint: histogram.CounterReset,
|
|
// Replicate other fields to avoid needless chunk creation.
|
|
Schema: fh.Schema,
|
|
ZeroThreshold: fh.ZeroThreshold,
|
|
CustomValues: fh.CustomValues,
|
|
}
|
|
a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{Ref: s.ref, T: st, FH: zeroFloatHistogram})
|
|
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
|
|
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
|
|
case h != nil:
|
|
zeroHistogram := &histogram.Histogram{
|
|
// The STZeroSample represents a counter reset by definition.
|
|
CounterResetHint: histogram.CounterReset,
|
|
// Replicate other fields to avoid needless chunk creation.
|
|
Schema: h.Schema,
|
|
ZeroThreshold: h.ZeroThreshold,
|
|
CustomValues: h.CustomValues,
|
|
}
|
|
a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{Ref: s.ref, T: st, H: zeroHistogram})
|
|
a.histogramSeries = append(a.histogramSeries, s)
|
|
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
|
|
default:
|
|
a.pendingSamples = append(a.pendingSamples, record.RefSample{Ref: s.ref, T: st, V: 0})
|
|
a.sampleSeries = append(a.sampleSeries, s)
|
|
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
|
|
}
|
|
}
|