Merge pull request #17867 from prometheus/bwplotka/a2-scrape-1

refactor(appenderV2)[PART5a]: add AppendableV2 support to scrape loop + tests
This commit is contained in:
Bartlomiej Plotka 2026-01-21 08:21:56 +00:00 committed by GitHub
parent e3b6eee437
commit 664b255699
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1878 additions and 530 deletions

View File

@ -45,6 +45,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/teststorage"
@ -1201,7 +1202,9 @@ func TestRuleMovedBetweenGroups(t *testing.T) {
t.Skip("skipping test in short mode.")
}
storage := teststorage.New(t, 600000)
storage := teststorage.New(t, func(opt *tsdb.Options) {
opt.OutOfOrderTimeWindow = 600000
})
defer storage.Close()
opts := promql.EngineOpts{
Logger: nil,

View File

@ -17,6 +17,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"net/http"
"testing"
"time"
@ -38,15 +39,22 @@ import (
// For readability.
type sample = teststorage.Sample
// compatAppendable is implemented by test appendables that support both the
// V1 (storage.Appendable) and V2 (storage.AppendableV2) appender interfaces,
// so tests can pick either one via selectAppendable.
type compatAppendable interface {
	storage.Appendable
	storage.AppendableV2
}
// withCtx returns a scrapeLoop test option that injects the given context
// as the loop's main context.
func withCtx(ctx context.Context) func(sl *scrapeLoop) {
	return func(sl *scrapeLoop) {
		sl.ctx = ctx
	}
}
func withAppendable(appendable storage.Appendable) func(sl *scrapeLoop) {
func withAppendable(app compatAppendable, appV2 bool) func(sl *scrapeLoop) {
return func(sl *scrapeLoop) {
sl.appendable = appendable
sa := selectAppendable(app, appV2)
sl.appendable = sa.V1()
sl.appendableV2 = sa.V2()
}
}
@ -55,8 +63,7 @@ func withAppendable(appendable storage.Appendable) func(sl *scrapeLoop) {
//
// It's recommended to use withXYZ functions for simple option customizations, e.g:
//
// appTest := teststorage.NewAppendable()
// sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
// sl, _ := newTestScrapeLoop(t, withCtx(customCtx))
//
// However, when changing more than one scrapeLoop options it's more readable to have one explicit opt function:
//
@ -64,7 +71,7 @@ func withAppendable(appendable storage.Appendable) func(sl *scrapeLoop) {
// appTest := teststorage.NewAppendable()
// sl, scraper := newTestScrapeLoop(t, func(sl *scrapeLoop) {
// sl.ctx = ctx
// sl.appendable = appTest
// sl.appendableV2 = appTest
// // Since we're writing samples directly below we need to provide a protocol fallback.
// sl.fallbackScrapeProtocol = "text/plain"
// })
@ -84,8 +91,6 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo
timeout: 1 * time.Hour,
sampleMutator: nopMutator,
reportSampleMutator: nopMutator,
appendable: teststorage.NewAppendable(),
buffers: pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) }),
metrics: metrics,
maxSchema: histogram.ExponentialSchemaMax,
@ -98,6 +103,11 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo
for _, o := range opts {
o(sl)
}
if sl.appendable != nil && sl.appendableV2 != nil {
t.Fatal("select the appendable to use, both were passed, likely a bug")
}
// Validate user opts for convenience.
require.Nil(t, sl.parentCtx, "newTestScrapeLoop does not support injecting non-nil parent context")
require.Nil(t, sl.appenderCtx, "newTestScrapeLoop does not support injecting non-nil appender context")
@ -121,7 +131,8 @@ func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoo
return sl, scraper
}
func newTestScrapePool(t *testing.T, injectNewLoop func(options scrapeLoopOptions) loop) *scrapePool {
func newTestScrapePool(t *testing.T, app compatAppendable, appV2 bool, injectNewLoop func(options scrapeLoopOptions) loop) *scrapePool {
sa := selectAppendable(app, appV2)
return &scrapePool{
ctx: t.Context(),
cancel: func() {},
@ -134,7 +145,8 @@ func newTestScrapePool(t *testing.T, injectNewLoop func(options scrapeLoopOption
loops: map[uint64]loop{},
injectTestNewLoop: injectNewLoop,
appendable: teststorage.NewAppendable(),
appendable: sa.V1(), appendableV2: sa.V2(),
symbolTable: labels.NewSymbolTable(),
metrics: newTestScrapeMetrics(t),
}
@ -158,3 +170,66 @@ func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte {
buf.Write(protoBuf)
return buf.Bytes()
}
// selectedAppendable wraps a compatAppendable together with the decision of
// which appender interface (V1 or V2) callers should use; the non-selected
// accessor returns nil.
type selectedAppendable struct {
	useV2 bool // When true, V2() yields the appendable and V1() yields nil.
	app   compatAppendable
}
// V1 returns the wrapped value as storage.Appendable when the V1 interface
// was selected; it returns nil when V2 was selected instead.
func (s selectedAppendable) V1() storage.Appendable {
	if !s.useV2 {
		return s.app
	}
	return nil
}
// V2 returns the wrapped value as storage.AppendableV2 when the V2 interface
// was selected; it returns nil when V1 was selected instead.
func (s selectedAppendable) V2() storage.AppendableV2 {
	if s.useV2 {
		return s.app
	}
	return nil
}
// selectAppendable lets tests specify which appendable interface callers
// should use when the given value implements both. All callers decide the
// same way — if one appendable is nil they take the other — so this injects
// nil into e.g. the storage.AppendableV2 slot when appV2 is false.
func selectAppendable(app compatAppendable, appV2 bool) selectedAppendable {
	return selectedAppendable{app: app, useV2: appV2}
}
// foreachAppendable runs f twice as subtests, once with appV2=false and once
// with appV2=true, so a test body covers both appender flows.
func foreachAppendable(t *testing.T, f func(t *testing.T, appV2 bool)) {
	run := func(useV2 bool) {
		t.Run(fmt.Sprintf("appV2=%v", useV2), func(t *testing.T) {
			f(t, useV2)
		})
	}
	run(false)
	run(true)
}
// TestSelectAppendable verifies that foreachAppendable iterates exactly twice
// (first appV2=false, then appV2=true) and that selectAppendable exposes only
// the selected interface, returning nil for the other.
func TestSelectAppendable(t *testing.T) {
	var calls int
	foreachAppendable(t, func(t *testing.T, appV2 bool) {
		defer func() { calls++ }()
		if calls > 1 {
			t.Fatal("too many iterations")
		}
		wantV2 := calls == 1
		require.Equal(t, wantV2, appV2)
		s := selectAppendable(teststorage.NewAppendable(), appV2)
		if wantV2 {
			require.Nil(t, s.V1())
			require.NotNil(t, s.V2())
		} else {
			require.NotNil(t, s.V1())
			require.Nil(t, s.V2())
		}
	})
}

View File

@ -114,7 +114,8 @@ type Manager struct {
opts *Options
logger *slog.Logger
appendable storage.Appendable
appendable storage.Appendable
appendableV2 storage.AppendableV2
graceShut chan struct{}
@ -196,7 +197,7 @@ func (m *Manager) reload() {
continue
}
m.metrics.targetScrapePools.Inc()
sp, err := newScrapePool(scrapeConfig, m.appendable, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
sp, err := newScrapePool(scrapeConfig, m.appendable, m.appendableV2, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
if err != nil {
m.metrics.targetScrapePoolsFailed.Inc()
m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)

View File

@ -528,7 +528,7 @@ scrape_configs:
ch <- struct{}{}
return noopLoop()
}
sp := newTestScrapePool(t, newLoop)
sp := newTestScrapePool(t, nil, false, newLoop)
sp.activeTargets[1] = &Target{}
sp.loops[1] = noopLoop()
sp.config = cfg1.ScrapeConfigs[0]
@ -684,7 +684,7 @@ scrape_configs:
_, cancel := context.WithCancel(context.Background())
defer cancel()
sp := newTestScrapePool(t, newLoop)
sp := newTestScrapePool(t, nil, false, newLoop)
sp.loops[1] = noopLoop()
sp.config = cfg1.ScrapeConfigs[0]
sp.metrics = scrapeManager.metrics

View File

@ -82,11 +82,12 @@ type FailureLogger interface {
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable storage.Appendable
logger *slog.Logger
ctx context.Context
cancel context.CancelFunc
options *Options
appendable storage.Appendable
appendableV2 storage.AppendableV2
logger *slog.Logger
ctx context.Context
cancel context.CancelFunc
options *Options
// mtx must not be taken after targetMtx.
mtx sync.Mutex
@ -139,6 +140,7 @@ type scrapeLoopAppendAdapter interface {
func newScrapePool(
cfg *config.ScrapeConfig,
appendable storage.Appendable,
appendableV2 storage.AppendableV2,
offsetSeed uint64,
logger *slog.Logger,
buffers *pool.Pool,
@ -171,6 +173,7 @@ func newScrapePool(
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
appendable: appendable,
appendableV2: appendableV2,
logger: logger,
ctx: ctx,
cancel: cancel,
@ -842,11 +845,12 @@ type scrapeLoop struct {
scraper scraper
// Static params per scrapePool.
appendable storage.Appendable
buffers *pool.Pool
offsetSeed uint64
symbolTable *labels.SymbolTable
metrics *scrapeMetrics
appendable storage.Appendable
appendableV2 storage.AppendableV2
buffers *pool.Pool
offsetSeed uint64
symbolTable *labels.SymbolTable
metrics *scrapeMetrics
// Options from config.ScrapeConfig.
sampleLimit int
@ -1190,11 +1194,12 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
scraper: opts.scraper,
// Static params per scrapePool.
appendable: opts.sp.appendable,
buffers: opts.sp.buffers,
offsetSeed: opts.sp.offsetSeed,
symbolTable: opts.sp.symbolTable,
metrics: opts.sp.metrics,
appendable: opts.sp.appendable,
appendableV2: opts.sp.appendableV2,
buffers: opts.sp.buffers,
offsetSeed: opts.sp.offsetSeed,
symbolTable: opts.sp.symbolTable,
metrics: opts.sp.metrics,
// config.ScrapeConfig.
sampleLimit: int(opts.sp.config.SampleLimit),
@ -1303,7 +1308,9 @@ mainLoop:
}
func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter {
// NOTE(bwplotka): Add AppenderV2 implementation, see https://github.com/prometheus/prometheus/issues/17632.
if sl.appendableV2 != nil {
return &scrapeLoopAppenderV2{scrapeLoop: sl, AppenderV2: sl.appendableV2.AppenderV2(sl.appenderCtx)}
}
return &scrapeLoopAppender{scrapeLoop: sl, Appender: sl.appendable.Appender(sl.appenderCtx)}
}
@ -1637,7 +1644,7 @@ loop:
break
}
switch et {
// TODO(bwplotka): Consider changing parser to give metadata at once instead of type, help and unit in separation, ideally on `Series()/Histogram()
// TODO(bwplotka): Consider changing parser to give metadata at once instead of type, help and unit in separation, ideally on `Series()/Histogram()`
// otherwise we can expose metadata without series on metadata API.
case textparse.EntryType:
// TODO(bwplotka): Build meta entry directly instead of locking and updating the map. This will
@ -1753,7 +1760,7 @@ loop:
}
}
sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
sampleAdded, err = sl.checkAddError(met, nil, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
sl.l.Debug("Unexpected error", "series", string(met), "err", err)
@ -1829,7 +1836,7 @@ loop:
if !seriesCached || lastMeta.lastIterChange == sl.cache.iter {
// In majority cases we can trust that the current series/histogram is matching the lastMeta and lastMFName.
// However, optional TYPE etc metadata and broken OM text can break this, detect those cases here.
// TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. ST and NHCB parsing).
// TODO(https://github.com/prometheus/prometheus/issues/17900): Move this to text and OM parser.
if isSeriesPartOfFamily(lset.Get(model.MetricNameLabel), lastMFName, lastMeta.Type) {
if _, merr := app.UpdateMetadata(ref, lset, lastMeta.Metadata); merr != nil {
// No need to fail the scrape on errors appending metadata.
@ -1871,6 +1878,7 @@ loop:
return total, added, seriesAdded, err
}
// TODO(https://github.com/prometheus/prometheus/issues/17900): Move this to text and OM parser.
func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) bool {
mfNameStr := yoloString(mfName)
if !strings.HasPrefix(mName, mfNameStr) { // Fast path.
@ -1942,7 +1950,7 @@ func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) boo
// during normal operation (e.g., accidental cardinality explosion, sudden traffic spikes).
// Current case ordering prevents exercising other cases when limits are exceeded.
// Remaining error cases typically occur only a few times, often during initial setup.
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (sampleAdded bool, _ error) {
func (sl *scrapeLoop) checkAddError(met []byte, exemplars []exemplar.Exemplar, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (sampleAdded bool, _ error) {
switch {
case err == nil:
return true, nil
@ -1974,6 +1982,26 @@ func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucke
case errors.Is(err, storage.ErrNotFound):
return false, storage.ErrNotFound
default:
// If nothing from the above, check for partial errors. Do this here to not alloc the pErr on a hot path.
var pErr *storage.AppendPartialError
if errors.As(err, &pErr) {
outOfOrderExemplars := 0
for _, e := range pErr.ExemplarErrors {
if errors.Is(e, storage.ErrOutOfOrderExemplar) {
outOfOrderExemplars++
}
// Since exemplar storage is still experimental, we don't fail or check other errors.
// Debug log is emitted in TSDB already.
}
if outOfOrderExemplars > 0 && outOfOrderExemplars == len(exemplars) {
// Only report out of order exemplars if all are out of order, otherwise this was a partial update
// to some existing set of exemplars.
appErrs.numExemplarOutOfOrder += outOfOrderExemplars
sl.l.Debug("Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", exemplars[len(exemplars)-1]))
sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars))
}
return true, nil
}
return false, err
}
}

416
scrape/scrape_append_v2.go Normal file
View File

@ -0,0 +1,416 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scrape
import (
"errors"
"fmt"
"io"
"math"
"slices"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
)
// appenderV2WithLimits wraps app with additional validation layers: a
// max-future-time check (always), plus sample-count, histogram-bucket and
// max-schema limiting when the corresponding limit is configured.
// Note the wrapping order: the time limit is applied innermost, so limits
// added later are checked first on Append.
func appenderV2WithLimits(app storage.AppenderV2, sampleLimit, bucketLimit int, maxSchema int32) storage.AppenderV2 {
	app = &timeLimitAppenderV2{
		AppenderV2: app,
		maxTime:    timestamp.FromTime(time.Now().Add(maxAheadTime)),
	}
	// The sampleLimit is applied after metrics are potentially dropped via relabeling.
	if sampleLimit > 0 {
		app = &limitAppenderV2{
			AppenderV2: app,
			limit:      sampleLimit,
		}
	}
	if bucketLimit > 0 {
		app = &bucketLimitAppenderV2{
			AppenderV2: app,
			limit:      bucketLimit,
		}
	}
	if maxSchema < histogram.ExponentialSchemaMax {
		app = &maxSchemaAppenderV2{
			AppenderV2: app,
			maxSchema:  maxSchema,
		}
	}
	return app
}
// updateStaleMarkersV2 appends a StaleNaN marker at defTime for every series
// the cache reports as no longer exposed. Out-of-order and duplicate-sample
// errors are swallowed; iteration stops on the first other error, which is
// then returned.
func (sl *scrapeLoop) updateStaleMarkersV2(app storage.AppenderV2, defTime int64) (err error) {
	sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
		// Series no longer exposed, mark it stale.
		_, err = app.Append(ref, lset, 0, defTime, math.Float64frombits(value.StaleNaN), nil, nil, storage.AOptions{RejectOutOfOrder: true})
		if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(err, storage.ErrDuplicateSampleForTimestamp) {
			// Do not count these in logging, as this is expected if a target
			// goes away and comes back again with a new scrape loop.
			err = nil
		}
		return err == nil
	})
	return err
}
// scrapeLoopAppenderV2 adapts a storage.AppenderV2 to the scrapeLoopAppendAdapter
// interface used by the scrape loop. It embeds the owning scrapeLoop to reach
// its cache, limits, mutators and logger.
type scrapeLoopAppenderV2 struct {
	*scrapeLoop
	storage.AppenderV2
}

// Compile-time check that the adapter satisfies the scrape loop contract.
var _ scrapeLoopAppendAdapter = &scrapeLoopAppenderV2{}
// append parses the scrape body b (media type contentType, scraped at ts) and
// appends all samples, exemplars and metadata to the wrapped AppenderV2.
// It returns the total number of parsed samples, the number remaining after
// relabelling (added), the number of series newly added to the scrape cache
// (seriesAdded), and the first fatal error encountered, if any.
func (sl *scrapeLoopAppenderV2) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
	defTime := timestamp.FromTime(ts)

	if len(b) == 0 {
		// Empty scrape. Just update the stale markers and swap the cache (but don't flush it).
		err = sl.updateStaleMarkersV2(sl.AppenderV2, defTime)
		sl.cache.iterDone(false)
		return total, added, seriesAdded, err
	}

	p, err := textparse.New(b, contentType, sl.symbolTable, textparse.ParserOptions{
		EnableTypeAndUnitLabels:                 sl.enableTypeAndUnitLabels,
		IgnoreNativeHistograms:                  !sl.enableNativeHistogramScraping,
		ConvertClassicHistogramsToNHCB:          sl.convertClassicHistToNHCB,
		KeepClassicOnClassicAndNativeHistograms: sl.alwaysScrapeClassicHist,
		OpenMetricsSkipSTSeries:                 sl.enableSTZeroIngestion,
		FallbackContentType:                     sl.fallbackScrapeProtocol,
	})
	if p == nil {
		// No parser could be constructed at all; the scrape cannot proceed.
		sl.l.Error(
			"Failed to determine correct type of scrape target.",
			"content_type", contentType,
			"fallback_media_type", sl.fallbackScrapeProtocol,
			"err", err,
		)
		return total, added, seriesAdded, err
	}
	if err != nil {
		// A parser was still constructed via the fallback; log and continue.
		sl.l.Debug(
			"Invalid content type on scrape, using fallback setting.",
			"content_type", contentType,
			"fallback_media_type", sl.fallbackScrapeProtocol,
			"err", err,
		)
	}

	var (
		appErrs        = appendErrors{}
		sampleLimitErr error
		bucketLimitErr error
		lset           labels.Labels     // Escapes to heap so hoisted out of loop.
		e              exemplar.Exemplar // Escapes to heap so hoisted out of loop.
		lastMeta       *metaEntry
		lastMFName     []byte
	)
	exemplars := make([]exemplar.Exemplar, 0, 1)

	// Take an appender with limits.
	app := appenderV2WithLimits(sl.AppenderV2, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)

	defer func() {
		if err != nil {
			return
		}
		// Flush and swap the cache as the scrape was non-empty.
		sl.cache.iterDone(true)
	}()

loop:
	for {
		var (
			et                       textparse.Entry
			sampleAdded, isHistogram bool
			met                      []byte
			parsedTimestamp          *int64
			val                      float64
			h                        *histogram.Histogram
			fh                       *histogram.FloatHistogram
		)
		if et, err = p.Next(); err != nil {
			if errors.Is(err, io.EOF) {
				err = nil
			}
			break
		}
		switch et {
		// TODO(bwplotka): Consider changing parser to give metadata at once instead of type, help and unit in separation, ideally on `Series()/Histogram()`
		// otherwise we can expose metadata without series on metadata API.
		case textparse.EntryType:
			// TODO(bwplotka): Build meta entry directly instead of locking and updating the map. This will
			// allow to properly update metadata when e.g. unit was added, then removed;
			lastMFName, lastMeta = sl.cache.setType(p.Type())
			continue
		case textparse.EntryHelp:
			lastMFName, lastMeta = sl.cache.setHelp(p.Help())
			continue
		case textparse.EntryUnit:
			lastMFName, lastMeta = sl.cache.setUnit(p.Unit())
			continue
		case textparse.EntryComment:
			continue
		case textparse.EntryHistogram:
			isHistogram = true
		default:
		}
		total++

		t := defTime
		if isHistogram {
			met, parsedTimestamp, h, fh = p.Histogram()
		} else {
			met, parsedTimestamp, val = p.Series()
		}
		if !sl.honorTimestamps {
			parsedTimestamp = nil
		}
		if parsedTimestamp != nil {
			t = *parsedTimestamp
		}

		if sl.cache.getDropped(met) {
			continue
		}
		ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met)
		var (
			ref  storage.SeriesRef
			hash uint64
		)

		if seriesCached {
			ref = ce.ref
			lset = ce.lset
			hash = ce.hash
		} else {
			p.Labels(&lset)
			hash = lset.Hash()

			// Hash label set as it is seen local to the target. Then add target labels
			// and relabeling and store the final label set.
			lset = sl.sampleMutator(lset)

			// The label set may be set to empty to indicate dropping.
			if lset.IsEmpty() {
				sl.cache.addDropped(met)
				continue
			}

			if !lset.Has(model.MetricNameLabel) {
				err = errNameLabelMandatory
				break loop
			}
			if !lset.IsValid(sl.validationScheme) {
				err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
				break loop
			}

			// If any label limits is exceeded the scrape should fail.
			if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
				sl.metrics.targetScrapePoolExceededLabelLimits.Inc()
				break loop
			}
		}

		exemplars = exemplars[:0] // Reset and reuse the exemplar slice.
		if seriesAlreadyScraped && parsedTimestamp == nil {
			err = storage.ErrDuplicateSampleForTimestamp
		} else {
			// Double check we don't append float 0 for
			// histogram case where parser returns bad data.
			// This can only happen when parser has a bug.
			if isHistogram && h == nil && fh == nil {
				err = fmt.Errorf("parser returned nil histogram/float histogram for a histogram entry type for %v series; parser bug; aborting", lset.String())
				break loop
			}

			st := int64(0)
			if sl.enableSTZeroIngestion {
				// p.StartTimestamp() tend to be expensive (e.g. OM1). Do it only if we care.
				st = p.StartTimestamp()
			}

			for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
				if !e.HasTs {
					if isHistogram {
						// We drop exemplars for native histograms if they don't have a timestamp.
						// Missing timestamps are deliberately not supported as we want to start
						// enforcing timestamps for exemplars as otherwise proper deduplication
						// is inefficient and purely based on heuristics: we cannot distinguish
						// between repeated exemplars and new instances with the same values.
						// This is done silently without logs as it is not an error but out of spec.
						// This does not affect classic histograms so that behaviour is unchanged.
						e = exemplar.Exemplar{} // Reset for the next fetch.
						continue
					}
					e.Ts = t
				}
				exemplars = append(exemplars, e)
				e = exemplar.Exemplar{} // Reset for the next fetch.
			}

			// Prepare append call.
			appOpts := storage.AOptions{}
			if len(exemplars) > 0 {
				// Sort so that checking for duplicates / out of order is more efficient during validation.
				slices.SortFunc(exemplars, exemplar.Compare)
				appOpts.Exemplars = exemplars
			}

			// Metadata path mimics the scrape appender V1 flow. Once we remove the V1
			// flow we should rename the "appendMetadataToWAL" flag to "passMetadata" because for the V2 flow
			// the metadata storage detail is behind the appendableV2 contract. V2 also means we always pass the metadata,
			// we don't check if it changed (that code can be removed).
			//
			// Long term, we should always attach the metadata without any flag. Unfortunately because of the limitations
			// of the TEXT and OpenMetrics 1.0 formats (hopefully fixed in OpenMetrics 2.0) there are edge cases around unknown
			// metadata + suffixes that are expensive (isSeriesPartOfFamily) or in some cases impossible to detect. For this
			// reason the metadata (appendMetadataToWAL=true) appender V2 scrape flow may take ~3% more CPU in our benchmarks.
			//
			// TODO(https://github.com/prometheus/prometheus/issues/17900): Optimize this, notably move this check to parsers that require this (ensuring parser
			// interface always yields correct metadata), deliver OpenMetrics 2.0 that removes suffixes.
			if sl.appendMetadataToWAL && lastMeta != nil {
				// In majority cases we can trust that the current series/histogram is matching the lastMeta and lastMFName.
				// However, optional TYPE, etc metadata and broken OM text can break this, detect those cases here.
				if !isSeriesPartOfFamily(lset.Get(model.MetricNameLabel), lastMFName, lastMeta.Type) {
					lastMeta = nil // Don't pass knowingly broken metadata, now, nor on the next line.
				}
				if lastMeta != nil {
					// Metric family name has the same source as metadata.
					appOpts.MetricFamilyName = yoloString(lastMFName)
					appOpts.Metadata = lastMeta.Metadata
				}
			}

			// Append sample to the storage.
			ref, err = app.Append(ref, lset, st, t, val, h, fh, appOpts)
		}

		sampleAdded, err = sl.checkAddError(met, exemplars, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
		if err != nil {
			if !errors.Is(err, storage.ErrNotFound) {
				sl.l.Debug("Unexpected error", "series", string(met), "err", err)
			}
			break loop
		}

		if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
			sl.cache.trackStaleness(ce.ref, ce)
		}

		// If series wasn't cached (is new, not seen on previous scrape) we need to add it to the scrape cache.
		// But we only do this for series that were appended to TSDB without errors.
		// If a series was new, but we didn't append it due to sample_limit or other errors then we don't need
		// it in the scrape cache because we don't need to emit StaleNaNs for it when it disappears.
		if !seriesCached && sampleAdded {
			ce = sl.cache.addRef(met, ref, lset, hash)
			if ce != nil && (parsedTimestamp == nil || sl.trackTimestampsStaleness) {
				// Bypass staleness logic if there is an explicit timestamp.
				// But make sure we only do this if we have a cache entry (ce) for our series.
				sl.cache.trackStaleness(ref, ce)
			}
			if sampleLimitErr == nil && bucketLimitErr == nil {
				seriesAdded++
			}
		}

		// Increment added even if there's an error so we correctly report the
		// number of samples remaining after relabeling.
		// We still report duplicated samples here since this number should be the exact number
		// of time series exposed on a scrape after relabelling.
		added++
	}
	if sampleLimitErr != nil {
		if err == nil {
			err = sampleLimitErr
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		sl.metrics.targetScrapeSampleLimit.Inc()
	}
	if bucketLimitErr != nil {
		if err == nil {
			err = bucketLimitErr // If sample limit is hit, that error takes precedence.
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		sl.metrics.targetScrapeNativeHistogramBucketLimit.Inc()
	}
	if appErrs.numOutOfOrder > 0 {
		sl.l.Warn("Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
	}
	if appErrs.numDuplicates > 0 {
		sl.l.Warn("Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
	}
	if appErrs.numOutOfBounds > 0 {
		sl.l.Warn("Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
	}
	if appErrs.numExemplarOutOfOrder > 0 {
		sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
	}
	if err == nil {
		err = sl.updateStaleMarkersV2(app, defTime)
	}
	return total, added, seriesAdded, err
}
// addReportSample appends one scrape report sample for this loop via the V2
// appender. s.name is the cache key (suffixed with a \xff sentinel, see below),
// t and v are the sample's timestamp and value, b is a reusable label builder,
// and rejectOOO is passed through to storage.AOptions.RejectOutOfOrder.
// Out-of-order and duplicate-sample errors are treated as success.
func (sl *scrapeLoopAppenderV2) addReportSample(s reportSample, t int64, v float64, b *labels.Builder, rejectOOO bool) (err error) {
	ce, ok, _ := sl.cache.get(s.name)
	var ref storage.SeriesRef
	var lset labels.Labels
	if ok {
		// Cached series: reuse the known ref and label set.
		ref = ce.ref
		lset = ce.lset
	} else {
		// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
		// with scraped metrics in the cache.
		// We have to drop it when building the actual metric.
		b.Reset(labels.EmptyLabels())
		b.Set(model.MetricNameLabel, string(s.name[:len(s.name)-1]))
		lset = sl.reportSampleMutator(b.Labels())
	}

	ref, err = sl.Append(ref, lset, 0, t, v, nil, nil, storage.AOptions{
		MetricFamilyName: yoloString(s.name),
		Metadata:         s.Metadata,
		RejectOutOfOrder: rejectOOO,
	})
	switch {
	case err == nil:
		if !ok {
			// Cache the new series so subsequent report samples hit the fast path.
			sl.cache.addRef(s.name, ref, lset, lset.Hash())
		}
		return nil
	case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
		// Do not log here, as this is expected if a target goes away and comes back
		// again with a new scrape loop.
		return nil
	default:
		return err
	}
}

File diff suppressed because it is too large Load Diff

View File

@ -454,6 +454,105 @@ func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels
return ref, nil
}
// limitAppenderV2 limits the number of total appended samples in a batch.
type limitAppenderV2 struct {
	storage.AppenderV2

	limit int // Maximum number of samples counted against the limit.
	i     int // Samples counted so far in this batch.
}

// Append counts the sample against the configured limit before delegating to
// the wrapped appender; once exceeded it returns errSampleLimit.
func (app *limitAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
	// Bypass sample_limit checks only if we have a staleness marker for a known series (ref value is non-zero).
	// This ensures that if a series is already in TSDB then we always write the marker.
	if ref == 0 || !value.IsStaleNaN(v) {
		app.i++
		if app.i > app.limit {
			return 0, errSampleLimit
		}
	}
	return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
}
// timeLimitAppenderV2 rejects samples whose timestamp is beyond maxTime,
// returning storage.ErrOutOfBounds.
type timeLimitAppenderV2 struct {
	storage.AppenderV2

	maxTime int64 // Maximum allowed sample timestamp (inclusive), in milliseconds.
}

// Append checks the sample timestamp against maxTime before delegating.
func (app *timeLimitAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
	if t > app.maxTime {
		return 0, storage.ErrOutOfBounds
	}
	return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
}
// bucketLimitAppenderV2 limits the number of buckets in each appended native
// histogram, reducing the schema (resolution) when possible and returning
// errBucketLimit when the limit still cannot be met.
type bucketLimitAppenderV2 struct {
	storage.AppenderV2

	limit int // Maximum total (positive + negative) buckets per histogram.
}

// Append enforces the bucket limit on h/fh (mutating them via resolution
// reduction when needed) before delegating to the wrapped appender.
func (app *bucketLimitAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) {
	if h != nil {
		// Return with an early error if the histogram has too many buckets and the
		// schema is not exponential, in which case we can't reduce the resolution.
		if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) {
			return 0, errBucketLimit
		}
		for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
			if h.Schema <= histogram.ExponentialSchemaMin {
				return 0, errBucketLimit
			}
			if err = h.ReduceResolution(h.Schema - 1); err != nil {
				return 0, err
			}
		}
	}
	if fh != nil {
		// Return with an early error if the histogram has too many buckets and the
		// schema is not exponential, in which case we can't reduce the resolution.
		if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) {
			return 0, errBucketLimit
		}
		for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
			if fh.Schema <= histogram.ExponentialSchemaMin {
				return 0, errBucketLimit
			}
			if err = fh.ReduceResolution(fh.Schema - 1); err != nil {
				return 0, err
			}
		}
	}
	return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
}
// maxSchemaAppenderV2 caps the exponential schema of appended native
// histograms at maxSchema, reducing their resolution when needed.
type maxSchemaAppenderV2 struct {
	storage.AppenderV2

	maxSchema int32 // Highest allowed exponential schema.
}

// Append reduces the resolution of h/fh down to maxSchema (mutating them in
// place) before delegating to the wrapped appender.
func (app *maxSchemaAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) {
	if h != nil {
		if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema {
			if err = h.ReduceResolution(app.maxSchema); err != nil {
				return 0, err
			}
		}
	}
	if fh != nil {
		if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema {
			if err = fh.ReduceResolution(app.maxSchema); err != nil {
				return 0, err
			}
		}
	}
	return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
}
// PopulateDiscoveredLabels sets base labels on lb from target and group labels and scrape configuration, before relabeling.
func PopulateDiscoveredLabels(lb *labels.Builder, cfg *config.ScrapeConfig, tLabels, tgLabels model.LabelSet) {
lb.Reset(labels.EmptyLabels())

View File

@ -35,6 +35,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/teststorage"
)
@ -610,37 +611,65 @@ func TestBucketLimitAppender(t *testing.T) {
},
}
appTest := teststorage.NewAppendable()
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
app := &bucketLimitAppender{Appender: appTest.Appender(t.Context()), limit: c.limit}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
if floatHisto {
fh := c.h.Copy().ToFloat(nil)
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
if c.expectError {
require.Error(t, err)
t.Run("appV2=false", func(t *testing.T) {
app := &bucketLimitAppender{Appender: teststorage.NewAppendable().Appender(t.Context()), limit: c.limit}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
if floatHisto {
fh := c.h.Copy().ToFloat(nil)
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
if c.expectError {
require.Error(t, err)
} else {
require.Equal(t, c.expectSchema, fh.Schema)
require.Equal(t, c.expectBucketCount, len(fh.NegativeBuckets)+len(fh.PositiveBuckets))
require.NoError(t, err)
}
} else {
require.Equal(t, c.expectSchema, fh.Schema)
require.Equal(t, c.expectBucketCount, len(fh.NegativeBuckets)+len(fh.PositiveBuckets))
require.NoError(t, err)
h := c.h.Copy()
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
if c.expectError {
require.Error(t, err)
} else {
require.Equal(t, c.expectSchema, h.Schema)
require.Equal(t, c.expectBucketCount, len(h.NegativeBuckets)+len(h.PositiveBuckets))
require.NoError(t, err)
}
}
} else {
h := c.h.Copy()
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
if c.expectError {
require.Error(t, err)
require.NoError(t, app.Commit())
})
t.Run("appV2=true", func(t *testing.T) {
app := &bucketLimitAppenderV2{AppenderV2: teststorage.NewAppendable().AppenderV2(t.Context()), limit: c.limit}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
if floatHisto {
fh := c.h.Copy().ToFloat(nil)
_, err = app.Append(0, lbls, 0, ts, 0, nil, fh, storage.AOptions{})
if c.expectError {
require.Error(t, err)
} else {
require.Equal(t, c.expectSchema, fh.Schema)
require.Equal(t, c.expectBucketCount, len(fh.NegativeBuckets)+len(fh.PositiveBuckets))
require.NoError(t, err)
}
} else {
require.Equal(t, c.expectSchema, h.Schema)
require.Equal(t, c.expectBucketCount, len(h.NegativeBuckets)+len(h.PositiveBuckets))
require.NoError(t, err)
h := c.h.Copy()
_, err = app.Append(0, lbls, 0, ts, 0, h, nil, storage.AOptions{})
if c.expectError {
require.Error(t, err)
} else {
require.Equal(t, c.expectSchema, h.Schema)
require.Equal(t, c.expectBucketCount, len(h.NegativeBuckets)+len(h.PositiveBuckets))
require.NoError(t, err)
}
}
}
require.NoError(t, app.Commit())
require.NoError(t, app.Commit())
})
})
}
}
@ -696,27 +725,45 @@ func TestMaxSchemaAppender(t *testing.T) {
},
}
appTest := teststorage.NewAppendable()
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
app := &maxSchemaAppender{Appender: appTest.Appender(t.Context()), maxSchema: c.maxSchema}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
if floatHisto {
fh := c.h.Copy().ToFloat(nil)
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
require.Equal(t, c.expectSchema, fh.Schema)
require.NoError(t, err)
} else {
h := c.h.Copy()
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
require.Equal(t, c.expectSchema, h.Schema)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
t.Run("appV2=false", func(t *testing.T) {
app := &maxSchemaAppender{Appender: teststorage.NewAppendable().Appender(t.Context()), maxSchema: c.maxSchema}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
if floatHisto {
fh := c.h.Copy().ToFloat(nil)
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
require.Equal(t, c.expectSchema, fh.Schema)
require.NoError(t, err)
} else {
h := c.h.Copy()
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
require.Equal(t, c.expectSchema, h.Schema)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
})
t.Run("appV2=true", func(t *testing.T) {
app := &maxSchemaAppenderV2{AppenderV2: teststorage.NewAppendable().AppenderV2(t.Context()), maxSchema: c.maxSchema}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
if floatHisto {
fh := c.h.Copy().ToFloat(nil)
_, err = app.Append(0, lbls, 0, ts, 0, nil, fh, storage.AOptions{})
require.Equal(t, c.expectSchema, fh.Schema)
require.NoError(t, err)
} else {
h := c.h.Copy()
_, err = app.Append(0, lbls, 0, ts, 0, h, nil, storage.AOptions{})
require.Equal(t, c.expectSchema, h.Schema)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
})
})
}
}
@ -724,32 +771,65 @@ func TestMaxSchemaAppender(t *testing.T) {
// Test sample_limit when a scrape contains Native Histograms.
func TestAppendWithSampleLimitAndNativeHistogram(t *testing.T) {
appTest := teststorage.NewAppendable()
now := time.Now()
app := appenderWithLimits(appTest.Appender(t.Context()), 2, 0, histogram.ExponentialSchemaMax)
t.Run("appV2=false", func(t *testing.T) {
app := appenderWithLimits(teststorage.NewAppendable().Appender(t.Context()), 2, 0, histogram.ExponentialSchemaMax)
// sample_limit is set to 2, so first two scrapes should work
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "foo"), timestamp.FromTime(now), 1)
require.NoError(t, err)
// sample_limit is set to 2, so first two scrapes should work
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "foo"), timestamp.FromTime(now), 1)
require.NoError(t, err)
// Second sample, should be ok.
_, err = app.AppendHistogram(
0,
labels.FromStrings(model.MetricNameLabel, "my_histogram1"),
timestamp.FromTime(now),
&histogram.Histogram{},
nil,
)
require.NoError(t, err)
// Second sample, should be ok.
_, err = app.AppendHistogram(
0,
labels.FromStrings(model.MetricNameLabel, "my_histogram1"),
timestamp.FromTime(now),
&histogram.Histogram{},
nil,
)
require.NoError(t, err)
// This is third sample with sample_limit=2, it should trigger errSampleLimit.
_, err = app.AppendHistogram(
0,
labels.FromStrings(model.MetricNameLabel, "my_histogram2"),
timestamp.FromTime(now),
&histogram.Histogram{},
nil,
)
require.ErrorIs(t, err, errSampleLimit)
// This is third sample with sample_limit=2, it should trigger errSampleLimit.
_, err = app.AppendHistogram(
0,
labels.FromStrings(model.MetricNameLabel, "my_histogram2"),
timestamp.FromTime(now),
&histogram.Histogram{},
nil,
)
require.ErrorIs(t, err, errSampleLimit)
})
t.Run("appV2=true", func(t *testing.T) {
app := appenderV2WithLimits(teststorage.NewAppendable().AppenderV2(t.Context()), 2, 0, histogram.ExponentialSchemaMax)
// sample_limit is set to 2, so first two scrapes should work
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "foo"), 0, timestamp.FromTime(now), 1, nil, nil, storage.AOptions{})
require.NoError(t, err)
// Second sample, should be ok.
_, err = app.Append(
0,
labels.FromStrings(model.MetricNameLabel, "my_histogram1"),
0,
timestamp.FromTime(now),
0,
&histogram.Histogram{},
nil,
storage.AOptions{},
)
require.NoError(t, err)
// This is third sample with sample_limit=2, it should trigger errSampleLimit.
_, err = app.Append(
0,
labels.FromStrings(model.MetricNameLabel, "my_histogram2"),
0,
timestamp.FromTime(now),
0,
&histogram.Histogram{},
nil,
storage.AOptions{},
)
require.ErrorIs(t, err, errSampleLimit)
})
}

View File

@ -21,15 +21,21 @@ import (
"slices"
"strings"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/testutil"
)
// Sample represents test, combined sample for mocking storage.AppenderV2.
@ -91,6 +97,56 @@ func (s Sample) Equals(other Sample) bool {
slices.EqualFunc(s.ES, other.ES, exemplar.Exemplar.Equals)
}
var (
sampleComparer = cmp.Comparer(func(a, b Sample) bool {
return a.Equals(b)
})
byLabelSort = cmpopts.SortSlices(func(a, b Sample) int {
return labels.Compare(a.L, b.L)
})
)
func includeStaleNaNs(s []Sample) bool {
for _, e := range s {
if value.IsStaleNaN(e.V) {
return true
}
}
return false
}
// RequireEqual is a special require equal that correctly compare Prometheus structures.
//
// In comparison to testutil.RequireEqual, this function adds special logic for comparing []Samples.
//
// It also ignores ordering when expected slice contains at least one StaleNaN. This is because the
// scrape StaleNan samples are generated by iterating over a map, thus expectedly different.
//
// TODO(bwplotka): We should likely reorder only within a group of sequential NaNs or only in scrape package.
func RequireEqual(t testing.TB, expected, got []Sample, msgAndArgs ...any) {
opts := []cmp.Option{sampleComparer}
if includeStaleNaNs(expected) {
opts = append(opts, byLabelSort)
}
testutil.RequireEqualWithOptions(t, expected, got, opts, msgAndArgs...)
}
// RequireNotEqual is the negation of RequireEqual.
func RequireNotEqual(t testing.TB, expected, got []Sample, msgAndArgs ...any) {
t.Helper()
opts := []cmp.Option{cmp.Comparer(labels.Equal), sampleComparer}
if includeStaleNaNs(expected) {
opts = append(opts, byLabelSort)
}
if !cmp.Equal(expected, got, opts...) {
return
}
require.Fail(t, fmt.Sprintf("Equal, but expected not: \n"+
"a: %s\n"+
"b: %s", expected, got), msgAndArgs...)
}
// Appendable is a storage.Appendable mock.
// It allows recording all samples that were added through the appender and injecting errors.
// Appendable will panic if more than one Appender is open.
@ -108,8 +164,7 @@ type Appendable struct {
rolledbackSamples []Sample
// Optional chain (Appender will collect samples, then run next).
next storage.Appendable
nextV2 storage.AppendableV2
next compatAppendable
}
// NewAppendable returns mock Appendable.
@ -117,15 +172,14 @@ func NewAppendable() *Appendable {
return &Appendable{}
}
// Then chains another appender from the provided Appendable for the Appender calls.
func (a *Appendable) Then(appendable storage.Appendable) *Appendable {
a.next = appendable
return a
type compatAppendable interface {
storage.Appendable
storage.AppendableV2
}
// ThenV2 chains another appenderV2 from the provided AppendableV2 for the AppenderV2 calls.
func (a *Appendable) ThenV2(appendable storage.AppendableV2) *Appendable {
a.nextV2 = appendable
// Then chains another appender from the provided Appendable for the Appender calls.
func (a *Appendable) Then(appendable compatAppendable) *Appendable {
a.next = appendable
return a
}
@ -289,13 +343,12 @@ func (a *Appendable) Appender(ctx context.Context) storage.Appender {
ret := &appender{baseAppender: baseAppender{a: a}}
if a.openAppenders.Inc() > 1 {
ret.err = errors.New("teststorage.Appendable.Appender() concurrent use is not supported; attempted opening new Appender() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed")
return ret
}
if a.next != nil {
app := a.next.Appender(ctx)
ret.next, ret.nextTr = app, app
} else if a.nextV2 != nil {
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.Appender() invoked with .ThenV2 but no .Then was supplied; likely bug"))
}
return ret
}
@ -332,7 +385,8 @@ func computeOrCheckRef(ref storage.SeriesRef, ls labels.Labels) (storage.SeriesR
}
if storage.SeriesRef(h) != ref {
// Check for buggy ref while we at it.
// Check for buggy ref while we are at it. This only makes sense for cases without .Then*, because further appendable
// might have a different ref computation logic e.g. TSDB uses atomic increments.
return 0, errors.New("teststorage.appender: found input ref not matching labels; potential bug in Appendable usage")
}
return ref, nil
@ -442,13 +496,12 @@ func (a *Appendable) AppenderV2(ctx context.Context) storage.AppenderV2 {
ret := &appenderV2{baseAppender: baseAppender{a: a}}
if a.openAppenders.Inc() > 1 {
ret.err = errors.New("teststorage.Appendable.AppenderV2() concurrent use is not supported; attempted opening new AppenderV2() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed")
return ret
}
if a.nextV2 != nil {
app := a.nextV2.AppenderV2(ctx)
if a.next != nil {
app := a.next.AppenderV2(ctx)
ret.next, ret.nextTr = app, app
} else if a.next != nil {
ret.err = errors.Join(ret.err, errors.New("teststorage.Appendable.AppenderV2() invoked with .Then but no .ThenV2 was supplied; likely bug"))
}
return ret
}
@ -498,13 +551,14 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
if a.next != nil {
ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts)
if err != nil {
return 0, err
}
} else {
ref, err = computeOrCheckRef(ref, ls)
if err != nil {
return ref, err
}
}
ref, err = computeOrCheckRef(ref, ls)
if err != nil {
return ref, err
}
return ref, partialErr
}

View File

@ -15,16 +15,16 @@ package teststorage
import (
"errors"
"fmt"
"math"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/testutil"
@ -140,24 +140,11 @@ func TestAppendable_Then(t *testing.T) {
// Ensure next mock record all the appends when appending to app.
testAppendableV1(t, nextAppTest, app)
// V2 should fail as Then was supplied with Appendable V1.
require.Error(t, app.AppenderV2(t.Context()).Commit())
}
func TestAppendable_ThenV2(t *testing.T) {
nextAppTest := NewAppendable()
app := NewAppendable().ThenV2(nextAppTest)
// Ensure next mock record all the appends when appending to app.
testAppendableV2(t, nextAppTest, app)
// V1 should fail as ThenV2 was supplied with Appendable V2.
require.Error(t, app.Appender(t.Context()).Commit())
}
// TestSample_RequireEqual ensures standard testutil.RequireEqual is enough for comparisons.
// This is thanks to the fact metadata has now Equals method.
// TestSample_RequireEqual.
func TestSample_RequireEqual(t *testing.T) {
a := []Sample{
{},
@ -165,7 +152,7 @@ func TestSample_RequireEqual(t *testing.T) {
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
testutil.RequireEqual(t, a, a)
RequireEqual(t, a, a)
b1 := []Sample{
{},
@ -173,7 +160,7 @@ func TestSample_RequireEqual(t *testing.T) {
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2_diff", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123}, // test_metric2_diff is different.
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
requireNotEqual(t, a, b1)
RequireNotEqual(t, a, b1)
b2 := []Sample{
{},
@ -181,7 +168,7 @@ func TestSample_RequireEqual(t *testing.T) {
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo2")}}}, // exemplar is different.
}
requireNotEqual(t, a, b2)
RequireNotEqual(t, a, b2)
b3 := []Sample{
{},
@ -189,7 +176,7 @@ func TestSample_RequireEqual(t *testing.T) {
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123, T: 123}, // Timestamp is different.
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
requireNotEqual(t, a, b3)
RequireNotEqual(t, a, b3)
b4 := []Sample{
{},
@ -197,7 +184,7 @@ func TestSample_RequireEqual(t *testing.T) {
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 456.456}, // Value is different.
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
requireNotEqual(t, a, b4)
RequireNotEqual(t, a, b4)
b5 := []Sample{
{},
@ -205,19 +192,43 @@ func TestSample_RequireEqual(t *testing.T) {
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
requireNotEqual(t, a, b5)
}
RequireNotEqual(t, a, b5)
// TODO(bwplotka): While this mimics testutil.RequireEqual just making it negative, this does not literally test
// testutil.RequireEqual. Either build a test suite that mocks `testing.TB` or get rid of testutil.RequireEqual somehow.
func requireNotEqual(t testing.TB, a, b any) {
t.Helper()
if !cmp.Equal(a, b, cmp.Comparer(labels.Equal)) {
return
// NaN comparison.
a = []Sample{
{},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: math.Float64frombits(value.StaleNaN)},
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
require.Fail(t, fmt.Sprintf("Equal, but expected not: \n"+
"a: %s\n"+
"b: %s", a, b))
RequireEqual(t, a, a)
// NaN comparison with different order.
a = []Sample{
{},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric10", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: math.Float64frombits(value.StaleNaN)},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: math.Float64frombits(value.StaleNaN)},
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
b6 := []Sample{
{},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: math.Float64frombits(value.StaleNaN)},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric10", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: math.Float64frombits(value.StaleNaN)},
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
RequireEqual(t, a, b6)
// Not equal with NaNs.
b7 := []Sample{
{},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric10", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: math.Float64frombits(value.StaleNaN)},
{L: labels.FromStrings(model.MetricNameLabel, "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge2", Unit: "", Help: "other help text"}, V: math.Float64frombits(value.StaleNaN)}, // metadata different
{ES: []exemplar.Exemplar{{Labels: labels.FromStrings(model.MetricNameLabel, "yolo")}}},
}
RequireNotEqual(t, a, b7)
}
func TestConcurrentAppender_ReturnsErrAppender(t *testing.T) {

View File

@ -28,34 +28,34 @@ import (
"github.com/prometheus/prometheus/util/testutil"
)
type Option func(opt *tsdb.Options)
// New returns a new TestStorage for testing purposes
// that removes all associated files on closing.
func New(t testutil.T, outOfOrderTimeWindow ...int64) *TestStorage {
stor, err := NewWithError(outOfOrderTimeWindow...)
func New(t testutil.T, o ...Option) *TestStorage {
s, err := NewWithError(o...)
require.NoError(t, err)
return stor
return s
}
// NewWithError returns a new TestStorage for user facing tests, which reports
// errors directly.
func NewWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
dir, err := os.MkdirTemp("", "test_storage")
if err != nil {
return nil, fmt.Errorf("opening test directory: %w", err)
}
func NewWithError(o ...Option) (*TestStorage, error) {
// Tests just load data for a series sequentially. Thus we
// need a long appendable window.
opts := tsdb.DefaultOptions()
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.RetentionDuration = 0
opts.OutOfOrderTimeWindow = 0
// Set OutOfOrderTimeWindow if provided, otherwise use default (0)
if len(outOfOrderTimeWindow) > 0 {
opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
} else {
opts.OutOfOrderTimeWindow = 0 // Default value is zero
for _, opt := range o {
opt(opts)
}
dir, err := os.MkdirTemp("", "test_storage")
if err != nil {
return nil, fmt.Errorf("opening test directory: %w", err)
}
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())