mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-10 18:31:12 +01:00
refactor(scrape)[PART2]: simplified scrapeLoop constructors & tests; add teststorage.Appendable mock (#17631)
* refactor(scrape): simplified scrapeLoop constructors & tests; add teststorage.Appender mock Signed-off-by: bwplotka <bwplotka@gmail.com> debug * refactor(scrape): simplified newLoop even more Signed-off-by: bwplotka <bwplotka@gmail.com> * refactor(scrape): rename sl -> app, slApp -> app Signed-off-by: bwplotka <bwplotka@gmail.com> * fix TestScrapeLoopRun flakiness Signed-off-by: bwplotka <bwplotka@gmail.com> * fix lint Signed-off-by: bwplotka <bwplotka@gmail.com> * kill unused listSeriesSet code Signed-off-by: bwplotka <bwplotka@gmail.com> * fix closing to not panic Signed-off-by: bwplotka <bwplotka@gmail.com> * added extra benchmark for scrapeAndReport Signed-off-by: bwplotka <bwplotka@gmail.com> * added extra benchmark for restartLoops Signed-off-by: bwplotka <bwplotka@gmail.com> * addressed last comments Signed-off-by: bwplotka <bwplotka@gmail.com> * fix TestConcurrentAppender_ReturnsErrAppender naming Signed-off-by: bwplotka <bwplotka@gmail.com> * addressed small comments Signed-off-by: bwplotka <bwplotka@gmail.com> * refactor(scrape): ensure scrape config is reloaded; added test Signed-off-by: bwplotka <bwplotka@gmail.com> * addressed comments. Signed-off-by: bwplotka <bwplotka@gmail.com> --------- Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
e4b6d443fc
commit
17e06dbab5
@ -1022,7 +1022,7 @@ func ToEscapingScheme(s string, v model.ValidationScheme) (model.EscapingScheme,
|
||||
case model.LegacyValidation:
|
||||
return model.UnderscoreEscaping, nil
|
||||
case model.UnsetValidation:
|
||||
return model.NoEscaping, fmt.Errorf("v is unset: %s", v)
|
||||
return model.NoEscaping, fmt.Errorf("ValidationScheme is unset: %s", v)
|
||||
default:
|
||||
panic(fmt.Errorf("unhandled validation scheme: %s", v))
|
||||
}
|
||||
|
||||
@ -484,7 +484,7 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte
|
||||
// supposed to be used according to the schema.
|
||||
func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool {
|
||||
if h2 == nil {
|
||||
return false
|
||||
return h == nil
|
||||
}
|
||||
|
||||
if h.Schema != h2.Schema ||
|
||||
|
||||
@ -247,7 +247,7 @@ func (h *Histogram) CumulativeBucketIterator() BucketIterator[uint64] {
|
||||
// supposed to be used according to the schema.
|
||||
func (h *Histogram) Equals(h2 *Histogram) bool {
|
||||
if h2 == nil {
|
||||
return false
|
||||
return h == nil
|
||||
}
|
||||
|
||||
if h.Schema != h2.Schema || h.Count != h2.Count ||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Copyright 2013 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@ -17,240 +17,127 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/promslog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/metadata"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/pool"
|
||||
"github.com/prometheus/prometheus/util/teststorage"
|
||||
)
|
||||
|
||||
type nopAppendable struct{}
|
||||
// For readability.
|
||||
type sample = teststorage.Sample
|
||||
|
||||
func (nopAppendable) Appender(context.Context) storage.Appender {
|
||||
return nopAppender{}
|
||||
}
|
||||
|
||||
type nopAppender struct{}
|
||||
|
||||
func (nopAppender) SetOptions(*storage.AppendOptions) {}
|
||||
|
||||
func (nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
return 2, nil
|
||||
}
|
||||
|
||||
func (nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
return 3, nil
|
||||
}
|
||||
|
||||
func (nopAppender) AppendHistogramSTZeroSample(storage.SeriesRef, labels.Labels, int64, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
|
||||
return 4, nil
|
||||
}
|
||||
|
||||
func (nopAppender) AppendSTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) {
|
||||
return 5, nil
|
||||
}
|
||||
|
||||
func (nopAppender) Commit() error { return nil }
|
||||
func (nopAppender) Rollback() error { return nil }
|
||||
|
||||
type floatSample struct {
|
||||
metric labels.Labels
|
||||
t int64
|
||||
f float64
|
||||
}
|
||||
|
||||
func equalFloatSamples(a, b floatSample) bool {
|
||||
// Compare Float64bits so NaN values which are exactly the same will compare equal.
|
||||
return labels.Equal(a.metric, b.metric) && a.t == b.t && math.Float64bits(a.f) == math.Float64bits(b.f)
|
||||
}
|
||||
|
||||
type histogramSample struct {
|
||||
metric labels.Labels
|
||||
t int64
|
||||
h *histogram.Histogram
|
||||
fh *histogram.FloatHistogram
|
||||
}
|
||||
|
||||
type metadataEntry struct {
|
||||
m metadata.Metadata
|
||||
metric labels.Labels
|
||||
}
|
||||
|
||||
func metadataEntryEqual(a, b metadataEntry) bool {
|
||||
if !labels.Equal(a.metric, b.metric) {
|
||||
return false
|
||||
func withCtx(ctx context.Context) func(sl *scrapeLoop) {
|
||||
return func(sl *scrapeLoop) {
|
||||
sl.ctx = ctx
|
||||
}
|
||||
if a.m.Type != b.m.Type {
|
||||
return false
|
||||
}
|
||||
if a.m.Unit != b.m.Unit {
|
||||
return false
|
||||
}
|
||||
if a.m.Help != b.m.Help {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
type collectResultAppendable struct {
|
||||
*collectResultAppender
|
||||
func withAppendable(appendable storage.Appendable) func(sl *scrapeLoop) {
|
||||
return func(sl *scrapeLoop) {
|
||||
sl.appendable = appendable
|
||||
}
|
||||
}
|
||||
|
||||
func (a *collectResultAppendable) Appender(context.Context) storage.Appender {
|
||||
return a
|
||||
// newTestScrapeLoop is the initial scrape loop for all tests.
|
||||
// It returns scrapeLoop and mock scraper you can customize.
|
||||
//
|
||||
// It's recommended to use withXYZ functions for simple option customizations, e.g:
|
||||
//
|
||||
// appTest := teststorage.NewAppendable()
|
||||
// sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
|
||||
//
|
||||
// However, when changing more than one scrapeLoop options it's more readable to have one explicit opt function:
|
||||
//
|
||||
// ctx, cancel := context.WithCancel(t.Context())
|
||||
// appTest := teststorage.NewAppendable()
|
||||
// sl, scraper := newTestScrapeLoop(t, func(sl *scrapeLoop) {
|
||||
// sl.ctx = ctx
|
||||
// sl.appendable = appTest
|
||||
// // Since we're writing samples directly below we need to provide a protocol fallback.
|
||||
// sl.fallbackScrapeProtocol = "text/plain"
|
||||
// })
|
||||
//
|
||||
// NOTE: Try to NOT add more parameter to this function. Try to NOT add more
|
||||
// newTestScrapeLoop-like constructors. It should be flexible enough with scrapeLoop
|
||||
// used for initial options.
|
||||
func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoop, scraper *testScraper) {
|
||||
metrics := newTestScrapeMetrics(t)
|
||||
sl := &scrapeLoop{
|
||||
stopped: make(chan struct{}),
|
||||
|
||||
l: promslog.NewNopLogger(),
|
||||
cache: newScrapeCache(metrics),
|
||||
|
||||
interval: 10 * time.Millisecond,
|
||||
timeout: 1 * time.Hour,
|
||||
sampleMutator: nopMutator,
|
||||
reportSampleMutator: nopMutator,
|
||||
|
||||
appendable: teststorage.NewAppendable(),
|
||||
buffers: pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) }),
|
||||
metrics: metrics,
|
||||
maxSchema: histogram.ExponentialSchemaMax,
|
||||
honorTimestamps: true,
|
||||
enableCompression: true,
|
||||
validationScheme: model.UTF8Validation,
|
||||
symbolTable: labels.NewSymbolTable(),
|
||||
appendMetadataToWAL: true, // Tests assumes it's enabled, unless explicitly turned off.
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(sl)
|
||||
}
|
||||
// Validate user opts for convenience.
|
||||
require.Nil(t, sl.parentCtx, "newTestScrapeLoop does not support injecting non-nil parent context")
|
||||
require.Nil(t, sl.appenderCtx, "newTestScrapeLoop does not support injecting non-nil appender context")
|
||||
require.Nil(t, sl.cancel, "newTestScrapeLoop does not support injecting custom cancel function")
|
||||
require.Nil(t, sl.scraper, "newTestScrapeLoop does not support injecting scraper, it's mocked, use the returned scraper")
|
||||
|
||||
rootCtx := t.Context()
|
||||
// Use sl.ctx for context injection.
|
||||
// True contexts (sl.appenderCtx, sl.parentCtx, sl.ctx) are populated from it
|
||||
if sl.ctx != nil {
|
||||
rootCtx = sl.ctx
|
||||
}
|
||||
ctx, cancel := context.WithCancel(rootCtx)
|
||||
sl.ctx = ctx
|
||||
sl.cancel = cancel
|
||||
sl.appenderCtx = rootCtx
|
||||
sl.parentCtx = rootCtx
|
||||
|
||||
scraper = &testScraper{}
|
||||
sl.scraper = scraper
|
||||
return sl, scraper
|
||||
}
|
||||
|
||||
// collectResultAppender records all samples that were added through the appender.
|
||||
// It can be used as its zero value or be backed by another appender it writes samples through.
|
||||
type collectResultAppender struct {
|
||||
mtx sync.Mutex
|
||||
func newTestScrapePool(t *testing.T, injectNewLoop func(options scrapeLoopOptions) loop) *scrapePool {
|
||||
return &scrapePool{
|
||||
ctx: t.Context(),
|
||||
cancel: func() {},
|
||||
logger: promslog.NewNopLogger(),
|
||||
config: &config.ScrapeConfig{},
|
||||
options: &Options{},
|
||||
client: http.DefaultClient,
|
||||
|
||||
next storage.Appender
|
||||
resultFloats []floatSample
|
||||
pendingFloats []floatSample
|
||||
rolledbackFloats []floatSample
|
||||
resultHistograms []histogramSample
|
||||
pendingHistograms []histogramSample
|
||||
rolledbackHistograms []histogramSample
|
||||
resultExemplars []exemplar.Exemplar
|
||||
pendingExemplars []exemplar.Exemplar
|
||||
resultMetadata []metadataEntry
|
||||
pendingMetadata []metadataEntry
|
||||
}
|
||||
activeTargets: map[uint64]*Target{},
|
||||
loops: map[uint64]loop{},
|
||||
injectTestNewLoop: injectNewLoop,
|
||||
|
||||
func (*collectResultAppender) SetOptions(*storage.AppendOptions) {}
|
||||
|
||||
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
a.pendingFloats = append(a.pendingFloats, floatSample{
|
||||
metric: lset,
|
||||
t: t,
|
||||
f: v,
|
||||
})
|
||||
|
||||
if a.next == nil {
|
||||
if ref == 0 {
|
||||
// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
|
||||
ref = storage.SeriesRef(lset.Hash())
|
||||
}
|
||||
return ref, nil
|
||||
appendable: teststorage.NewAppendable(),
|
||||
symbolTable: labels.NewSymbolTable(),
|
||||
metrics: newTestScrapeMetrics(t),
|
||||
}
|
||||
|
||||
ref, err := a.next.Append(ref, lset, t, v)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
a.pendingExemplars = append(a.pendingExemplars, e)
|
||||
if a.next == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return a.next.AppendExemplar(ref, l, e)
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, metric: l})
|
||||
if a.next == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return a.next.AppendHistogram(ref, l, t, h, fh)
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64, h *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
if h != nil {
|
||||
return a.AppendHistogram(ref, l, st, &histogram.Histogram{}, nil)
|
||||
}
|
||||
return a.AppendHistogram(ref, l, st, nil, &histogram.FloatHistogram{})
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
a.pendingMetadata = append(a.pendingMetadata, metadataEntry{metric: l, m: m})
|
||||
if a.next == nil {
|
||||
if ref == 0 {
|
||||
ref = storage.SeriesRef(l.Hash())
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
return a.next.UpdateMetadata(ref, l, m)
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64) (storage.SeriesRef, error) {
|
||||
return a.Append(ref, l, st, 0.0)
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Commit() error {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
a.resultFloats = append(a.resultFloats, a.pendingFloats...)
|
||||
a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
|
||||
a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
|
||||
a.resultMetadata = append(a.resultMetadata, a.pendingMetadata...)
|
||||
a.pendingFloats = nil
|
||||
a.pendingExemplars = nil
|
||||
a.pendingHistograms = nil
|
||||
a.pendingMetadata = nil
|
||||
if a.next == nil {
|
||||
return nil
|
||||
}
|
||||
return a.next.Commit()
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Rollback() error {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
a.rolledbackFloats = a.pendingFloats
|
||||
a.rolledbackHistograms = a.pendingHistograms
|
||||
a.pendingFloats = nil
|
||||
a.pendingHistograms = nil
|
||||
if a.next == nil {
|
||||
return nil
|
||||
}
|
||||
return a.next.Rollback()
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) String() string {
|
||||
var sb strings.Builder
|
||||
for _, s := range a.resultFloats {
|
||||
sb.WriteString(fmt.Sprintf("committed: %s %f %d\n", s.metric, s.f, s.t))
|
||||
}
|
||||
for _, s := range a.pendingFloats {
|
||||
sb.WriteString(fmt.Sprintf("pending: %s %f %d\n", s.metric, s.f, s.t))
|
||||
}
|
||||
for _, s := range a.rolledbackFloats {
|
||||
sb.WriteString(fmt.Sprintf("rolledback: %s %f %d\n", s.metric, s.f, s.t))
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// protoMarshalDelimited marshals a MetricFamily into a delimited
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Copyright 2013 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@ -39,8 +39,8 @@ import (
|
||||
"github.com/prometheus/prometheus/util/pool"
|
||||
)
|
||||
|
||||
// NewManager is the Manager constructor.
|
||||
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
|
||||
// NewManager is the Manager constructor using Appendable.
|
||||
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), appendable storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
|
||||
if o == nil {
|
||||
o = &Options{}
|
||||
}
|
||||
@ -54,7 +54,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str
|
||||
}
|
||||
|
||||
m := &Manager{
|
||||
append: app,
|
||||
appendable: appendable,
|
||||
opts: o,
|
||||
logger: logger,
|
||||
newScrapeFailureLogger: newScrapeFailureLogger,
|
||||
@ -87,15 +87,15 @@ type Options struct {
|
||||
// Option to enable appending of scraped Metadata to the TSDB/other appenders. Individual appenders
|
||||
// can decide what to do with metadata, but for practical purposes this flag exists so that metadata
|
||||
// can be written to the WAL and thus read for remote write.
|
||||
// TODO: implement some form of metadata storage
|
||||
AppendMetadata bool
|
||||
// Option to increase the interval used by scrape manager to throttle target groups updates.
|
||||
DiscoveryReloadInterval model.Duration
|
||||
|
||||
// Option to enable the ingestion of the created timestamp as a synthetic zero sample.
|
||||
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
|
||||
EnableStartTimestampZeroIngestion bool
|
||||
|
||||
// EnableTypeAndUnitLabels
|
||||
// EnableTypeAndUnitLabels represents type-and-unit-labels feature flag.
|
||||
EnableTypeAndUnitLabels bool
|
||||
|
||||
// Optional HTTP client options to use when scraping.
|
||||
@ -111,9 +111,11 @@ type Options struct {
|
||||
// Manager maintains a set of scrape pools and manages start/stop cycles
|
||||
// when receiving new target groups from the discovery manager.
|
||||
type Manager struct {
|
||||
opts *Options
|
||||
logger *slog.Logger
|
||||
append storage.Appendable
|
||||
opts *Options
|
||||
logger *slog.Logger
|
||||
|
||||
appendable storage.Appendable
|
||||
|
||||
graceShut chan struct{}
|
||||
|
||||
offsetSeed uint64 // Global offsetSeed seed is used to spread scrape workload across HA setup.
|
||||
@ -194,7 +196,7 @@ func (m *Manager) reload() {
|
||||
continue
|
||||
}
|
||||
m.metrics.targetScrapePools.Inc()
|
||||
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
|
||||
sp, err := newScrapePool(scrapeConfig, m.appendable, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
|
||||
if err != nil {
|
||||
m.metrics.targetScrapePoolsFailed.Inc()
|
||||
m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Copyright 2013 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@ -51,6 +51,7 @@ import (
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/util/runutil"
|
||||
"github.com/prometheus/prometheus/util/teststorage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
@ -527,21 +528,12 @@ scrape_configs:
|
||||
ch <- struct{}{}
|
||||
return noopLoop()
|
||||
}
|
||||
sp := &scrapePool{
|
||||
appendable: &nopAppendable{},
|
||||
activeTargets: map[uint64]*Target{
|
||||
1: {},
|
||||
},
|
||||
loops: map[uint64]loop{
|
||||
1: noopLoop(),
|
||||
},
|
||||
newLoop: newLoop,
|
||||
logger: nil,
|
||||
config: cfg1.ScrapeConfigs[0],
|
||||
client: http.DefaultClient,
|
||||
metrics: scrapeManager.metrics,
|
||||
symbolTable: labels.NewSymbolTable(),
|
||||
}
|
||||
sp := newTestScrapePool(t, newLoop)
|
||||
sp.activeTargets[1] = &Target{}
|
||||
sp.loops[1] = noopLoop()
|
||||
sp.config = cfg1.ScrapeConfigs[0]
|
||||
sp.metrics = scrapeManager.metrics
|
||||
|
||||
scrapeManager.scrapePools = map[string]*scrapePool{
|
||||
"job1": sp,
|
||||
}
|
||||
@ -691,18 +683,11 @@ scrape_configs:
|
||||
for _, sc := range cfg.ScrapeConfigs {
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
sp := &scrapePool{
|
||||
appendable: &nopAppendable{},
|
||||
activeTargets: map[uint64]*Target{},
|
||||
loops: map[uint64]loop{
|
||||
1: noopLoop(),
|
||||
},
|
||||
newLoop: newLoop,
|
||||
logger: nil,
|
||||
config: sc,
|
||||
client: http.DefaultClient,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
sp := newTestScrapePool(t, newLoop)
|
||||
sp.loops[1] = noopLoop()
|
||||
sp.config = cfg1.ScrapeConfigs[0]
|
||||
sp.metrics = scrapeManager.metrics
|
||||
for _, c := range sc.ServiceDiscoveryConfigs {
|
||||
staticConfig := c.(discovery.StaticConfig)
|
||||
for _, group := range staticConfig {
|
||||
@ -764,7 +749,7 @@ func TestManagerSTZeroIngestion(t *testing.T) {
|
||||
for _, testWithST := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("withST=%v", testWithST), func(t *testing.T) {
|
||||
for _, testSTZeroIngest := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("ctZeroIngest=%v", testSTZeroIngest), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("stZeroIngest=%v", testSTZeroIngest), func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
@ -777,11 +762,11 @@ func TestManagerSTZeroIngestion(t *testing.T) {
|
||||
// TODO(bwplotka): Add more types than just counter?
|
||||
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, stTs)
|
||||
|
||||
app := &collectResultAppender{}
|
||||
app := teststorage.NewAppendable()
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: testSTZeroIngest,
|
||||
skipOffsetting: true,
|
||||
}, &collectResultAppendable{app})
|
||||
}, app)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)
|
||||
@ -806,11 +791,8 @@ scrape_configs:
|
||||
ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
// Check if scrape happened and grab the relevant samples.
|
||||
if len(app.resultFloats) > 0 {
|
||||
if len(app.ResultSamples()) > 0 {
|
||||
return nil
|
||||
}
|
||||
return errors.New("expected some float samples, got none")
|
||||
@ -818,32 +800,32 @@ scrape_configs:
|
||||
|
||||
// Verify results.
|
||||
// Verify what we got vs expectations around ST injection.
|
||||
samples := findSamplesForMetric(app.resultFloats, expectedMetricName)
|
||||
got := findSamplesForMetric(app.ResultSamples(), expectedMetricName)
|
||||
if testWithST && testSTZeroIngest {
|
||||
require.Len(t, samples, 2)
|
||||
require.Equal(t, 0.0, samples[0].f)
|
||||
require.Equal(t, timestamp.FromTime(stTs), samples[0].t)
|
||||
require.Equal(t, expectedSampleValue, samples[1].f)
|
||||
require.Equal(t, timestamp.FromTime(sampleTs), samples[1].t)
|
||||
require.Len(t, got, 2)
|
||||
require.Equal(t, 0.0, got[0].V)
|
||||
require.Equal(t, timestamp.FromTime(stTs), got[0].T)
|
||||
require.Equal(t, expectedSampleValue, got[1].V)
|
||||
require.Equal(t, timestamp.FromTime(sampleTs), got[1].T)
|
||||
} else {
|
||||
require.Len(t, samples, 1)
|
||||
require.Equal(t, expectedSampleValue, samples[0].f)
|
||||
require.Equal(t, timestamp.FromTime(sampleTs), samples[0].t)
|
||||
require.Len(t, got, 1)
|
||||
require.Equal(t, expectedSampleValue, got[0].V)
|
||||
require.Equal(t, timestamp.FromTime(sampleTs), got[0].T)
|
||||
}
|
||||
|
||||
// Verify what we got vs expectations around additional _created series for OM text.
|
||||
// enableSTZeroInjection also kills that _created line.
|
||||
createdSeriesSamples := findSamplesForMetric(app.resultFloats, expectedCreatedMetricName)
|
||||
gotSTSeries := findSamplesForMetric(app.ResultSamples(), expectedCreatedMetricName)
|
||||
if testFormat == config.OpenMetricsText1_0_0 && testWithST && !testSTZeroIngest {
|
||||
// For OM Text, when counter has ST, and feature flag disabled we should see _created lines.
|
||||
require.Len(t, createdSeriesSamples, 1)
|
||||
require.Len(t, gotSTSeries, 1)
|
||||
// Conversion taken from common/expfmt.writeOpenMetricsFloat.
|
||||
// We don't check the st timestamp as explicit ts was not implemented in expfmt.Encoder,
|
||||
// but exists in OM https://github.com/prometheus/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created
|
||||
// We can implement this, but we want to potentially get rid of OM 1.0 ST lines
|
||||
require.Equal(t, float64(timestamppb.New(stTs).AsTime().UnixNano())/1e9, createdSeriesSamples[0].f)
|
||||
require.Equal(t, float64(timestamppb.New(stTs).AsTime().UnixNano())/1e9, gotSTSeries[0].V)
|
||||
} else {
|
||||
require.Empty(t, createdSeriesSamples)
|
||||
require.Empty(t, gotSTSeries)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -885,9 +867,9 @@ func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName
|
||||
}
|
||||
}
|
||||
|
||||
func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
|
||||
func findSamplesForMetric(floats []sample, metricName string) (ret []sample) {
|
||||
for _, f := range floats {
|
||||
if f.metric.Get(model.MetricNameLabel) == metricName {
|
||||
if f.L.Get(model.MetricNameLabel) == metricName {
|
||||
ret = append(ret, f)
|
||||
}
|
||||
}
|
||||
@ -964,11 +946,11 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
app := &collectResultAppender{}
|
||||
app := teststorage.NewAppendable()
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
|
||||
skipOffsetting: true,
|
||||
}, &collectResultAppendable{app})
|
||||
}, app)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
once := sync.Once{}
|
||||
@ -1012,43 +994,33 @@ scrape_configs:
|
||||
`, serverURL.Host)
|
||||
applyConfig(t, testConfig, scrapeManager, discoveryManager)
|
||||
|
||||
var got []histogramSample
|
||||
|
||||
// Wait for one scrape.
|
||||
ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
// Check if scrape happened and grab the relevant histograms, they have to be there - or it's a bug
|
||||
// and it's not worth waiting.
|
||||
for _, h := range app.resultHistograms {
|
||||
if h.metric.Get(model.MetricNameLabel) == mName {
|
||||
got = append(got, h)
|
||||
}
|
||||
}
|
||||
if len(app.resultHistograms) > 0 {
|
||||
if len(app.ResultSamples()) > 0 {
|
||||
return nil
|
||||
}
|
||||
return errors.New("expected some histogram samples, got none")
|
||||
}), "after 1 minute")
|
||||
|
||||
got := findSamplesForMetric(app.ResultSamples(), mName)
|
||||
|
||||
// Check for zero samples, assuming we only injected always one histogram sample.
|
||||
// Did it contain ST to inject? If yes, was ST zero enabled?
|
||||
if tc.inputHistSample.CreatedTimestamp.IsValid() && tc.enableSTZeroIngestion {
|
||||
require.Len(t, got, 2)
|
||||
// Zero sample.
|
||||
require.Equal(t, histogram.Histogram{}, *got[0].h)
|
||||
require.Equal(t, histogram.Histogram{}, *got[0].H)
|
||||
// Quick soft check to make sure it's the same sample or at least not zero.
|
||||
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].h.Sum)
|
||||
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].H.Sum)
|
||||
return
|
||||
}
|
||||
|
||||
// Expect only one, valid sample.
|
||||
require.Len(t, got, 1)
|
||||
// Quick soft check to make sure it's the same sample or at least not zero.
|
||||
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].h.Sum)
|
||||
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].H.Sum)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1083,11 +1055,11 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
|
||||
|
||||
ctx := t.Context()
|
||||
|
||||
app := &collectResultAppender{}
|
||||
app := teststorage.NewAppendable()
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: true,
|
||||
skipOffsetting: true,
|
||||
}, &collectResultAppendable{app})
|
||||
}, app)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
once := sync.Once{}
|
||||
@ -1146,33 +1118,19 @@ scrape_configs:
|
||||
return exists
|
||||
}, 5*time.Second, 100*time.Millisecond, "scrape pool should be created for job 'test'")
|
||||
|
||||
// Helper function to get matching histograms to avoid race conditions.
|
||||
getMatchingHistograms := func() []histogramSample {
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
var got []histogramSample
|
||||
for _, h := range app.resultHistograms {
|
||||
if h.metric.Get(model.MetricNameLabel) == mName {
|
||||
got = append(got, h)
|
||||
}
|
||||
}
|
||||
return got
|
||||
}
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return len(getMatchingHistograms()) > 0
|
||||
return len(app.ResultSamples()) > 0
|
||||
}, 1*time.Minute, 100*time.Millisecond, "expected histogram samples, got none")
|
||||
|
||||
// Verify that samples were ingested (proving both features work together).
|
||||
got := getMatchingHistograms()
|
||||
got := findSamplesForMetric(app.ResultSamples(), mName)
|
||||
|
||||
// With ST zero ingestion enabled and a created timestamp present, we expect 2 samples:
|
||||
// one zero sample and one actual sample.
|
||||
require.Len(t, got, 2, "expected 2 histogram samples (zero sample + actual sample)")
|
||||
require.Equal(t, histogram.Histogram{}, *got[0].h, "first sample should be zero sample")
|
||||
require.InDelta(t, expectedHistogramSum, got[1].h.Sum, 1e-9, "second sample should retain the expected sum")
|
||||
require.Len(t, app.resultExemplars, 2, "expected 2 exemplars from histogram buckets")
|
||||
require.Equal(t, histogram.Histogram{}, *got[0].H, "first sample should be zero sample")
|
||||
require.InDelta(t, expectedHistogramSum, got[1].H.Sum, 1e-9, "second sample should retain the expected sum")
|
||||
require.Len(t, got[1].ES, 2, "expected 2 exemplars on second histogram")
|
||||
}
|
||||
|
||||
func applyConfig(
|
||||
@ -1203,7 +1161,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A
|
||||
}
|
||||
opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond)
|
||||
if app == nil {
|
||||
app = nopAppendable{}
|
||||
app = teststorage.NewAppendable()
|
||||
}
|
||||
|
||||
reg := prometheus.NewRegistry()
|
||||
@ -1601,7 +1559,7 @@ scrape_configs:
|
||||
|
||||
cfg := loadConfiguration(t, cfgText)
|
||||
|
||||
m, err := NewManager(&Options{}, nil, nil, &nopAppendable{}, prometheus.NewRegistry())
|
||||
m, err := NewManager(&Options{}, nil, nil, teststorage.NewAppendable(), prometheus.NewRegistry())
|
||||
require.NoError(t, err)
|
||||
defer m.Stop()
|
||||
require.NoError(t, m.ApplyConfig(cfg))
|
||||
|
||||
669
scrape/scrape.go
669
scrape/scrape.go
@ -1,4 +1,4 @@
|
||||
// Copyright 2016 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@ -59,6 +59,8 @@ import (
|
||||
"github.com/prometheus/prometheus/util/pool"
|
||||
)
|
||||
|
||||
var aOptionRejectEarlyOOO = storage.AppendOptions{DiscardOutOfOrder: true}
|
||||
|
||||
// ScrapeTimestampTolerance is the tolerance for scrape appends timestamps
|
||||
// alignment, to enable better compression at the TSDB level.
|
||||
// See https://github.com/prometheus/prometheus/issues/7846
|
||||
@ -67,7 +69,7 @@ var ScrapeTimestampTolerance = 2 * time.Millisecond
|
||||
// AlignScrapeTimestamps enables the tolerance for scrape appends timestamps described above.
|
||||
var AlignScrapeTimestamps = true
|
||||
|
||||
var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName)
|
||||
var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", model.MetricNameLabel)
|
||||
|
||||
var _ FailureLogger = (*logging.JSONFileLogger)(nil)
|
||||
|
||||
@ -82,8 +84,9 @@ type FailureLogger interface {
|
||||
type scrapePool struct {
|
||||
appendable storage.Appendable
|
||||
logger *slog.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
httpOpts []config_util.HTTPClientOption
|
||||
options *Options
|
||||
|
||||
// mtx must not be taken after targetMtx.
|
||||
mtx sync.Mutex
|
||||
@ -102,16 +105,15 @@ type scrapePool struct {
|
||||
droppedTargets []*Target // Subject to KeepDroppedTargets limit.
|
||||
droppedTargetsCount int // Count of all dropped targets.
|
||||
|
||||
// Constructor for new scrape loops. This is settable for testing convenience.
|
||||
newLoop func(scrapeLoopOptions) loop
|
||||
// newLoop injection for testing purposes.
|
||||
injectTestNewLoop func(scrapeLoopOptions) loop
|
||||
|
||||
metrics *scrapeMetrics
|
||||
metrics *scrapeMetrics
|
||||
buffers *pool.Pool
|
||||
offsetSeed uint64
|
||||
|
||||
scrapeFailureLogger FailureLogger
|
||||
scrapeFailureLoggerMtx sync.RWMutex
|
||||
|
||||
validationScheme model.ValidationScheme
|
||||
escapingScheme model.EscapingScheme
|
||||
}
|
||||
|
||||
type labelLimits struct {
|
||||
@ -120,118 +122,80 @@ type labelLimits struct {
|
||||
labelValueLengthLimit int
|
||||
}
|
||||
|
||||
type scrapeLoopOptions struct {
|
||||
target *Target
|
||||
scraper scraper
|
||||
sampleLimit int
|
||||
bucketLimit int
|
||||
maxSchema int32
|
||||
labelLimits *labelLimits
|
||||
honorLabels bool
|
||||
honorTimestamps bool
|
||||
trackTimestampsStaleness bool
|
||||
interval time.Duration
|
||||
timeout time.Duration
|
||||
scrapeNativeHist bool
|
||||
alwaysScrapeClassicHist bool
|
||||
convertClassicHistToNHCB bool
|
||||
fallbackScrapeProtocol string
|
||||
|
||||
mrc []*relabel.Config
|
||||
cache *scrapeCache
|
||||
enableCompression bool
|
||||
}
|
||||
|
||||
const maxAheadTime = 10 * time.Minute
|
||||
|
||||
// returning an empty label set is interpreted as "drop".
|
||||
type labelsMutator func(labels.Labels) labels.Labels
|
||||
|
||||
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger *slog.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
|
||||
// scrapeLoopAppendAdapter allows support for multiple storage.Appender versions.
|
||||
type scrapeLoopAppendAdapter interface {
|
||||
Commit() error
|
||||
Rollback() error
|
||||
|
||||
addReportSample(s reportSample, t int64, v float64, b *labels.Builder, rejectOOO bool) error
|
||||
append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error)
|
||||
}
|
||||
|
||||
func newScrapePool(
|
||||
cfg *config.ScrapeConfig,
|
||||
appendable storage.Appendable,
|
||||
offsetSeed uint64,
|
||||
logger *slog.Logger,
|
||||
buffers *pool.Pool,
|
||||
options *Options,
|
||||
metrics *scrapeMetrics,
|
||||
) (*scrapePool, error) {
|
||||
if logger == nil {
|
||||
logger = promslog.NewNopLogger()
|
||||
}
|
||||
if buffers == nil {
|
||||
buffers = pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) })
|
||||
}
|
||||
|
||||
client, err := newScrapeClient(cfg.HTTPClientConfig, cfg.JobName, options.HTTPClientOptions...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Validate scheme so we don't need to do it later.
|
||||
// We also do it on scrapePool.reload(...)
|
||||
// TODO(bwplotka): Can we move it to scrape config validation?
|
||||
if err := namevalidationutil.CheckNameValidationScheme(cfg.MetricNameValidationScheme); err != nil {
|
||||
return nil, errors.New("newScrapePool: MetricNameValidationScheme must be set in scrape configuration")
|
||||
}
|
||||
var escapingScheme model.EscapingScheme
|
||||
escapingScheme, err = config.ToEscapingScheme(cfg.MetricNameEscapingScheme, cfg.MetricNameValidationScheme)
|
||||
if err != nil {
|
||||
if _, err = config.ToEscapingScheme(cfg.MetricNameEscapingScheme, cfg.MetricNameValidationScheme); err != nil {
|
||||
return nil, fmt.Errorf("invalid metric name escaping scheme, %w", err)
|
||||
}
|
||||
|
||||
symbols := labels.NewSymbolTable()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sp := &scrapePool{
|
||||
appendable: appendable,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
appendable: app,
|
||||
options: options,
|
||||
config: cfg,
|
||||
client: client,
|
||||
activeTargets: map[uint64]*Target{},
|
||||
loops: map[uint64]loop{},
|
||||
symbolTable: labels.NewSymbolTable(),
|
||||
symbolTable: symbols,
|
||||
lastSymbolTableCheck: time.Now(),
|
||||
logger: logger,
|
||||
activeTargets: map[uint64]*Target{},
|
||||
metrics: metrics,
|
||||
httpOpts: options.HTTPClientOptions,
|
||||
validationScheme: cfg.MetricNameValidationScheme,
|
||||
escapingScheme: escapingScheme,
|
||||
}
|
||||
sp.newLoop = func(opts scrapeLoopOptions) loop {
|
||||
// Update the targets retrieval function for metadata to a new scrape cache.
|
||||
cache := opts.cache
|
||||
if cache == nil {
|
||||
cache = newScrapeCache(metrics)
|
||||
}
|
||||
opts.target.SetMetadataStore(cache)
|
||||
|
||||
return newScrapeLoop(
|
||||
ctx,
|
||||
opts.scraper,
|
||||
logger.With("target", opts.target),
|
||||
buffers,
|
||||
func(l labels.Labels) labels.Labels {
|
||||
return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
|
||||
},
|
||||
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
|
||||
func(ctx context.Context) storage.Appender { return app.Appender(ctx) },
|
||||
cache,
|
||||
sp.symbolTable,
|
||||
offsetSeed,
|
||||
opts.honorTimestamps,
|
||||
opts.trackTimestampsStaleness,
|
||||
opts.enableCompression,
|
||||
opts.sampleLimit,
|
||||
opts.bucketLimit,
|
||||
opts.maxSchema,
|
||||
opts.labelLimits,
|
||||
opts.interval,
|
||||
opts.timeout,
|
||||
opts.alwaysScrapeClassicHist,
|
||||
opts.convertClassicHistToNHCB,
|
||||
cfg.ScrapeNativeHistogramsEnabled(),
|
||||
options.EnableStartTimestampZeroIngestion,
|
||||
options.EnableTypeAndUnitLabels,
|
||||
options.ExtraMetrics,
|
||||
options.AppendMetadata,
|
||||
opts.target,
|
||||
options.PassMetadataInContext,
|
||||
metrics,
|
||||
options.skipOffsetting,
|
||||
sp.validationScheme,
|
||||
sp.escapingScheme,
|
||||
opts.fallbackScrapeProtocol,
|
||||
)
|
||||
buffers: buffers,
|
||||
offsetSeed: offsetSeed,
|
||||
}
|
||||
sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
|
||||
return sp, nil
|
||||
}
|
||||
|
||||
func (sp *scrapePool) newLoop(opts scrapeLoopOptions) loop {
|
||||
if sp.injectTestNewLoop != nil {
|
||||
return sp.injectTestNewLoop(opts)
|
||||
}
|
||||
return newScrapeLoop(opts)
|
||||
}
|
||||
|
||||
func (sp *scrapePool) ActiveTargets() []*Target {
|
||||
sp.targetMtx.Lock()
|
||||
defer sp.targetMtx.Unlock()
|
||||
@ -323,7 +287,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||
sp.metrics.targetScrapePoolReloads.Inc()
|
||||
start := time.Now()
|
||||
|
||||
client, err := newScrapeClient(cfg.HTTPClientConfig, cfg.JobName, sp.httpOpts...)
|
||||
client, err := newScrapeClient(cfg.HTTPClientConfig, cfg.JobName, sp.options.HTTPClientOptions...)
|
||||
if err != nil {
|
||||
sp.metrics.targetScrapePoolReloadsFailed.Inc()
|
||||
return err
|
||||
@ -333,17 +297,14 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||
sp.config = cfg
|
||||
oldClient := sp.client
|
||||
sp.client = client
|
||||
|
||||
// Validate scheme so we don't need to do it later.
|
||||
if err := namevalidationutil.CheckNameValidationScheme(cfg.MetricNameValidationScheme); err != nil {
|
||||
return errors.New("scrapePool.reload: MetricNameValidationScheme must be set in scrape configuration")
|
||||
}
|
||||
sp.validationScheme = cfg.MetricNameValidationScheme
|
||||
var escapingScheme model.EscapingScheme
|
||||
escapingScheme, err = model.ToEscapingScheme(cfg.MetricNameEscapingScheme)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid metric name escaping scheme, %w", err)
|
||||
if _, err = config.ToEscapingScheme(cfg.MetricNameEscapingScheme, cfg.MetricNameValidationScheme); err != nil {
|
||||
return fmt.Errorf("scrapePool.reload: invalid metric name escaping scheme, %w", err)
|
||||
}
|
||||
sp.escapingScheme = escapingScheme
|
||||
|
||||
sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
|
||||
|
||||
sp.restartLoops(reuseCache)
|
||||
@ -355,30 +316,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||
}
|
||||
|
||||
func (sp *scrapePool) restartLoops(reuseCache bool) {
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
interval = time.Duration(sp.config.ScrapeInterval)
|
||||
timeout = time.Duration(sp.config.ScrapeTimeout)
|
||||
bodySizeLimit = int64(sp.config.BodySizeLimit)
|
||||
sampleLimit = int(sp.config.SampleLimit)
|
||||
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
|
||||
maxSchema = pickSchema(sp.config.NativeHistogramMinBucketFactor)
|
||||
labelLimits = &labelLimits{
|
||||
labelLimit: int(sp.config.LabelLimit),
|
||||
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
|
||||
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
|
||||
}
|
||||
honorLabels = sp.config.HonorLabels
|
||||
honorTimestamps = sp.config.HonorTimestamps
|
||||
enableCompression = sp.config.EnableCompression
|
||||
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
|
||||
mrc = sp.config.MetricRelabelConfigs
|
||||
fallbackScrapeProtocol = sp.config.ScrapeFallbackProtocol.HeaderMediaType()
|
||||
scrapeNativeHist = sp.config.ScrapeNativeHistogramsEnabled()
|
||||
alwaysScrapeClassicHist = sp.config.AlwaysScrapeClassicHistogramsEnabled()
|
||||
convertClassicHistToNHCB = sp.config.ConvertClassicHistogramsToNHCBEnabled()
|
||||
)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
sp.targetMtx.Lock()
|
||||
|
||||
forcedErr := sp.refreshTargetLimitErr()
|
||||
@ -392,38 +330,27 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
|
||||
}
|
||||
|
||||
t := sp.activeTargets[fp]
|
||||
targetInterval, targetTimeout, err := t.intervalAndTimeout(interval, timeout)
|
||||
var (
|
||||
s = &targetScraper{
|
||||
targetInterval, targetTimeout, err := t.intervalAndTimeout(
|
||||
time.Duration(sp.config.ScrapeInterval),
|
||||
time.Duration(sp.config.ScrapeTimeout),
|
||||
)
|
||||
escapingScheme, _ := config.ToEscapingScheme(sp.config.MetricNameEscapingScheme, sp.config.MetricNameValidationScheme)
|
||||
newLoop := sp.newLoop(scrapeLoopOptions{
|
||||
target: t,
|
||||
scraper: &targetScraper{
|
||||
Target: t,
|
||||
client: sp.client,
|
||||
timeout: targetTimeout,
|
||||
bodySizeLimit: bodySizeLimit,
|
||||
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, sp.escapingScheme),
|
||||
acceptEncodingHeader: acceptEncodingHeader(enableCompression),
|
||||
bodySizeLimit: int64(sp.config.BodySizeLimit),
|
||||
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, escapingScheme),
|
||||
acceptEncodingHeader: acceptEncodingHeader(sp.config.EnableCompression),
|
||||
metrics: sp.metrics,
|
||||
}
|
||||
newLoop = sp.newLoop(scrapeLoopOptions{
|
||||
target: t,
|
||||
scraper: s,
|
||||
sampleLimit: sampleLimit,
|
||||
bucketLimit: bucketLimit,
|
||||
maxSchema: maxSchema,
|
||||
labelLimits: labelLimits,
|
||||
honorLabels: honorLabels,
|
||||
honorTimestamps: honorTimestamps,
|
||||
enableCompression: enableCompression,
|
||||
trackTimestampsStaleness: trackTimestampsStaleness,
|
||||
mrc: mrc,
|
||||
cache: cache,
|
||||
interval: targetInterval,
|
||||
timeout: targetTimeout,
|
||||
fallbackScrapeProtocol: fallbackScrapeProtocol,
|
||||
scrapeNativeHist: scrapeNativeHist,
|
||||
alwaysScrapeClassicHist: alwaysScrapeClassicHist,
|
||||
convertClassicHistToNHCB: convertClassicHistToNHCB,
|
||||
})
|
||||
)
|
||||
},
|
||||
cache: cache,
|
||||
interval: targetInterval,
|
||||
timeout: targetTimeout,
|
||||
sp: sp,
|
||||
})
|
||||
if err != nil {
|
||||
newLoop.setForcedError(err)
|
||||
}
|
||||
@ -516,31 +443,10 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
||||
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
||||
// It returns after all stopped scrape loops terminated.
|
||||
func (sp *scrapePool) sync(targets []*Target) {
|
||||
var (
|
||||
uniqueLoops = make(map[uint64]loop)
|
||||
interval = time.Duration(sp.config.ScrapeInterval)
|
||||
timeout = time.Duration(sp.config.ScrapeTimeout)
|
||||
bodySizeLimit = int64(sp.config.BodySizeLimit)
|
||||
sampleLimit = int(sp.config.SampleLimit)
|
||||
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
|
||||
maxSchema = pickSchema(sp.config.NativeHistogramMinBucketFactor)
|
||||
labelLimits = &labelLimits{
|
||||
labelLimit: int(sp.config.LabelLimit),
|
||||
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
|
||||
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
|
||||
}
|
||||
honorLabels = sp.config.HonorLabels
|
||||
honorTimestamps = sp.config.HonorTimestamps
|
||||
enableCompression = sp.config.EnableCompression
|
||||
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
|
||||
mrc = sp.config.MetricRelabelConfigs
|
||||
fallbackScrapeProtocol = sp.config.ScrapeFallbackProtocol.HeaderMediaType()
|
||||
scrapeNativeHist = sp.config.ScrapeNativeHistogramsEnabled()
|
||||
alwaysScrapeClassicHist = sp.config.AlwaysScrapeClassicHistogramsEnabled()
|
||||
convertClassicHistToNHCB = sp.config.ConvertClassicHistogramsToNHCBEnabled()
|
||||
)
|
||||
uniqueLoops := make(map[uint64]loop)
|
||||
|
||||
sp.targetMtx.Lock()
|
||||
escapingScheme, _ := config.ToEscapingScheme(sp.config.MetricNameEscapingScheme, sp.config.MetricNameValidationScheme)
|
||||
for _, t := range targets {
|
||||
hash := t.hash()
|
||||
|
||||
@ -549,34 +455,25 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||
// so whether changed via relabeling or not, they'll exist and hold the correct values
|
||||
// for every target.
|
||||
var err error
|
||||
interval, timeout, err = t.intervalAndTimeout(interval, timeout)
|
||||
s := &targetScraper{
|
||||
Target: t,
|
||||
client: sp.client,
|
||||
timeout: timeout,
|
||||
bodySizeLimit: bodySizeLimit,
|
||||
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, sp.escapingScheme),
|
||||
acceptEncodingHeader: acceptEncodingHeader(enableCompression),
|
||||
metrics: sp.metrics,
|
||||
}
|
||||
targetInterval, targetTimeout, err := t.intervalAndTimeout(
|
||||
time.Duration(sp.config.ScrapeInterval),
|
||||
time.Duration(sp.config.ScrapeTimeout),
|
||||
)
|
||||
l := sp.newLoop(scrapeLoopOptions{
|
||||
target: t,
|
||||
scraper: s,
|
||||
sampleLimit: sampleLimit,
|
||||
bucketLimit: bucketLimit,
|
||||
maxSchema: maxSchema,
|
||||
labelLimits: labelLimits,
|
||||
honorLabels: honorLabels,
|
||||
honorTimestamps: honorTimestamps,
|
||||
enableCompression: enableCompression,
|
||||
trackTimestampsStaleness: trackTimestampsStaleness,
|
||||
mrc: mrc,
|
||||
interval: interval,
|
||||
timeout: timeout,
|
||||
scrapeNativeHist: scrapeNativeHist,
|
||||
alwaysScrapeClassicHist: alwaysScrapeClassicHist,
|
||||
convertClassicHistToNHCB: convertClassicHistToNHCB,
|
||||
fallbackScrapeProtocol: fallbackScrapeProtocol,
|
||||
target: t,
|
||||
scraper: &targetScraper{
|
||||
Target: t,
|
||||
client: sp.client,
|
||||
timeout: targetTimeout,
|
||||
bodySizeLimit: int64(sp.config.BodySizeLimit),
|
||||
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, escapingScheme),
|
||||
acceptEncodingHeader: acceptEncodingHeader(sp.config.EnableCompression),
|
||||
metrics: sp.metrics,
|
||||
},
|
||||
cache: newScrapeCache(sp.metrics),
|
||||
interval: targetInterval,
|
||||
timeout: targetTimeout,
|
||||
sp: sp,
|
||||
})
|
||||
if err != nil {
|
||||
l.setForcedError(err)
|
||||
@ -661,7 +558,7 @@ func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
met := lset.Get(labels.MetricName)
|
||||
met := lset.Get(model.MetricNameLabel)
|
||||
if limits.labelLimit > 0 {
|
||||
nbLabels := lset.Len()
|
||||
if nbLabels > limits.labelLimit {
|
||||
@ -749,8 +646,8 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels
|
||||
return lb.Labels()
|
||||
}
|
||||
|
||||
// appender returns an appender for ingested samples from the target.
|
||||
func appender(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Appender {
|
||||
// appenderWithLimits returns an appender with additional validation.
|
||||
func appenderWithLimits(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Appender {
|
||||
app = &timeLimitAppender{
|
||||
Appender: app,
|
||||
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||
@ -927,55 +824,63 @@ type cacheEntry struct {
|
||||
}
|
||||
|
||||
type scrapeLoop struct {
|
||||
scraper scraper
|
||||
l *slog.Logger
|
||||
scrapeFailureLogger FailureLogger
|
||||
scrapeFailureLoggerMtx sync.RWMutex
|
||||
cache *scrapeCache
|
||||
lastScrapeSize int
|
||||
buffers *pool.Pool
|
||||
offsetSeed uint64
|
||||
honorTimestamps bool
|
||||
trackTimestampsStaleness bool
|
||||
enableCompression bool
|
||||
forcedErr error
|
||||
forcedErrMtx sync.Mutex
|
||||
sampleLimit int
|
||||
bucketLimit int
|
||||
maxSchema int32
|
||||
labelLimits *labelLimits
|
||||
interval time.Duration
|
||||
timeout time.Duration
|
||||
validationScheme model.ValidationScheme
|
||||
escapingScheme model.EscapingScheme
|
||||
|
||||
alwaysScrapeClassicHist bool
|
||||
convertClassicHistToNHCB bool
|
||||
enableSTZeroIngestion bool
|
||||
enableTypeAndUnitLabels bool
|
||||
fallbackScrapeProtocol string
|
||||
|
||||
enableNativeHistogramScraping bool
|
||||
|
||||
appender func(ctx context.Context) storage.Appender
|
||||
symbolTable *labels.SymbolTable
|
||||
sampleMutator labelsMutator
|
||||
reportSampleMutator labelsMutator
|
||||
|
||||
parentCtx context.Context
|
||||
appenderCtx context.Context
|
||||
// Parameters.
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
stopped chan struct{}
|
||||
parentCtx context.Context
|
||||
appenderCtx context.Context
|
||||
l *slog.Logger
|
||||
cache *scrapeCache
|
||||
|
||||
interval time.Duration
|
||||
timeout time.Duration
|
||||
sampleMutator labelsMutator
|
||||
reportSampleMutator labelsMutator
|
||||
scraper scraper
|
||||
|
||||
// Static params per scrapePool.
|
||||
appendable storage.Appendable
|
||||
buffers *pool.Pool
|
||||
offsetSeed uint64
|
||||
symbolTable *labels.SymbolTable
|
||||
metrics *scrapeMetrics
|
||||
|
||||
// Options from config.ScrapeConfig.
|
||||
sampleLimit int
|
||||
bucketLimit int
|
||||
maxSchema int32
|
||||
labelLimits *labelLimits
|
||||
honorLabels bool
|
||||
honorTimestamps bool
|
||||
trackTimestampsStaleness bool
|
||||
enableNativeHistogramScraping bool
|
||||
alwaysScrapeClassicHist bool
|
||||
convertClassicHistToNHCB bool
|
||||
fallbackScrapeProtocol string
|
||||
enableCompression bool
|
||||
mrc []*relabel.Config
|
||||
validationScheme model.ValidationScheme
|
||||
|
||||
// Options from scrape.Options.
|
||||
enableSTZeroIngestion bool
|
||||
enableTypeAndUnitLabels bool
|
||||
reportExtraMetrics bool
|
||||
appendMetadataToWAL bool
|
||||
passMetadataInContext bool
|
||||
skipOffsetting bool // For testability.
|
||||
|
||||
// error injection through setForcedError.
|
||||
forcedErr error
|
||||
forcedErrMtx sync.Mutex
|
||||
|
||||
// Special logger set on setScrapeFailureLogger
|
||||
scrapeFailureLoggerMtx sync.RWMutex
|
||||
scrapeFailureLogger FailureLogger
|
||||
|
||||
// Locally cached data.
|
||||
lastScrapeSize int
|
||||
disabledEndOfRunStalenessMarkers atomic.Bool
|
||||
|
||||
reportExtraMetrics bool
|
||||
appendMetadataToWAL bool
|
||||
|
||||
metrics *scrapeMetrics
|
||||
|
||||
skipOffsetting bool // For testability.
|
||||
}
|
||||
|
||||
// scrapeCache tracks mappings of exposed metric strings to label sets and
|
||||
@ -1000,8 +905,8 @@ type scrapeCache struct {
|
||||
seriesCur map[storage.SeriesRef]*cacheEntry
|
||||
seriesPrev map[storage.SeriesRef]*cacheEntry
|
||||
|
||||
// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
|
||||
// avoid locking (using metadata API can block scraping).
|
||||
// TODO(bwplotka): Consider moving metadata caching to head. See
|
||||
// https://github.com/prometheus/prometheus/issues/17619.
|
||||
metaMtx sync.Mutex // Mutex is needed due to api touching it when metadata is queried.
|
||||
metadata map[string]*metaEntry // metadata by metric family name.
|
||||
|
||||
@ -1236,99 +1141,87 @@ func (c *scrapeCache) LengthMetadata() int {
|
||||
return len(c.metadata)
|
||||
}
|
||||
|
||||
func newScrapeLoop(ctx context.Context,
|
||||
sc scraper,
|
||||
l *slog.Logger,
|
||||
buffers *pool.Pool,
|
||||
sampleMutator labelsMutator,
|
||||
reportSampleMutator labelsMutator,
|
||||
appender func(ctx context.Context) storage.Appender,
|
||||
cache *scrapeCache,
|
||||
symbolTable *labels.SymbolTable,
|
||||
offsetSeed uint64,
|
||||
honorTimestamps bool,
|
||||
trackTimestampsStaleness bool,
|
||||
enableCompression bool,
|
||||
sampleLimit int,
|
||||
bucketLimit int,
|
||||
maxSchema int32,
|
||||
labelLimits *labelLimits,
|
||||
interval time.Duration,
|
||||
timeout time.Duration,
|
||||
alwaysScrapeClassicHist bool,
|
||||
convertClassicHistToNHCB bool,
|
||||
enableNativeHistogramScraping bool,
|
||||
enableSTZeroIngestion bool,
|
||||
enableTypeAndUnitLabels bool,
|
||||
reportExtraMetrics bool,
|
||||
appendMetadataToWAL bool,
|
||||
target *Target,
|
||||
passMetadataInContext bool,
|
||||
metrics *scrapeMetrics,
|
||||
skipOffsetting bool,
|
||||
validationScheme model.ValidationScheme,
|
||||
escapingScheme model.EscapingScheme,
|
||||
fallbackScrapeProtocol string,
|
||||
) *scrapeLoop {
|
||||
if l == nil {
|
||||
l = promslog.NewNopLogger()
|
||||
}
|
||||
if buffers == nil {
|
||||
buffers = pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) })
|
||||
}
|
||||
if cache == nil {
|
||||
cache = newScrapeCache(metrics)
|
||||
}
|
||||
// scrapeLoopOptions contains static options that do not change per scrapePool lifecycle.
|
||||
type scrapeLoopOptions struct {
|
||||
target *Target
|
||||
scraper scraper
|
||||
cache *scrapeCache
|
||||
interval, timeout time.Duration
|
||||
|
||||
appenderCtx := ctx
|
||||
sp *scrapePool
|
||||
}
|
||||
|
||||
if passMetadataInContext {
|
||||
// newScrapeLoop constructs new scrapeLoop.
|
||||
// NOTE: Technically this could be a scrapePool method, but it's a standalone function to make it clear scrapeLoop
|
||||
// can be used outside scrapePool lifecycle (e.g. in tests).
|
||||
func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
|
||||
// Update the targets retrieval function for metadata to a new target.
|
||||
opts.target.SetMetadataStore(opts.cache)
|
||||
|
||||
appenderCtx := opts.sp.ctx
|
||||
if opts.sp.options.PassMetadataInContext {
|
||||
// Store the cache and target in the context. This is then used by downstream OTel Collector
|
||||
// to lookup the metadata required to process the samples. Not used by Prometheus itself.
|
||||
// TODO(gouthamve) We're using a dedicated context because using the parentCtx caused a memory
|
||||
// leak. We should ideally fix the main leak. See: https://github.com/prometheus/prometheus/pull/10590
|
||||
appenderCtx = ContextWithMetricMetadataStore(appenderCtx, cache)
|
||||
appenderCtx = ContextWithTarget(appenderCtx, target)
|
||||
// TODO(bwplotka): Remove once OpenTelemetry collector uses AppenderV2 (add issue)
|
||||
appenderCtx = ContextWithMetricMetadataStore(appenderCtx, opts.cache)
|
||||
appenderCtx = ContextWithTarget(appenderCtx, opts.target)
|
||||
}
|
||||
|
||||
sl := &scrapeLoop{
|
||||
scraper: sc,
|
||||
buffers: buffers,
|
||||
cache: cache,
|
||||
appender: appender,
|
||||
symbolTable: symbolTable,
|
||||
sampleMutator: sampleMutator,
|
||||
reportSampleMutator: reportSampleMutator,
|
||||
stopped: make(chan struct{}),
|
||||
offsetSeed: offsetSeed,
|
||||
l: l,
|
||||
parentCtx: ctx,
|
||||
appenderCtx: appenderCtx,
|
||||
honorTimestamps: honorTimestamps,
|
||||
trackTimestampsStaleness: trackTimestampsStaleness,
|
||||
enableCompression: enableCompression,
|
||||
sampleLimit: sampleLimit,
|
||||
bucketLimit: bucketLimit,
|
||||
maxSchema: maxSchema,
|
||||
labelLimits: labelLimits,
|
||||
interval: interval,
|
||||
timeout: timeout,
|
||||
alwaysScrapeClassicHist: alwaysScrapeClassicHist,
|
||||
convertClassicHistToNHCB: convertClassicHistToNHCB,
|
||||
enableSTZeroIngestion: enableSTZeroIngestion,
|
||||
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
|
||||
fallbackScrapeProtocol: fallbackScrapeProtocol,
|
||||
enableNativeHistogramScraping: enableNativeHistogramScraping,
|
||||
reportExtraMetrics: reportExtraMetrics,
|
||||
appendMetadataToWAL: appendMetadataToWAL,
|
||||
metrics: metrics,
|
||||
skipOffsetting: skipOffsetting,
|
||||
validationScheme: validationScheme,
|
||||
escapingScheme: escapingScheme,
|
||||
}
|
||||
sl.ctx, sl.cancel = context.WithCancel(ctx)
|
||||
ctx, cancel := context.WithCancel(opts.sp.ctx)
|
||||
return &scrapeLoop{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
stopped: make(chan struct{}),
|
||||
parentCtx: opts.sp.ctx,
|
||||
appenderCtx: appenderCtx,
|
||||
l: opts.sp.logger.With("target", opts.target),
|
||||
cache: opts.cache,
|
||||
|
||||
return sl
|
||||
interval: opts.interval,
|
||||
timeout: opts.timeout,
|
||||
sampleMutator: func(l labels.Labels) labels.Labels {
|
||||
return mutateSampleLabels(l, opts.target, opts.sp.config.HonorTimestamps, opts.sp.config.MetricRelabelConfigs)
|
||||
},
|
||||
reportSampleMutator: func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
|
||||
scraper: opts.scraper,
|
||||
|
||||
// Static params per scrapePool.
|
||||
appendable: opts.sp.appendable,
|
||||
buffers: opts.sp.buffers,
|
||||
offsetSeed: opts.sp.offsetSeed,
|
||||
symbolTable: opts.sp.symbolTable,
|
||||
metrics: opts.sp.metrics,
|
||||
|
||||
// config.ScrapeConfig.
|
||||
sampleLimit: int(opts.sp.config.SampleLimit),
|
||||
bucketLimit: int(opts.sp.config.NativeHistogramBucketLimit),
|
||||
maxSchema: pickSchema(opts.sp.config.NativeHistogramMinBucketFactor),
|
||||
labelLimits: &labelLimits{
|
||||
labelLimit: int(opts.sp.config.LabelLimit),
|
||||
labelNameLengthLimit: int(opts.sp.config.LabelNameLengthLimit),
|
||||
labelValueLengthLimit: int(opts.sp.config.LabelValueLengthLimit),
|
||||
},
|
||||
honorLabels: opts.sp.config.HonorLabels,
|
||||
honorTimestamps: opts.sp.config.HonorTimestamps,
|
||||
trackTimestampsStaleness: opts.sp.config.TrackTimestampsStaleness,
|
||||
enableNativeHistogramScraping: opts.sp.config.ScrapeNativeHistogramsEnabled(),
|
||||
alwaysScrapeClassicHist: opts.sp.config.AlwaysScrapeClassicHistogramsEnabled(),
|
||||
convertClassicHistToNHCB: opts.sp.config.ConvertClassicHistogramsToNHCBEnabled(),
|
||||
fallbackScrapeProtocol: opts.sp.config.ScrapeFallbackProtocol.HeaderMediaType(),
|
||||
enableCompression: opts.sp.config.EnableCompression,
|
||||
mrc: opts.sp.config.MetricRelabelConfigs,
|
||||
validationScheme: opts.sp.config.MetricNameValidationScheme,
|
||||
|
||||
// scrape.Options.
|
||||
enableSTZeroIngestion: opts.sp.options.EnableStartTimestampZeroIngestion,
|
||||
enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels,
|
||||
reportExtraMetrics: opts.sp.options.ExtraMetrics,
|
||||
appendMetadataToWAL: opts.sp.options.AppendMetadata,
|
||||
passMetadataInContext: opts.sp.options.PassMetadataInContext,
|
||||
skipOffsetting: opts.sp.options.skipOffsetting,
|
||||
}
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
|
||||
@ -1407,6 +1300,11 @@ mainLoop:
|
||||
}
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter {
|
||||
// NOTE(bwplotka): Add AppenderV2 implementation, see https://github.com/prometheus/prometheus/issues/17632.
|
||||
return &scrapeLoopAppender{scrapeLoop: sl, Appender: sl.appendable.Appender(sl.appenderCtx)}
|
||||
}
|
||||
|
||||
// scrapeAndReport performs a scrape and then appends the result to the storage
|
||||
// together with reporting metrics, by using as few appenders as possible.
|
||||
// In the happy scenario, a single appender is used.
|
||||
@ -1428,10 +1326,10 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
|
||||
var total, added, seriesAdded, bytesRead int
|
||||
var err, appErr, scrapeErr error
|
||||
|
||||
app := sl.appender(sl.appenderCtx)
|
||||
app := sl.appender()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
app.Rollback()
|
||||
_ = app.Rollback()
|
||||
return
|
||||
}
|
||||
err = app.Commit()
|
||||
@ -1449,9 +1347,9 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
|
||||
if forcedErr := sl.getForcedError(); forcedErr != nil {
|
||||
scrapeErr = forcedErr
|
||||
// Add stale markers.
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(sl.appenderCtx)
|
||||
if _, _, _, err := app.append([]byte{}, "", appendTime); err != nil {
|
||||
_ = app.Rollback()
|
||||
app = sl.appender()
|
||||
sl.l.Warn("Append failed", "err", err)
|
||||
}
|
||||
if errc != nil {
|
||||
@ -1507,16 +1405,16 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
|
||||
|
||||
// A failed scrape is the same as an empty scrape,
|
||||
// we still call sl.append to trigger stale markers.
|
||||
total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
|
||||
total, added, seriesAdded, appErr = app.append(b, contentType, appendTime)
|
||||
if appErr != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(sl.appenderCtx)
|
||||
_ = app.Rollback()
|
||||
app = sl.appender()
|
||||
sl.l.Debug("Append failed", "err", appErr)
|
||||
// The append failed, probably due to a parse error or sample limit.
|
||||
// Call sl.append again with an empty scrape to trigger stale markers.
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(sl.appenderCtx)
|
||||
if _, _, _, err := app.append([]byte{}, "", appendTime); err != nil {
|
||||
_ = app.Rollback()
|
||||
app = sl.appender()
|
||||
sl.l.Warn("Append failed", "err", err)
|
||||
}
|
||||
}
|
||||
@ -1586,11 +1484,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
|
||||
// If the target has since been recreated and scraped, the
|
||||
// stale markers will be out of order and ignored.
|
||||
// sl.context would have been cancelled, hence using sl.appenderCtx.
|
||||
app := sl.appender(sl.appenderCtx)
|
||||
app := sl.appender()
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
app.Rollback()
|
||||
_ = app.Rollback()
|
||||
return
|
||||
}
|
||||
err = app.Commit()
|
||||
@ -1598,9 +1496,9 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
|
||||
sl.l.Warn("Stale commit failed", "err", err)
|
||||
}
|
||||
}()
|
||||
if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(sl.appenderCtx)
|
||||
if _, _, _, err = app.append([]byte{}, "", staleTime); err != nil {
|
||||
_ = app.Rollback()
|
||||
app = sl.appender()
|
||||
sl.l.Warn("Stale append failed", "err", err)
|
||||
}
|
||||
if err = sl.reportStale(app, staleTime); err != nil {
|
||||
@ -1634,7 +1532,7 @@ type appendErrors struct {
|
||||
func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
|
||||
sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
|
||||
// Series no longer exposed, mark it stale.
|
||||
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
|
||||
app.SetOptions(&aOptionRejectEarlyOOO)
|
||||
_, err = app.Append(ref, lset, defTime, math.Float64frombits(value.StaleNaN))
|
||||
app.SetOptions(nil)
|
||||
switch {
|
||||
@ -1648,12 +1546,20 @@ func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (e
|
||||
return err
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
|
||||
type scrapeLoopAppender struct {
|
||||
*scrapeLoop
|
||||
|
||||
storage.Appender
|
||||
}
|
||||
|
||||
var _ scrapeLoopAppendAdapter = &scrapeLoopAppender{}
|
||||
|
||||
func (sl *scrapeLoopAppender) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
|
||||
defTime := timestamp.FromTime(ts)
|
||||
|
||||
if len(b) == 0 {
|
||||
// Empty scrape. Just update the stale makers and swap the cache (but don't flush it).
|
||||
err = sl.updateStaleMarkers(app, defTime)
|
||||
err = sl.updateStaleMarkers(sl.Appender, defTime)
|
||||
sl.cache.iterDone(false)
|
||||
return total, added, seriesAdded, err
|
||||
}
|
||||
@ -1696,7 +1602,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
|
||||
exemplars := make([]exemplar.Exemplar, 0, 1)
|
||||
|
||||
// Take an appender with limits.
|
||||
app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
|
||||
app := appenderWithLimits(sl.Appender, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
@ -1785,7 +1691,7 @@ loop:
|
||||
continue
|
||||
}
|
||||
|
||||
if !lset.Has(labels.MetricName) {
|
||||
if !lset.Has(model.MetricNameLabel) {
|
||||
err = errNameLabelMandatory
|
||||
break loop
|
||||
}
|
||||
@ -1859,7 +1765,7 @@ loop:
|
||||
// But make sure we only do this if we have a cache entry (ce) for our series.
|
||||
sl.cache.trackStaleness(ref, ce)
|
||||
}
|
||||
if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
|
||||
if sampleLimitErr == nil && bucketLimitErr == nil {
|
||||
seriesAdded++
|
||||
}
|
||||
}
|
||||
@ -1917,7 +1823,7 @@ loop:
|
||||
// In majority cases we can trust that the current series/histogram is matching the lastMeta and lastMFName.
|
||||
// However, optional TYPE etc metadata and broken OM text can break this, detect those cases here.
|
||||
// TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. ST and NHCB parsing).
|
||||
if isSeriesPartOfFamily(lset.Get(labels.MetricName), lastMFName, lastMeta.Type) {
|
||||
if isSeriesPartOfFamily(lset.Get(model.MetricNameLabel), lastMFName, lastMeta.Type) {
|
||||
if _, merr := app.UpdateMetadata(ref, lset, lastMeta.Metadata); merr != nil {
|
||||
// No need to fail the scrape on errors appending metadata.
|
||||
sl.l.Debug("Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", lastMeta.Metadata), "err", merr)
|
||||
@ -2029,7 +1935,7 @@ func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) boo
|
||||
// during normal operation (e.g., accidental cardinality explosion, sudden traffic spikes).
|
||||
// Current case ordering prevents exercising other cases when limits are exceeded.
|
||||
// Remaining error cases typically occur only a few times, often during initial setup.
|
||||
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
|
||||
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (sampleAdded bool, _ error) {
|
||||
switch {
|
||||
case err == nil:
|
||||
return true, nil
|
||||
@ -2141,7 +2047,7 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration time.Duration, scraped, added, seriesAdded, bytes int, scrapeErr error) (err error) {
|
||||
func (sl *scrapeLoop) report(app scrapeLoopAppendAdapter, start time.Time, duration time.Duration, scraped, added, seriesAdded, bytes int, scrapeErr error) (err error) {
|
||||
sl.scraper.Report(start, duration, scrapeErr)
|
||||
|
||||
ts := timestamp.FromTime(start)
|
||||
@ -2152,71 +2058,70 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
|
||||
}
|
||||
b := labels.NewBuilderWithSymbolTable(sl.symbolTable)
|
||||
|
||||
if err = sl.addReportSample(app, scrapeHealthMetric, ts, health, b); err != nil {
|
||||
if err = app.addReportSample(scrapeHealthMetric, ts, health, b, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeDurationMetric, ts, duration.Seconds(), b); err != nil {
|
||||
if err = app.addReportSample(scrapeDurationMetric, ts, duration.Seconds(), b, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeSamplesMetric, ts, float64(scraped), b); err != nil {
|
||||
if err = app.addReportSample(scrapeSamplesMetric, ts, float64(scraped), b, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, samplesPostRelabelMetric, ts, float64(added), b); err != nil {
|
||||
if err = app.addReportSample(samplesPostRelabelMetric, ts, float64(added), b, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeSeriesAddedMetric, ts, float64(seriesAdded), b); err != nil {
|
||||
if err = app.addReportSample(scrapeSeriesAddedMetric, ts, float64(seriesAdded), b, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if sl.reportExtraMetrics {
|
||||
if err = sl.addReportSample(app, scrapeTimeoutMetric, ts, sl.timeout.Seconds(), b); err != nil {
|
||||
if err = app.addReportSample(scrapeTimeoutMetric, ts, sl.timeout.Seconds(), b, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeSampleLimitMetric, ts, float64(sl.sampleLimit), b); err != nil {
|
||||
if err = app.addReportSample(scrapeSampleLimitMetric, ts, float64(sl.sampleLimit), b, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeBodySizeBytesMetric, ts, float64(bytes), b); err != nil {
|
||||
if err = app.addReportSample(scrapeBodySizeBytesMetric, ts, float64(bytes), b, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
|
||||
func (sl *scrapeLoop) reportStale(app scrapeLoopAppendAdapter, start time.Time) (err error) {
|
||||
ts := timestamp.FromTime(start)
|
||||
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
|
||||
stale := math.Float64frombits(value.StaleNaN)
|
||||
b := labels.NewBuilder(labels.EmptyLabels())
|
||||
|
||||
if err = sl.addReportSample(app, scrapeHealthMetric, ts, stale, b); err != nil {
|
||||
if err = app.addReportSample(scrapeHealthMetric, ts, stale, b, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeDurationMetric, ts, stale, b); err != nil {
|
||||
if err = app.addReportSample(scrapeDurationMetric, ts, stale, b, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeSamplesMetric, ts, stale, b); err != nil {
|
||||
if err = app.addReportSample(scrapeSamplesMetric, ts, stale, b, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, samplesPostRelabelMetric, ts, stale, b); err != nil {
|
||||
if err = app.addReportSample(samplesPostRelabelMetric, ts, stale, b, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeSeriesAddedMetric, ts, stale, b); err != nil {
|
||||
if err = app.addReportSample(scrapeSeriesAddedMetric, ts, stale, b, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if sl.reportExtraMetrics {
|
||||
if err = sl.addReportSample(app, scrapeTimeoutMetric, ts, stale, b); err != nil {
|
||||
if err = app.addReportSample(scrapeTimeoutMetric, ts, stale, b, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeSampleLimitMetric, ts, stale, b); err != nil {
|
||||
if err = app.addReportSample(scrapeSampleLimitMetric, ts, stale, b, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sl.addReportSample(app, scrapeBodySizeBytesMetric, ts, stale, b); err != nil {
|
||||
if err = app.addReportSample(scrapeBodySizeBytesMetric, ts, stale, b, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t int64, v float64, b *labels.Builder) error {
|
||||
func (sl *scrapeLoopAppender) addReportSample(s reportSample, t int64, v float64, b *labels.Builder, rejectOOO bool) (err error) {
|
||||
ce, ok, _ := sl.cache.get(s.name)
|
||||
var ref storage.SeriesRef
|
||||
var lset labels.Labels
|
||||
@ -2228,18 +2133,26 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t in
|
||||
// with scraped metrics in the cache.
|
||||
// We have to drop it when building the actual metric.
|
||||
b.Reset(labels.EmptyLabels())
|
||||
b.Set(labels.MetricName, string(s.name[:len(s.name)-1]))
|
||||
b.Set(model.MetricNameLabel, string(s.name[:len(s.name)-1]))
|
||||
lset = sl.reportSampleMutator(b.Labels())
|
||||
}
|
||||
|
||||
ref, err := app.Append(ref, lset, t, v)
|
||||
// This will be improved in AppenderV2.
|
||||
if rejectOOO {
|
||||
sl.SetOptions(&aOptionRejectEarlyOOO)
|
||||
ref, err = sl.Append(ref, lset, t, v)
|
||||
sl.SetOptions(nil)
|
||||
} else {
|
||||
ref, err = sl.Append(ref, lset, t, v)
|
||||
}
|
||||
|
||||
switch {
|
||||
case err == nil:
|
||||
if !ok {
|
||||
sl.cache.addRef(s.name, ref, lset, lset.Hash())
|
||||
// We only need to add metadata once a scrape target appears.
|
||||
if sl.appendMetadataToWAL {
|
||||
if _, merr := app.UpdateMetadata(ref, lset, s.Metadata); merr != nil {
|
||||
if _, merr := sl.UpdateMetadata(ref, lset, s.Metadata); merr != nil {
|
||||
sl.l.Debug("Error when appending metadata in addReportSample", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", s.Metadata), "err", merr)
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -1,4 +1,4 @@
|
||||
// Copyright 2013 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Copyright 2013 The Prometheus Authors
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@ -14,7 +14,6 @@
|
||||
package scrape
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
@ -36,7 +35,7 @@ import (
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/timestamp"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/teststorage"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -611,12 +610,12 @@ func TestBucketLimitAppender(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
resApp := &collectResultAppender{}
|
||||
appTest := teststorage.NewAppendable()
|
||||
|
||||
for _, c := range cases {
|
||||
for _, floatHisto := range []bool{true, false} {
|
||||
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
|
||||
app := &bucketLimitAppender{Appender: resApp, limit: c.limit}
|
||||
app := &bucketLimitAppender{Appender: appTest.Appender(t.Context()), limit: c.limit}
|
||||
ts := int64(10 * time.Minute / time.Millisecond)
|
||||
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
|
||||
var err error
|
||||
@ -697,12 +696,12 @@ func TestMaxSchemaAppender(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
resApp := &collectResultAppender{}
|
||||
appTest := teststorage.NewAppendable()
|
||||
|
||||
for _, c := range cases {
|
||||
for _, floatHisto := range []bool{true, false} {
|
||||
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
|
||||
app := &maxSchemaAppender{Appender: resApp, maxSchema: c.maxSchema}
|
||||
app := &maxSchemaAppender{Appender: appTest.Appender(t.Context()), maxSchema: c.maxSchema}
|
||||
ts := int64(10 * time.Minute / time.Millisecond)
|
||||
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
|
||||
var err error
|
||||
@ -723,17 +722,12 @@ func TestMaxSchemaAppender(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test sample_limit when a scrape containst Native Histograms.
|
||||
// Test sample_limit when a scrape contains Native Histograms.
|
||||
func TestAppendWithSampleLimitAndNativeHistogram(t *testing.T) {
|
||||
const sampleLimit = 2
|
||||
resApp := &collectResultAppender{}
|
||||
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
|
||||
return resApp
|
||||
}, 0)
|
||||
sl.sampleLimit = sampleLimit
|
||||
appTest := teststorage.NewAppendable()
|
||||
|
||||
now := time.Now()
|
||||
app := appender(sl.appender(context.Background()), sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
|
||||
app := appenderWithLimits(appTest.Appender(t.Context()), 2, 0, histogram.ExponentialSchemaMax)
|
||||
|
||||
// sample_limit is set to 2, so first two scrapes should work
|
||||
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "foo"), timestamp.FromTime(now), 1)
|
||||
|
||||
399
util/teststorage/appender.go
Normal file
399
util/teststorage/appender.go
Normal file
@ -0,0 +1,399 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package teststorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/metadata"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
// Sample represents test, combined sample for mocking storage.AppenderV2.
|
||||
type Sample struct {
|
||||
MF string
|
||||
L labels.Labels
|
||||
M metadata.Metadata
|
||||
ST, T int64
|
||||
V float64
|
||||
H *histogram.Histogram
|
||||
FH *histogram.FloatHistogram
|
||||
ES []exemplar.Exemplar
|
||||
}
|
||||
|
||||
func (s Sample) String() string {
|
||||
// Attempting to format similar to ~ OpenMetrics 2.0 for readability.
|
||||
b := strings.Builder{}
|
||||
if s.M.Help != "" {
|
||||
b.WriteString("HELP ")
|
||||
b.WriteString(s.M.Help)
|
||||
b.WriteString("\n")
|
||||
}
|
||||
if s.M.Type != model.MetricTypeUnknown && s.M.Type != "" {
|
||||
b.WriteString("type@")
|
||||
b.WriteString(string(s.M.Type))
|
||||
b.WriteString(" ")
|
||||
}
|
||||
if s.M.Unit != "" {
|
||||
b.WriteString("unit@")
|
||||
b.WriteString(s.M.Unit)
|
||||
b.WriteString(" ")
|
||||
}
|
||||
// Print all value types on purpose, to catch bugs for appending multiple sample types at once.
|
||||
h := ""
|
||||
if s.H != nil {
|
||||
h = s.H.String()
|
||||
}
|
||||
fh := ""
|
||||
if s.FH != nil {
|
||||
fh = s.FH.String()
|
||||
}
|
||||
b.WriteString(fmt.Sprintf("%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, h, fh, s.ST, s.T))
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func (s Sample) Equals(other Sample) bool {
|
||||
return strings.Compare(s.MF, other.MF) == 0 &&
|
||||
labels.Equal(s.L, other.L) &&
|
||||
s.M.Equals(other.M) &&
|
||||
s.ST == other.ST &&
|
||||
s.T == other.T &&
|
||||
math.Float64bits(s.V) == math.Float64bits(other.V) && // Compare Float64bits so NaN values which are exactly the same will compare equal.
|
||||
s.H.Equals(other.H) &&
|
||||
s.FH.Equals(other.FH) &&
|
||||
slices.EqualFunc(s.ES, other.ES, exemplar.Exemplar.Equals)
|
||||
}
|
||||
|
||||
// Appendable is a storage.Appendable mock.
|
||||
// It allows recording all samples that were added through the appender and injecting errors.
|
||||
// Appendable will panic if more than one Appender is open.
|
||||
type Appendable struct {
|
||||
appendErrFn func(ls labels.Labels) error // If non-nil, inject appender error on every Append, AppendHistogram and ST zero calls.
|
||||
appendExemplarsError error // If non-nil, inject exemplar error.
|
||||
commitErr error // If non-nil, inject commit error.
|
||||
|
||||
mtx sync.Mutex
|
||||
openAppenders atomic.Int32 // Guard against multi-appender use.
|
||||
|
||||
// Recorded results.
|
||||
pendingSamples []Sample
|
||||
resultSamples []Sample
|
||||
rolledbackSamples []Sample
|
||||
|
||||
// Optional chain (Appender will collect samples, then run next).
|
||||
next storage.Appendable
|
||||
}
|
||||
|
||||
// NewAppendable returns mock Appendable.
|
||||
func NewAppendable() *Appendable {
|
||||
return &Appendable{}
|
||||
}
|
||||
|
||||
// Then chains another appender from the provided appendable for the Appender calls.
|
||||
func (a *Appendable) Then(appendable storage.Appendable) *Appendable {
|
||||
a.next = appendable
|
||||
return a
|
||||
}
|
||||
|
||||
// WithErrs allows injecting errors to the appender.
|
||||
func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendExemplarsError, commitErr error) *Appendable {
|
||||
a.appendErrFn = appendErrFn
|
||||
a.appendExemplarsError = appendExemplarsError
|
||||
a.commitErr = commitErr
|
||||
return a
|
||||
}
|
||||
|
||||
// PendingSamples returns pending samples (samples appended without commit).
|
||||
func (a *Appendable) PendingSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
ret := make([]Sample, len(a.pendingSamples))
|
||||
copy(ret, a.pendingSamples)
|
||||
return ret
|
||||
}
|
||||
|
||||
// ResultSamples returns committed samples.
|
||||
func (a *Appendable) ResultSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
ret := make([]Sample, len(a.resultSamples))
|
||||
copy(ret, a.resultSamples)
|
||||
return ret
|
||||
}
|
||||
|
||||
// RolledbackSamples returns rolled back samples.
|
||||
func (a *Appendable) RolledbackSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
ret := make([]Sample, len(a.rolledbackSamples))
|
||||
copy(ret, a.rolledbackSamples)
|
||||
return ret
|
||||
}
|
||||
|
||||
func (a *Appendable) ResultReset() {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
a.pendingSamples = a.pendingSamples[:0]
|
||||
a.resultSamples = a.resultSamples[:0]
|
||||
a.rolledbackSamples = a.rolledbackSamples[:0]
|
||||
}
|
||||
|
||||
// ResultMetadata returns resultSamples with samples only containing L and M.
|
||||
// This is for compatibility with tests that only focus on metadata.
|
||||
//
|
||||
// TODO: Rewrite tests to test metadata on resultSamples instead.
|
||||
func (a *Appendable) ResultMetadata() []Sample {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
var ret []Sample
|
||||
for _, s := range a.resultSamples {
|
||||
if s.M.IsEmpty() {
|
||||
continue
|
||||
}
|
||||
ret = append(ret, Sample{L: s.L, M: s.M})
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (a *Appendable) String() string {
|
||||
var sb strings.Builder
|
||||
sb.WriteString("committed:\n")
|
||||
for _, s := range a.resultSamples {
|
||||
sb.WriteString("\n")
|
||||
sb.WriteString(s.String())
|
||||
}
|
||||
sb.WriteString("pending:\n")
|
||||
for _, s := range a.pendingSamples {
|
||||
sb.WriteString("\n")
|
||||
sb.WriteString(s.String())
|
||||
}
|
||||
sb.WriteString("rolledback:\n")
|
||||
for _, s := range a.rolledbackSamples {
|
||||
sb.WriteString("\n")
|
||||
sb.WriteString(s.String())
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
var errClosedAppender = errors.New("appender was already committed/rolledback")
|
||||
|
||||
type appender struct {
|
||||
err error
|
||||
next storage.Appender
|
||||
|
||||
a *Appendable
|
||||
}
|
||||
|
||||
func (a *appender) checkErr() error {
|
||||
a.a.mtx.Lock()
|
||||
defer a.a.mtx.Unlock()
|
||||
|
||||
return a.err
|
||||
}
|
||||
|
||||
func (a *Appendable) Appender(ctx context.Context) storage.Appender {
|
||||
ret := &appender{a: a}
|
||||
if a.openAppenders.Inc() > 1 {
|
||||
ret.err = errors.New("teststorage.Appendable.Appender() concurrent use is not supported; attempted opening new Appender() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed")
|
||||
}
|
||||
|
||||
if a.next != nil {
|
||||
ret.next = a.next.Appender(ctx)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (*appender) SetOptions(*storage.AppendOptions) {}
|
||||
|
||||
func (a *appender) Append(ref storage.SeriesRef, ls labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if a.a.appendErrFn != nil {
|
||||
if err := a.a.appendErrFn(ls); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.pendingSamples = append(a.a.pendingSamples, Sample{L: ls, T: t, V: v})
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
if a.next != nil {
|
||||
return a.next.Append(ref, ls, t, v)
|
||||
}
|
||||
|
||||
return computeOrCheckRef(ref, ls)
|
||||
}
|
||||
|
||||
func computeOrCheckRef(ref storage.SeriesRef, ls labels.Labels) (storage.SeriesRef, error) {
|
||||
h := ls.Hash()
|
||||
if ref == 0 {
|
||||
// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
|
||||
return storage.SeriesRef(h), nil
|
||||
}
|
||||
|
||||
if storage.SeriesRef(h) != ref {
|
||||
// Check for buggy ref while we at it.
|
||||
return 0, errors.New("teststorage.appender: found input ref not matching labels; potential bug in Appendable user")
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (a *appender) AppendHistogram(ref storage.SeriesRef, ls labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if a.a.appendErrFn != nil {
|
||||
if err := a.a.appendErrFn(ls); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.pendingSamples = append(a.a.pendingSamples, Sample{L: ls, T: t, H: h, FH: fh})
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
if a.next != nil {
|
||||
return a.next.AppendHistogram(ref, ls, t, h, fh)
|
||||
}
|
||||
|
||||
return computeOrCheckRef(ref, ls)
|
||||
}
|
||||
|
||||
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if a.a.appendExemplarsError != nil {
|
||||
return 0, a.a.appendExemplarsError
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
// NOTE(bwplotka): Eventually exemplar has to be attached to a series and soon
|
||||
// the AppenderV2 will guarantee that for TSDB. Assume this from the mock perspective
|
||||
// with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632
|
||||
i := len(a.a.pendingSamples) - 1
|
||||
for ; i >= 0; i-- { // Attach exemplars to the last matching sample.
|
||||
if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
|
||||
a.a.pendingSamples[i].ES = append(a.a.pendingSamples[i].ES, e)
|
||||
break
|
||||
}
|
||||
}
|
||||
a.a.mtx.Unlock()
|
||||
if i < 0 {
|
||||
return 0, fmt.Errorf("teststorage.appender: exemplar appender without series; ref %v; l %v; exemplar: %v", ref, l, e)
|
||||
}
|
||||
|
||||
if a.next != nil {
|
||||
return a.next.AppendExemplar(ref, l, e)
|
||||
}
|
||||
return computeOrCheckRef(ref, l)
|
||||
}
|
||||
|
||||
func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64) (storage.SeriesRef, error) {
|
||||
return a.Append(ref, l, st, 0.0) // This will change soon with AppenderV2, but we already report ST as 0 samples.
|
||||
}
|
||||
|
||||
func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64, h *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
if h != nil {
|
||||
return a.AppendHistogram(ref, l, st, &histogram.Histogram{}, nil)
|
||||
}
|
||||
return a.AppendHistogram(ref, l, st, nil, &histogram.FloatHistogram{}) // This will change soon with AppenderV2, but we already report ST as 0 histograms.
|
||||
}
|
||||
|
||||
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
// NOTE(bwplotka): Eventually metadata has to be attached to a series and soon
|
||||
// the AppenderV2 will guarantee that for TSDB. Assume this from the mock perspective
|
||||
// with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632
|
||||
i := len(a.a.pendingSamples) - 1
|
||||
for ; i >= 0; i-- { // Attach metadata to the last matching sample.
|
||||
if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
|
||||
a.a.pendingSamples[i].M = m
|
||||
break
|
||||
}
|
||||
}
|
||||
a.a.mtx.Unlock()
|
||||
if i < 0 {
|
||||
return 0, fmt.Errorf("teststorage.appender: metadata update without series; ref %v; l %v; m: %v", ref, l, m)
|
||||
}
|
||||
|
||||
if a.next != nil {
|
||||
return a.next.UpdateMetadata(ref, l, m)
|
||||
}
|
||||
return computeOrCheckRef(ref, l)
|
||||
}
|
||||
|
||||
func (a *appender) Commit() error {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.a.openAppenders.Dec()
|
||||
|
||||
if a.a.commitErr != nil {
|
||||
return a.a.commitErr
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.resultSamples = append(a.a.resultSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
a.err = errClosedAppender
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
if a.a.next != nil {
|
||||
return a.next.Commit()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *appender) Rollback() error {
|
||||
if err := a.checkErr(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.a.openAppenders.Dec()
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.rolledbackSamples = append(a.a.rolledbackSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
a.err = errClosedAppender
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
if a.next != nil {
|
||||
return a.next.Rollback()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
131
util/teststorage/appender_test.go
Normal file
131
util/teststorage/appender_test.go
Normal file
@ -0,0 +1,131 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package teststorage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/metadata"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
// TestSample_RequireEqual ensures standard testutil.RequireEqual is enough for comparisons.
|
||||
// This is thanks to the fact metadata has now Equals method.
|
||||
func TestSample_RequireEqual(t *testing.T) {
|
||||
a := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
}
|
||||
testutil.RequireEqual(t, a, a)
|
||||
|
||||
b1 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2_diff", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123}, // test_metric2_diff is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
}
|
||||
requireNotEqual(t, a, b1)
|
||||
|
||||
b2 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo2")}}}, // exemplar is different.
|
||||
}
|
||||
requireNotEqual(t, a, b2)
|
||||
|
||||
b3 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123, T: 123}, // Timestamp is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
}
|
||||
requireNotEqual(t, a, b3)
|
||||
|
||||
b4 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 456.456}, // Value is different.
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
}
|
||||
requireNotEqual(t, a, b4)
|
||||
|
||||
b5 := []Sample{
|
||||
{},
|
||||
{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter2", Unit: "metric", Help: "some help text"}}, // Different type.
|
||||
{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
|
||||
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
|
||||
}
|
||||
requireNotEqual(t, a, b5)
|
||||
}
|
||||
|
||||
// TODO(bwplotka): While this mimick testutil.RequireEqual just making it negative, this does not literally test
|
||||
// testutil.RequireEqual. Either build test suita that mocks `testing.TB` or get rid of testutil.RequireEqual somehow.
|
||||
func requireNotEqual(t testing.TB, a, b any) {
|
||||
t.Helper()
|
||||
if !cmp.Equal(a, b, cmp.Comparer(labels.Equal)) {
|
||||
return
|
||||
}
|
||||
require.Fail(t, fmt.Sprintf("Equal, but expected not: \n"+
|
||||
"a: %s\n"+
|
||||
"b: %s", a, b))
|
||||
}
|
||||
|
||||
func TestConcurrentAppender_ReturnsErrAppender(t *testing.T) {
|
||||
a := NewAppendable()
|
||||
|
||||
// Non-concurrent multiple use if fine.
|
||||
app := a.Appender(t.Context())
|
||||
require.Equal(t, int32(1), a.openAppenders.Load())
|
||||
require.NoError(t, app.Commit())
|
||||
// Repeated commit fails.
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
app = a.Appender(t.Context())
|
||||
require.NoError(t, app.Rollback())
|
||||
// Commit after rollback fails.
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
a.WithErrs(
|
||||
nil,
|
||||
nil,
|
||||
errors.New("commit err"),
|
||||
)
|
||||
app = a.Appender(t.Context())
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
a.WithErrs(nil, nil, nil)
|
||||
app = a.Appender(t.Context())
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, int32(0), a.openAppenders.Load())
|
||||
|
||||
// Concurrent use should return appender that errors.
|
||||
_ = a.Appender(t.Context())
|
||||
app = a.Appender(t.Context())
|
||||
_, err := app.Append(0, labels.EmptyLabels(), 0, 0)
|
||||
require.Error(t, err)
|
||||
_, err = app.AppendHistogram(0, labels.EmptyLabels(), 0, nil, nil)
|
||||
require.Error(t, err)
|
||||
require.Error(t, app.Commit())
|
||||
require.Error(t, app.Rollback())
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user