refactor(scrape): simplified scrapeLoop constructors & tests; add teststorage.Appender mock
Signed-off-by: bwplotka <bwplotka@gmail.com>
Parent: b0649e08c4
Commit: fd6a28a7da
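Context for the test changes below: the hand-rolled collectResultAppender helper is removed and tests use the new teststorage.Appender mock instead. The following is a minimal usage sketch, not part of this commit; it is inferred only from the calls visible in this diff (teststorage.NewAppender, ResultSamplesGreaterThan, PendingSamples and the Sample fields L/T/V), and the package, test name and metric name are hypothetical assumptions.

package scrape_test

import (
    "testing"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/util/teststorage"
)

// TestAppenderMockUsage is a hypothetical example of the new mock, not part of this commit.
func TestAppenderMockUsage(t *testing.T) {
    // Replaces the removed collectResultAppender helper; satisfies storage.Appendable.
    app := teststorage.NewAppender()

    // ... hand app to the code under test as its Appendable and let it scrape/append ...

    // Assert that at least one sample was appended.
    if !app.ResultSamplesGreaterThan(0) {
        t.Fatal("expected at least one appended sample")
    }
    // Inspect collected samples; field names follow the usage in this diff.
    for _, s := range app.PendingSamples {
        if s.L.Get(model.MetricNameLabel) == "example_metric_total" { // hypothetical metric name
            t.Logf("sample at t=%d with value %v", s.T, s.V)
        }
    }
}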
@@ -484,7 +484,7 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte
 // supposed to be used according to the schema.
 func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool {
     if h2 == nil {
-        return false
+        return h == nil
     }
 
     if h.Schema != h2.Schema ||
@@ -247,7 +247,7 @@ func (h *Histogram) CumulativeBucketIterator() BucketIterator[uint64] {
 // supposed to be used according to the schema.
 func (h *Histogram) Equals(h2 *Histogram) bool {
     if h2 == nil {
-        return false
+        return h == nil
     }
 
     if h.Schema != h2.Schema || h.Count != h2.Count ||
@@ -1,4 +1,4 @@
-// Copyright 2022 The Prometheus Authors
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -13,7 +13,11 @@
 
 package metadata
 
-import "github.com/prometheus/common/model"
+import (
+    "strings"
+
+    "github.com/prometheus/common/model"
+)
 
 // Metadata stores a series' metadata information.
 type Metadata struct {
@@ -21,3 +25,26 @@ type Metadata struct {
     Unit string `json:"unit"`
     Help string `json:"help"`
 }
+
+// IsEmpty returns true if metadata structure is empty, including unknown type case.
+func (m Metadata) IsEmpty() bool {
+    return (m.Type == "" || m.Type == model.MetricTypeUnknown) && m.Unit == "" && m.Help == ""
+}
+
+// Equals returns true if m is semantically the same as other metadata.
+func (m Metadata) Equals(other Metadata) bool {
+    if strings.Compare(m.Unit, other.Unit) != 0 || strings.Compare(m.Help, other.Help) != 0 {
+        return false
+    }
+
+    if m.Type == "" || m.Type == model.MetricTypeUnknown {
+        if m.Type != "" && m.Type != model.MetricTypeUnknown {
+            return false
+        }
+    } else {
+        if m.Type != other.Type {
+            return false
+        }
+    }
+    return true
+}
@@ -1,4 +1,4 @@
-// Copyright 2013 The Prometheus Authors
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -17,10 +17,6 @@ import (
     "bytes"
     "context"
     "encoding/binary"
-    "fmt"
-    "math"
-    "strings"
-    "sync"
     "testing"
 
     "github.com/gogo/protobuf/proto"
@@ -32,8 +28,12 @@ import (
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/model/metadata"
     "github.com/prometheus/prometheus/storage"
+    "github.com/prometheus/prometheus/util/teststorage"
 )
 
+// For readability.
+type sample = teststorage.Sample
+
 type nopAppendable struct{}
 
 func (nopAppendable) Appender(context.Context) storage.Appender {
@@ -71,188 +71,6 @@ func (nopAppender) AppendSTZeroSample(storage.SeriesRef, labels.Labels, int64, i
 func (nopAppender) Commit() error { return nil }
 func (nopAppender) Rollback() error { return nil }
 
-type floatSample struct {
-    metric labels.Labels
-    t int64
-    f float64
-}
-
-func equalFloatSamples(a, b floatSample) bool {
-    // Compare Float64bits so NaN values which are exactly the same will compare equal.
-    return labels.Equal(a.metric, b.metric) && a.t == b.t && math.Float64bits(a.f) == math.Float64bits(b.f)
-}
-
-type histogramSample struct {
-    metric labels.Labels
-    t int64
-    h *histogram.Histogram
-    fh *histogram.FloatHistogram
-}
-
-type metadataEntry struct {
-    m metadata.Metadata
-    metric labels.Labels
-}
-
-func metadataEntryEqual(a, b metadataEntry) bool {
-    if !labels.Equal(a.metric, b.metric) {
-        return false
-    }
-    if a.m.Type != b.m.Type {
-        return false
-    }
-    if a.m.Unit != b.m.Unit {
-        return false
-    }
-    if a.m.Help != b.m.Help {
-        return false
-    }
-    return true
-}
-
-type collectResultAppendable struct {
-    *collectResultAppender
-}
-
-func (a *collectResultAppendable) Appender(context.Context) storage.Appender {
-    return a
-}
-
-// collectResultAppender records all samples that were added through the appender.
-// It can be used as its zero value or be backed by another appender it writes samples through.
-type collectResultAppender struct {
-    mtx sync.Mutex
-
-    next storage.Appender
-    resultFloats []floatSample
-    pendingFloats []floatSample
-    rolledbackFloats []floatSample
-    resultHistograms []histogramSample
-    pendingHistograms []histogramSample
-    rolledbackHistograms []histogramSample
-    resultExemplars []exemplar.Exemplar
-    pendingExemplars []exemplar.Exemplar
-    resultMetadata []metadataEntry
-    pendingMetadata []metadataEntry
-}
-
-func (*collectResultAppender) SetOptions(*storage.AppendOptions) {}
-
-func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
-    a.mtx.Lock()
-    defer a.mtx.Unlock()
-    a.pendingFloats = append(a.pendingFloats, floatSample{
-        metric: lset,
-        t: t,
-        f: v,
-    })
-
-    if a.next == nil {
-        if ref == 0 {
-            // Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
-            ref = storage.SeriesRef(lset.Hash())
-        }
-        return ref, nil
-    }
-
-    ref, err := a.next.Append(ref, lset, t, v)
-    if err != nil {
-        return 0, err
-    }
-    return ref, nil
-}
-
-func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
-    a.mtx.Lock()
-    defer a.mtx.Unlock()
-    a.pendingExemplars = append(a.pendingExemplars, e)
-    if a.next == nil {
-        return 0, nil
-    }
-
-    return a.next.AppendExemplar(ref, l, e)
-}
-
-func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
-    a.mtx.Lock()
-    defer a.mtx.Unlock()
-    a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, metric: l})
-    if a.next == nil {
-        return 0, nil
-    }
-
-    return a.next.AppendHistogram(ref, l, t, h, fh)
-}
-
-func (a *collectResultAppender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64, h *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
-    if h != nil {
-        return a.AppendHistogram(ref, l, st, &histogram.Histogram{}, nil)
-    }
-    return a.AppendHistogram(ref, l, st, nil, &histogram.FloatHistogram{})
-}
-
-func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
-    a.mtx.Lock()
-    defer a.mtx.Unlock()
-    a.pendingMetadata = append(a.pendingMetadata, metadataEntry{metric: l, m: m})
-    if a.next == nil {
-        if ref == 0 {
-            ref = storage.SeriesRef(l.Hash())
-        }
-        return ref, nil
-    }
-
-    return a.next.UpdateMetadata(ref, l, m)
-}
-
-func (a *collectResultAppender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64) (storage.SeriesRef, error) {
-    return a.Append(ref, l, st, 0.0)
-}
-
-func (a *collectResultAppender) Commit() error {
-    a.mtx.Lock()
-    defer a.mtx.Unlock()
-    a.resultFloats = append(a.resultFloats, a.pendingFloats...)
-    a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
-    a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
-    a.resultMetadata = append(a.resultMetadata, a.pendingMetadata...)
-    a.pendingFloats = nil
-    a.pendingExemplars = nil
-    a.pendingHistograms = nil
-    a.pendingMetadata = nil
-    if a.next == nil {
-        return nil
-    }
-    return a.next.Commit()
-}
-
-func (a *collectResultAppender) Rollback() error {
-    a.mtx.Lock()
-    defer a.mtx.Unlock()
-    a.rolledbackFloats = a.pendingFloats
-    a.rolledbackHistograms = a.pendingHistograms
-    a.pendingFloats = nil
-    a.pendingHistograms = nil
-    if a.next == nil {
-        return nil
-    }
-    return a.next.Rollback()
-}
-
-func (a *collectResultAppender) String() string {
-    var sb strings.Builder
-    for _, s := range a.resultFloats {
-        sb.WriteString(fmt.Sprintf("committed: %s %f %d\n", s.metric, s.f, s.t))
-    }
-    for _, s := range a.pendingFloats {
-        sb.WriteString(fmt.Sprintf("pending: %s %f %d\n", s.metric, s.f, s.t))
-    }
-    for _, s := range a.rolledbackFloats {
-        sb.WriteString(fmt.Sprintf("rolledback: %s %f %d\n", s.metric, s.f, s.t))
-    }
-    return sb.String()
-}
-
 // protoMarshalDelimited marshals a MetricFamily into a delimited
 // Prometheus proto exposition format bytes (known as `encoding=delimited`)
 //
@@ -1,4 +1,4 @@
-// Copyright 2013 The Prometheus Authors
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -38,8 +38,8 @@ import (
     "github.com/prometheus/prometheus/util/pool"
 )
 
-// NewManager is the Manager constructor.
+// NewManager is the Manager constructor using Appendable.
-func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
+func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), appendable storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
     if o == nil {
         o = &Options{}
     }
@@ -53,7 +53,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str
     }
 
     m := &Manager{
-        append: app,
+        appendable: appendable,
         opts: o,
         logger: logger,
         newScrapeFailureLogger: newScrapeFailureLogger,
@@ -79,15 +79,15 @@ type Options struct {
     // Option to enable appending of scraped Metadata to the TSDB/other appenders. Individual appenders
     // can decide what to do with metadata, but for practical purposes this flag exists so that metadata
     // can be written to the WAL and thus read for remote write.
-    // TODO: implement some form of metadata storage
     AppendMetadata bool
     // Option to increase the interval used by scrape manager to throttle target groups updates.
    DiscoveryReloadInterval model.Duration
 
     // Option to enable the ingestion of the created timestamp as a synthetic zero sample.
     // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
     EnableStartTimestampZeroIngestion bool
 
-    // EnableTypeAndUnitLabels
+    // EnableTypeAndUnitLabels represents type-and-unit-labels feature flag.
     EnableTypeAndUnitLabels bool
 
     // Optional HTTP client options to use when scraping.
@@ -102,7 +102,9 @@ type Options struct {
 type Manager struct {
     opts *Options
     logger *slog.Logger
-    append storage.Appendable
+
+    appendable storage.Appendable
+
     graceShut chan struct{}
 
     offsetSeed uint64 // Global offsetSeed seed is used to spread scrape workload across HA setup.
@@ -183,7 +185,7 @@ func (m *Manager) reload() {
             continue
         }
         m.metrics.targetScrapePools.Inc()
-        sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
+        sp, err := newScrapePool(scrapeConfig, m.appendable, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
         if err != nil {
             m.metrics.targetScrapePoolsFailed.Inc()
             m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)
@@ -1,4 +1,4 @@
-// Copyright 2013 The Prometheus Authors
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -51,6 +51,7 @@ import (
     "github.com/prometheus/prometheus/storage"
     "github.com/prometheus/prometheus/tsdb/tsdbutil"
     "github.com/prometheus/prometheus/util/runutil"
+    "github.com/prometheus/prometheus/util/teststorage"
     "github.com/prometheus/prometheus/util/testutil"
 )
 
@@ -764,7 +765,7 @@ func TestManagerSTZeroIngestion(t *testing.T) {
     for _, testWithST := range []bool{false, true} {
         t.Run(fmt.Sprintf("withST=%v", testWithST), func(t *testing.T) {
             for _, testSTZeroIngest := range []bool{false, true} {
-                t.Run(fmt.Sprintf("ctZeroIngest=%v", testSTZeroIngest), func(t *testing.T) {
+                t.Run(fmt.Sprintf("stZeroIngest=%v", testSTZeroIngest), func(t *testing.T) {
                     ctx, cancel := context.WithCancel(context.Background())
                     defer cancel()
 
@@ -777,11 +778,11 @@ func TestManagerSTZeroIngestion(t *testing.T) {
     // TODO(bwplotka): Add more types than just counter?
     encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, stTs)
 
-    app := &collectResultAppender{}
+    app := teststorage.NewAppender()
     discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
         EnableStartTimestampZeroIngestion: testSTZeroIngest,
         skipOffsetting: true,
-    }, &collectResultAppendable{app})
+    }, app)
     defer scrapeManager.Stop()
 
     server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)
@@ -806,11 +807,8 @@ scrape_configs:
     ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
     defer cancel()
     require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
-        app.mtx.Lock()
-        defer app.mtx.Unlock()
-
         // Check if scrape happened and grab the relevant samples.
-        if len(app.resultFloats) > 0 {
+        if app.ResultSamplesGreaterThan(0) {
             return nil
         }
         return errors.New("expected some float samples, got none")
@@ -818,32 +816,32 @@ scrape_configs:
 
     // Verify results.
     // Verify what we got vs expectations around ST injection.
-    samples := findSamplesForMetric(app.resultFloats, expectedMetricName)
+    got := findSamplesForMetric(app.PendingSamples, expectedMetricName)
     if testWithST && testSTZeroIngest {
-        require.Len(t, samples, 2)
+        require.Len(t, got, 2)
-        require.Equal(t, 0.0, samples[0].f)
+        require.Equal(t, 0.0, got[0].V)
-        require.Equal(t, timestamp.FromTime(stTs), samples[0].t)
+        require.Equal(t, timestamp.FromTime(stTs), got[0].T)
-        require.Equal(t, expectedSampleValue, samples[1].f)
+        require.Equal(t, expectedSampleValue, got[1].V)
-        require.Equal(t, timestamp.FromTime(sampleTs), samples[1].t)
+        require.Equal(t, timestamp.FromTime(sampleTs), got[1].T)
     } else {
-        require.Len(t, samples, 1)
+        require.Len(t, got, 1)
-        require.Equal(t, expectedSampleValue, samples[0].f)
+        require.Equal(t, expectedSampleValue, got[0].V)
-        require.Equal(t, timestamp.FromTime(sampleTs), samples[0].t)
+        require.Equal(t, timestamp.FromTime(sampleTs), got[0].T)
     }
 
     // Verify what we got vs expectations around additional _created series for OM text.
     // enableSTZeroInjection also kills that _created line.
-    createdSeriesSamples := findSamplesForMetric(app.resultFloats, expectedCreatedMetricName)
+    stSeriesSaender := findSamplesForMetric(app.PendingSamples, expectedCreatedMetricName)
     if testFormat == config.OpenMetricsText1_0_0 && testWithST && !testSTZeroIngest {
         // For OM Text, when counter has ST, and feature flag disabled we should see _created lines.
-        require.Len(t, createdSeriesSamples, 1)
+        require.Len(t, stSeriesSaender, 1)
         // Conversion taken from common/expfmt.writeOpenMetricsFloat.
         // We don't check the st timestamp as explicit ts was not implemented in expfmt.Encoder,
         // but exists in OM https://github.com/prometheus/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created
         // We can implement this, but we want to potentially get rid of OM 1.0 ST lines
-        require.Equal(t, float64(timestamppb.New(stTs).AsTime().UnixNano())/1e9, createdSeriesSamples[0].f)
+        require.Equal(t, float64(timestamppb.New(stTs).AsTime().UnixNano())/1e9, stSeriesSaender[0].V)
     } else {
-        require.Empty(t, createdSeriesSamples)
+        require.Empty(t, stSeriesSaender)
     }
 })
 }
@@ -885,9 +883,9 @@ func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName
     }
 }
 
-func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
+func findSamplesForMetric(floats []sample, metricName string) (ret []sample) {
     for _, f := range floats {
-        if f.metric.Get(model.MetricNameLabel) == metricName {
+        if f.L.Get(model.MetricNameLabel) == metricName {
             ret = append(ret, f)
         }
     }
@@ -964,11 +962,11 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
 
-    app := &collectResultAppender{}
+    app := teststorage.NewAppender()
     discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
         EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
         skipOffsetting: true,
-    }, &collectResultAppendable{app})
+    }, app)
     defer scrapeManager.Stop()
 
     once := sync.Once{}
@@ -1012,43 +1010,33 @@ scrape_configs:
 `, serverURL.Host)
     applyConfig(t, testConfig, scrapeManager, discoveryManager)
 
-    var got []histogramSample
-
     // Wait for one scrape.
     ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
     defer cancel()
     require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
-        app.mtx.Lock()
-        defer app.mtx.Unlock()
-
-        // Check if scrape happened and grab the relevant histograms, they have to be there - or it's a bug
-        // and it's not worth waiting.
-        for _, h := range app.resultHistograms {
-            if h.metric.Get(model.MetricNameLabel) == mName {
-                got = append(got, h)
-            }
-        }
-        if len(app.resultHistograms) > 0 {
+        if app.ResultSamplesGreaterThan(0) {
             return nil
         }
         return errors.New("expected some histogram samples, got none")
     }), "after 1 minute")
 
+    got := findSamplesForMetric(app.PendingSamples, mName)
+
     // Check for zero samples, assuming we only injected always one histogram sample.
     // Did it contain ST to inject? If yes, was ST zero enabled?
     if tc.inputHistSample.CreatedTimestamp.IsValid() && tc.enableSTZeroIngestion {
         require.Len(t, got, 2)
         // Zero sample.
-        require.Equal(t, histogram.Histogram{}, *got[0].h)
+        require.Equal(t, histogram.Histogram{}, *got[0].H)
         // Quick soft check to make sure it's the same sample or at least not zero.
-        require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].h.Sum)
+        require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].H.Sum)
         return
     }
 
     // Expect only one, valid sample.
     require.Len(t, got, 1)
     // Quick soft check to make sure it's the same sample or at least not zero.
-    require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].h.Sum)
+    require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].H.Sum)
 })
 }
 }
@@ -1083,11 +1071,11 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
 
     ctx := t.Context()
 
-    app := &collectResultAppender{}
+    app := teststorage.NewAppender()
     discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
         EnableStartTimestampZeroIngestion: true,
         skipOffsetting: true,
-    }, &collectResultAppendable{app})
+    }, app)
     defer scrapeManager.Stop()
 
     once := sync.Once{}
@@ -1146,33 +1134,19 @@ scrape_configs:
         return exists
     }, 5*time.Second, 100*time.Millisecond, "scrape pool should be created for job 'test'")
 
-    // Helper function to get matching histograms to avoid race conditions.
-    getMatchingHistograms := func() []histogramSample {
-        app.mtx.Lock()
-        defer app.mtx.Unlock()
-
-        var got []histogramSample
-        for _, h := range app.resultHistograms {
-            if h.metric.Get(model.MetricNameLabel) == mName {
-                got = append(got, h)
-            }
-        }
-        return got
-    }
-
     require.Eventually(t, func() bool {
-        return len(getMatchingHistograms()) > 0
+        return app.ResultSamplesGreaterThan(0)
     }, 1*time.Minute, 100*time.Millisecond, "expected histogram samples, got none")
 
     // Verify that samples were ingested (proving both features work together).
-    got := getMatchingHistograms()
+    got := findSamplesForMetric(app.PendingSamples, mName)
 
     // With ST zero ingestion enabled and a created timestamp present, we expect 2 samples:
     // one zero sample and one actual sample.
     require.Len(t, got, 2, "expected 2 histogram samples (zero sample + actual sample)")
-    require.Equal(t, histogram.Histogram{}, *got[0].h, "first sample should be zero sample")
+    require.Equal(t, histogram.Histogram{}, *got[0].H, "first sample should be zero sample")
-    require.InDelta(t, expectedHistogramSum, got[1].h.Sum, 1e-9, "second sample should retain the expected sum")
+    require.InDelta(t, expectedHistogramSum, got[1].H.Sum, 1e-9, "second sample should retain the expected sum")
-    require.Len(t, app.resultExemplars, 2, "expected 2 exemplars from histogram buckets")
+    require.Len(t, got[1].ES, 2, "expected 2 exemplars on second histogram")
 }
 
 func applyConfig(
scrape/scrape.go (449 changed lines)
@@ -1,4 +1,4 @@
-// Copyright 2016 The Prometheus Authors
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -81,6 +81,7 @@ type FailureLogger interface {
 // scrapePool manages scrapes for sets of targets.
 type scrapePool struct {
     appendable storage.Appendable
+
     logger *slog.Logger
     cancel context.CancelFunc
     httpOpts []config_util.HTTPClientOption
@@ -147,7 +148,17 @@ const maxAheadTime = 10 * time.Minute
 // returning an empty label set is interpreted as "drop".
 type labelsMutator func(labels.Labels) labels.Labels
 
-func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger *slog.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
+// scrapeLoopAppendAdapter allows support for multiple storage.Appender versions.
+// NOTE(bwplotka): Required for: https://github.com/prometheus/prometheus/pull/17610
+type scrapeLoopAppendAdapter interface {
+    Commit() error
+    Rollback() error
+
+    addReportSample(s reportSample, t int64, v float64, b *labels.Builder, rejectOOO bool) error
+    append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error)
+}
+
+func newScrapePool(cfg *config.ScrapeConfig, appendable storage.Appendable, offsetSeed uint64, logger *slog.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
     if logger == nil {
         logger = promslog.NewNopLogger()
     }
@@ -169,7 +180,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
     ctx, cancel := context.WithCancel(context.Background())
     sp := &scrapePool{
         cancel: cancel,
-        appendable: app,
+        appendable: appendable,
         config: cfg,
         client: client,
         activeTargets: map[uint64]*Target{},
@@ -183,55 +194,75 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
         escapingScheme: escapingScheme,
     }
     sp.newLoop = func(opts scrapeLoopOptions) loop {
-        // Update the targets retrieval function for metadata to a new scrape cache.
+        // NOTE: Formatting matches scrapeLoop fields order for readability.
-        cache := opts.cache
+        sl := &scrapeLoop{
-        if cache == nil {
+            buffers: buffers,
-            cache = newScrapeCache(metrics)
+            appendable: appendable,
-        }
+            sampleMutator: func(l labels.Labels) labels.Labels {
-        opts.target.SetMetadataStore(cache)
-
-        return newScrapeLoop(
-            ctx,
-            opts.scraper,
-            logger.With("target", opts.target),
-            buffers,
-            func(l labels.Labels) labels.Labels {
                 return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
             },
-            func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
+            reportSampleMutator: func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
-            func(ctx context.Context) storage.Appender { return app.Appender(ctx) },
+            offsetSeed: offsetSeed,
-            cache,
+            metrics: metrics,
-            sp.symbolTable,
-            offsetSeed,
+            symbolTable: sp.symbolTable,
-            opts.honorTimestamps,
+            validationScheme: sp.validationScheme,
-            opts.trackTimestampsStaleness,
+            escapingScheme: sp.escapingScheme,
-            opts.enableCompression,
-            opts.sampleLimit,
+            enableNativeHistogramScraping: cfg.ScrapeNativeHistogramsEnabled(),
-            opts.bucketLimit,
-            opts.maxSchema,
+            enableSTZeroIngestion: options.EnableStartTimestampZeroIngestion,
-            opts.labelLimits,
+            enableTypeAndUnitLabels: options.EnableTypeAndUnitLabels,
-            opts.interval,
+            reportExtraMetrics: options.ExtraMetrics,
-            opts.timeout,
+            appendMetadataToWAL: options.AppendMetadata,
-            opts.alwaysScrapeClassicHist,
+            skipOffsetting: options.skipOffsetting,
-            opts.convertClassicHistToNHCB,
-            cfg.ScrapeNativeHistogramsEnabled(),
+            scrapeLoopOptions: opts,
-            options.EnableStartTimestampZeroIngestion,
+        }
-            options.EnableTypeAndUnitLabels,
+        sl.init(ctx, options.PassMetadataInContext)
-            options.ExtraMetrics,
+        return sl
-            options.AppendMetadata,
-            opts.target,
-            options.PassMetadataInContext,
-            metrics,
-            options.skipOffsetting,
-            sp.validationScheme,
-            sp.escapingScheme,
-            opts.fallbackScrapeProtocol,
-        )
     }
     sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
     return sp, nil
 }
 
+// init prepares scrapeLoop after raw construction.
+// NOTE: While newScrapeLoop constructor pattern would be safer, it has proven to be
+// highly not readable (too many params). Instead, we follow init pattern.
+func (sl *scrapeLoop) init(ctx context.Context, passMetadataInContext bool) {
+    if sl.l == nil {
+        sl.l = promslog.NewNopLogger()
+    }
+    sl.parentCtx = ctx
+    sl.stopped = make(chan struct{})
+    if sl.buffers == nil {
+        sl.buffers = pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) })
+    }
+    if sl.cache == nil {
+        sl.cache = newScrapeCache(sl.metrics)
+        if sl.target != nil {
+            // Update the targets retrieval function for metadata to a new scrape cache.
+            sl.target.SetMetadataStore(sl.cache)
+            // TODO(bwplotka): Not sure why, but doing this before sl.target.SetMetadataStore(sl.cache) blocks goroutines...
+            // Debug, something is odd.
+            sl.l = sl.l.With("target", sl.target)
+        }
+    }
+
+    appenderCtx := ctx
+    if passMetadataInContext {
+        // Store the cache and target in the context. This is then used by downstream OTel Collector
+        // to lookup the metadata required to process the samples. Not used by Prometheus itself.
+        // TODO(gouthamve) We're using a dedicated context because using the parentCtx caused a memory
+        // leak. We should ideally fix the main leak. See: https://github.com/prometheus/prometheus/pull/10590
+        // TODO(bwplotka): Remove once OpenTelemetry collector uses AppenderV2 (add issue)
+        appenderCtx = ContextWithMetricMetadataStore(appenderCtx, sl.cache)
+        appenderCtx = ContextWithTarget(appenderCtx, sl.target)
+    }
+    sl.appenderCtx = appenderCtx
+    sl.ctx, sl.cancel = context.WithCancel(ctx)
+}
+
 func (sp *scrapePool) ActiveTargets() []*Target {
     sp.targetMtx.Lock()
     defer sp.targetMtx.Unlock()
@@ -392,6 +423,8 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
         }
 
         t := sp.activeTargets[fp]
+        // Update the targets retrieval function for metadata to a new target.
+        t.SetMetadataStore(cache)
         targetInterval, targetTimeout, err := t.intervalAndTimeout(interval, timeout)
         var (
             s = &targetScraper{
@@ -753,39 +786,6 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels
     return lb.Labels()
 }
 
-// appender returns an appender for ingested samples from the target.
-func appender(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Appender {
-    app = &timeLimitAppender{
-        Appender: app,
-        maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
-    }
-
-    // The sampleLimit is applied after metrics are potentially dropped via relabeling.
-    if sampleLimit > 0 {
-        app = &limitAppender{
-            Appender: app,
-            limit: sampleLimit,
-        }
-    }
-
-    if bucketLimit > 0 {
-        app = &bucketLimitAppender{
-            Appender: app,
-            limit: bucketLimit,
-        }
-    }
-
-    if maxSchema < histogram.ExponentialSchemaMax {
-        app = &maxSchemaAppender{
-            Appender: app,
-            maxSchema: maxSchema,
-        }
-    }
-
-    return app
-}
-
-// A scraper retrieves samples and accepts a status report at the end.
 type scraper interface {
     scrape(ctx context.Context) (*http.Response, error)
     readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error)
@@ -931,55 +931,49 @@ type cacheEntry struct {
 }
 
 type scrapeLoop struct {
-    scraper scraper
+    // Parameters.
-    l *slog.Logger
-    scrapeFailureLogger FailureLogger
-    scrapeFailureLoggerMtx sync.RWMutex
-    cache *scrapeCache
-    lastScrapeSize int
-    buffers *pool.Pool
-    offsetSeed uint64
-    honorTimestamps bool
-    trackTimestampsStaleness bool
-    enableCompression bool
-    forcedErr error
-    forcedErrMtx sync.Mutex
-    sampleLimit int
-    bucketLimit int
-    maxSchema int32
-    labelLimits *labelLimits
-    interval time.Duration
-    timeout time.Duration
-    validationScheme model.ValidationScheme
-    escapingScheme model.EscapingScheme
-
-    alwaysScrapeClassicHist bool
-    convertClassicHistToNHCB bool
-    enableSTZeroIngestion bool
-    enableTypeAndUnitLabels bool
-    fallbackScrapeProtocol string
-
-    enableNativeHistogramScraping bool
-
-    appender func(ctx context.Context) storage.Appender
-    symbolTable *labels.SymbolTable
-    sampleMutator labelsMutator
-    reportSampleMutator labelsMutator
-
-    parentCtx context.Context
-    appenderCtx context.Context
     ctx context.Context
     cancel func()
     stopped chan struct{}
+    parentCtx context.Context
-    disabledEndOfRunStalenessMarkers atomic.Bool
+    appenderCtx context.Context
+    l *slog.Logger
-    reportExtraMetrics bool
+    buffers *pool.Pool
-    appendMetadataToWAL bool
+    appendable storage.Appendable
+    sampleMutator labelsMutator
+    reportSampleMutator labelsMutator
+    offsetSeed uint64
     metrics *scrapeMetrics
 
+    // Scrape pool shared data.
+    symbolTable *labels.SymbolTable
+    validationScheme model.ValidationScheme
+    escapingScheme model.EscapingScheme
+
+    // Options inherited from config.ScrapeConfig.
+    enableNativeHistogramScraping bool
+
+    // Options inherited from scrape.Options.
+    enableSTZeroIngestion bool
+    enableTypeAndUnitLabels bool
+    reportExtraMetrics bool
+    appendMetadataToWAL bool
     skipOffsetting bool // For testability.
 
+    // Common options.
+    scrapeLoopOptions
+
+    // error injection through setForcedError.
+    forcedErr error
+    forcedErrMtx sync.Mutex
+
+    // Special logger set on setScrapeFailureLogger
+    scrapeFailureLoggerMtx sync.RWMutex
+    scrapeFailureLogger FailureLogger
+
+    // Locally cached data.
+    lastScrapeSize int
+    disabledEndOfRunStalenessMarkers atomic.Bool
 }
 
 // scrapeCache tracks mappings of exposed metric strings to label sets and
@@ -1004,8 +998,8 @@ type scrapeCache struct {
     seriesCur map[storage.SeriesRef]*cacheEntry
     seriesPrev map[storage.SeriesRef]*cacheEntry
 
-    // TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
+    // TODO(bwplotka): Consider moving metadata caching to head. See
-    // avoid locking (using metadata API can block scraping).
+    // https://github.com/prometheus/prometheus/issues/17619.
     metaMtx sync.Mutex // Mutex is needed due to api touching it when metadata is queried.
     metadata map[string]*metaEntry // metadata by metric family name.
 
@@ -1240,101 +1234,6 @@ func (c *scrapeCache) LengthMetadata() int {
     return len(c.metadata)
 }
 
-func newScrapeLoop(ctx context.Context,
-    sc scraper,
-    l *slog.Logger,
-    buffers *pool.Pool,
-    sampleMutator labelsMutator,
-    reportSampleMutator labelsMutator,
-    appender func(ctx context.Context) storage.Appender,
-    cache *scrapeCache,
-    symbolTable *labels.SymbolTable,
-    offsetSeed uint64,
-    honorTimestamps bool,
-    trackTimestampsStaleness bool,
-    enableCompression bool,
-    sampleLimit int,
-    bucketLimit int,
-    maxSchema int32,
-    labelLimits *labelLimits,
-    interval time.Duration,
-    timeout time.Duration,
-    alwaysScrapeClassicHist bool,
-    convertClassicHistToNHCB bool,
-    enableNativeHistogramScraping bool,
-    enableSTZeroIngestion bool,
-    enableTypeAndUnitLabels bool,
-    reportExtraMetrics bool,
-    appendMetadataToWAL bool,
-    target *Target,
-    passMetadataInContext bool,
-    metrics *scrapeMetrics,
-    skipOffsetting bool,
-    validationScheme model.ValidationScheme,
-    escapingScheme model.EscapingScheme,
-    fallbackScrapeProtocol string,
-) *scrapeLoop {
-    if l == nil {
-        l = promslog.NewNopLogger()
-    }
-    if buffers == nil {
-        buffers = pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) })
-    }
-    if cache == nil {
-        cache = newScrapeCache(metrics)
-    }
-
-    appenderCtx := ctx
-
-    if passMetadataInContext {
-        // Store the cache and target in the context. This is then used by downstream OTel Collector
-        // to lookup the metadata required to process the samples. Not used by Prometheus itself.
-        // TODO(gouthamve) We're using a dedicated context because using the parentCtx caused a memory
-        // leak. We should ideally fix the main leak. See: https://github.com/prometheus/prometheus/pull/10590
-        appenderCtx = ContextWithMetricMetadataStore(appenderCtx, cache)
-        appenderCtx = ContextWithTarget(appenderCtx, target)
-    }
-
-    sl := &scrapeLoop{
-        scraper: sc,
-        buffers: buffers,
-        cache: cache,
-        appender: appender,
-        symbolTable: symbolTable,
-        sampleMutator: sampleMutator,
-        reportSampleMutator: reportSampleMutator,
-        stopped: make(chan struct{}),
-        offsetSeed: offsetSeed,
-        l: l,
-        parentCtx: ctx,
-        appenderCtx: appenderCtx,
-        honorTimestamps: honorTimestamps,
-        trackTimestampsStaleness: trackTimestampsStaleness,
-        enableCompression: enableCompression,
-        sampleLimit: sampleLimit,
-        bucketLimit: bucketLimit,
-        maxSchema: maxSchema,
-        labelLimits: labelLimits,
-        interval: interval,
-        timeout: timeout,
-        alwaysScrapeClassicHist: alwaysScrapeClassicHist,
-        convertClassicHistToNHCB: convertClassicHistToNHCB,
-        enableSTZeroIngestion: enableSTZeroIngestion,
-        enableTypeAndUnitLabels: enableTypeAndUnitLabels,
-        fallbackScrapeProtocol: fallbackScrapeProtocol,
-        enableNativeHistogramScraping: enableNativeHistogramScraping,
-        reportExtraMetrics: reportExtraMetrics,
-        appendMetadataToWAL: appendMetadataToWAL,
-        metrics: metrics,
-        skipOffsetting: skipOffsetting,
-        validationScheme: validationScheme,
-        escapingScheme: escapingScheme,
-    }
-    sl.ctx, sl.cancel = context.WithCancel(ctx)
-
-    return sl
-}
-
 func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
     sl.scrapeFailureLoggerMtx.Lock()
     defer sl.scrapeFailureLoggerMtx.Unlock()
@@ -1411,6 +1310,11 @@ mainLoop:
     }
 }
 
+func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter {
+    // TODO(bwplotka): Here we will inject AppenderV2 eventually. See https://github.com/prometheus/prometheus/pull/17610
+    return &scrapeLoopAppender{scrapeLoop: sl, Appender: sl.appendable.Appender(sl.appenderCtx)}
+}
+
 // scrapeAndReport performs a scrape and then appends the result to the storage
 // together with reporting metrics, by using as few appenders as possible.
 // In the happy scenario, a single appender is used.
@@ -1432,20 +1336,20 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
     var total, added, seriesAdded, bytesRead int
     var err, appErr, scrapeErr error
 
-    app := sl.appender(sl.appenderCtx)
+    sla := sl.appender()
     defer func() {
         if err != nil {
-            app.Rollback()
+            _ = sla.Rollback()
             return
         }
-        err = app.Commit()
+        err = sla.Commit()
         if err != nil {
             sl.l.Error("Scrape commit failed", "err", err)
         }
     }()
 
     defer func() {
-        if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytesRead, scrapeErr); err != nil {
+        if err = sl.report(sla, appendTime, time.Since(start), total, added, seriesAdded, bytesRead, scrapeErr); err != nil {
             sl.l.Warn("Appending scrape report failed", "err", err)
         }
     }()
@@ -1453,9 +1357,9 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
     if forcedErr := sl.getForcedError(); forcedErr != nil {
         scrapeErr = forcedErr
         // Add stale markers.
-        if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
+        if _, _, _, err := sla.append([]byte{}, "", appendTime); err != nil {
-            app.Rollback()
+            _ = sla.Rollback()
-            app = sl.appender(sl.appenderCtx)
+            sla = sl.appender()
             sl.l.Warn("Append failed", "err", err)
         }
         if errc != nil {
@@ -1505,16 +1409,16 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
 
     // A failed scrape is the same as an empty scrape,
     // we still call sl.append to trigger stale markers.
-    total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
+    total, added, seriesAdded, appErr = sla.append(b, contentType, appendTime)
     if appErr != nil {
-        app.Rollback()
+        _ = sla.Rollback()
-        app = sl.appender(sl.appenderCtx)
+        sla = sl.appender()
         sl.l.Debug("Append failed", "err", appErr)
         // The append failed, probably due to a parse error or sample limit.
         // Call sl.append again with an empty scrape to trigger stale markers.
-        if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
+        if _, _, _, err := sla.append([]byte{}, "", appendTime); err != nil {
-            app.Rollback()
+            _ = sla.Rollback()
-            app = sl.appender(sl.appenderCtx)
+            sla = sl.appender()
             sl.l.Warn("Append failed", "err", err)
         }
     }
@@ -1584,24 +1488,24 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
     // If the target has since been recreated and scraped, the
     // stale markers will be out of order and ignored.
     // sl.context would have been cancelled, hence using sl.appenderCtx.
-    app := sl.appender(sl.appenderCtx)
+    sla := sl.appender()
     var err error
     defer func() {
         if err != nil {
-            app.Rollback()
+            _ = sla.Rollback()
             return
         }
-        err = app.Commit()
+        err = sla.Commit()
         if err != nil {
             sl.l.Warn("Stale commit failed", "err", err)
         }
     }()
-    if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
+    if _, _, _, err = sla.append([]byte{}, "", staleTime); err != nil {
-        app.Rollback()
+        _ = sla.Rollback()
-        app = sl.appender(sl.appenderCtx)
+        sla = sl.appender()
         sl.l.Warn("Stale append failed", "err", err)
     }
-    if err = sl.reportStale(app, staleTime); err != nil {
+    if err = sl.reportStale(sla, staleTime); err != nil {
         sl.l.Warn("Stale report failed", "err", err)
     }
 }
@ -1646,12 +1550,22 @@ func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (e
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
|
type scrapeLoopAppender struct {
|
||||||
|
*scrapeLoop
|
||||||
|
|
||||||
|
storage.Appender
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ scrapeLoopAppendAdapter = &scrapeLoopAppender{}
|
||||||
|
|
||||||
|
// append for the deprecated storage.Appender flow.
|
||||||
|
// This is only for downstream project migration purposes and will be removed soon.
|
||||||
|
func (sl *scrapeLoopAppender) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
|
||||||
defTime := timestamp.FromTime(ts)
|
defTime := timestamp.FromTime(ts)
|
||||||
|
|
||||||
if len(b) == 0 {
|
if len(b) == 0 {
|
||||||
// Empty scrape. Just update the stale markers and swap the cache (but don't flush it).
|
// Empty scrape. Just update the stale markers and swap the cache (but don't flush it).
|
||||||
err = sl.updateStaleMarkers(app, defTime)
|
err = sl.updateStaleMarkers(sl.Appender, defTime)
|
||||||
sl.cache.iterDone(false)
|
sl.cache.iterDone(false)
|
||||||
return total, added, seriesAdded, err
|
return total, added, seriesAdded, err
|
||||||
}
|
}
|
||||||
@ -1694,7 +1608,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
|
|||||||
exemplars := make([]exemplar.Exemplar, 0, 1)
|
exemplars := make([]exemplar.Exemplar, 0, 1)
|
||||||
|
|
||||||
// Take an appender with limits.
|
// Take an appender with limits.
|
||||||
app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
|
app := appenderWithLimits(sl.Appender, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1915,7 +1829,7 @@ loop:
|
|||||||
// In the majority of cases we can trust that the current series/histogram matches the lastMeta and lastMFName.
|
// In the majority of cases we can trust that the current series/histogram matches the lastMeta and lastMFName.
|
||||||
// However, optional TYPE etc. metadata and broken OM text can break this; detect those cases here.
|
// However, optional TYPE etc. metadata and broken OM text can break this; detect those cases here.
|
||||||
// TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. ST and NHCB parsing).
|
// TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. ST and NHCB parsing).
|
||||||
if isSeriesPartOfFamily(lset.Get(labels.MetricName), lastMFName, lastMeta.Type) {
|
if isSeriesPartOfFamily(lset.Get(model.MetricNameLabel), lastMFName, lastMeta.Type) {
|
||||||
if _, merr := app.UpdateMetadata(ref, lset, lastMeta.Metadata); merr != nil {
|
if _, merr := app.UpdateMetadata(ref, lset, lastMeta.Metadata); merr != nil {
|
||||||
// No need to fail the scrape on errors appending metadata.
|
// No need to fail the scrape on errors appending metadata.
|
||||||
sl.l.Debug("Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", lastMeta.Metadata), "err", merr)
|
sl.l.Debug("Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", lastMeta.Metadata), "err", merr)
|
||||||
@ -2027,7 +1941,7 @@ func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) boo
|
|||||||
// during normal operation (e.g., accidental cardinality explosion, sudden traffic spikes).
|
// during normal operation (e.g., accidental cardinality explosion, sudden traffic spikes).
|
||||||
// Current case ordering prevents exercising other cases when limits are exceeded.
|
// Current case ordering prevents exercising other cases when limits are exceeded.
|
||||||
// Remaining error cases typically occur only a few times, often during initial setup.
|
// Remaining error cases typically occur only a few times, often during initial setup.
|
||||||
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
|
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (sampleAdded bool, _ error) {
|
||||||
switch {
|
switch {
|
||||||
case err == nil:
|
case err == nil:
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -2139,7 +2053,7 @@ var (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration time.Duration, scraped, added, seriesAdded, bytes int, scrapeErr error) (err error) {
|
func (sl *scrapeLoop) report(sla scrapeLoopAppendAdapter, start time.Time, duration time.Duration, scraped, added, seriesAdded, bytes int, scrapeErr error) (err error) {
|
||||||
sl.scraper.Report(start, duration, scrapeErr)
|
sl.scraper.Report(start, duration, scrapeErr)
|
||||||
|
|
||||||
ts := timestamp.FromTime(start)
|
ts := timestamp.FromTime(start)
|
||||||
@ -2150,71 +2064,70 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
|
|||||||
}
|
}
|
||||||
b := labels.NewBuilderWithSymbolTable(sl.symbolTable)
|
b := labels.NewBuilderWithSymbolTable(sl.symbolTable)
|
||||||
|
|
||||||
if err = sl.addReportSample(app, scrapeHealthMetric, ts, health, b); err != nil {
|
if err = sla.addReportSample(scrapeHealthMetric, ts, health, b, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeDurationMetric, ts, duration.Seconds(), b); err != nil {
|
if err = sla.addReportSample(scrapeDurationMetric, ts, duration.Seconds(), b, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeSamplesMetric, ts, float64(scraped), b); err != nil {
|
if err = sla.addReportSample(scrapeSamplesMetric, ts, float64(scraped), b, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, samplesPostRelabelMetric, ts, float64(added), b); err != nil {
|
if err = sla.addReportSample(samplesPostRelabelMetric, ts, float64(added), b, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeSeriesAddedMetric, ts, float64(seriesAdded), b); err != nil {
|
if err = sla.addReportSample(scrapeSeriesAddedMetric, ts, float64(seriesAdded), b, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if sl.reportExtraMetrics {
|
if sl.reportExtraMetrics {
|
||||||
if err = sl.addReportSample(app, scrapeTimeoutMetric, ts, sl.timeout.Seconds(), b); err != nil {
|
if err = sla.addReportSample(scrapeTimeoutMetric, ts, sl.timeout.Seconds(), b, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeSampleLimitMetric, ts, float64(sl.sampleLimit), b); err != nil {
|
if err = sla.addReportSample(scrapeSampleLimitMetric, ts, float64(sl.sampleLimit), b, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeBodySizeBytesMetric, ts, float64(bytes), b); err != nil {
|
if err = sla.addReportSample(scrapeBodySizeBytesMetric, ts, float64(bytes), b, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
|
func (sl *scrapeLoop) reportStale(sla scrapeLoopAppendAdapter, start time.Time) (err error) {
|
||||||
ts := timestamp.FromTime(start)
|
ts := timestamp.FromTime(start)
|
||||||
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
|
|
||||||
stale := math.Float64frombits(value.StaleNaN)
|
stale := math.Float64frombits(value.StaleNaN)
|
||||||
b := labels.NewBuilder(labels.EmptyLabels())
|
b := labels.NewBuilder(labels.EmptyLabels())
|
||||||
|
|
||||||
if err = sl.addReportSample(app, scrapeHealthMetric, ts, stale, b); err != nil {
|
if err = sla.addReportSample(scrapeHealthMetric, ts, stale, b, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeDurationMetric, ts, stale, b); err != nil {
|
if err = sla.addReportSample(scrapeDurationMetric, ts, stale, b, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeSamplesMetric, ts, stale, b); err != nil {
|
if err = sla.addReportSample(scrapeSamplesMetric, ts, stale, b, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, samplesPostRelabelMetric, ts, stale, b); err != nil {
|
if err = sla.addReportSample(samplesPostRelabelMetric, ts, stale, b, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeSeriesAddedMetric, ts, stale, b); err != nil {
|
if err = sla.addReportSample(scrapeSeriesAddedMetric, ts, stale, b, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if sl.reportExtraMetrics {
|
if sl.reportExtraMetrics {
|
||||||
if err = sl.addReportSample(app, scrapeTimeoutMetric, ts, stale, b); err != nil {
|
if err = sla.addReportSample(scrapeTimeoutMetric, ts, stale, b, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeSampleLimitMetric, ts, stale, b); err != nil {
|
if err = sla.addReportSample(scrapeSampleLimitMetric, ts, stale, b, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = sl.addReportSample(app, scrapeBodySizeBytesMetric, ts, stale, b); err != nil {
|
if err = sla.addReportSample(scrapeBodySizeBytesMetric, ts, stale, b, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t int64, v float64, b *labels.Builder) error {
|
func (sl *scrapeLoopAppender) addReportSample(s reportSample, t int64, v float64, b *labels.Builder, rejectOOO bool) error {
|
||||||
ce, ok, _ := sl.cache.get(s.name)
|
ce, ok, _ := sl.cache.get(s.name)
|
||||||
var ref storage.SeriesRef
|
var ref storage.SeriesRef
|
||||||
var lset labels.Labels
|
var lset labels.Labels
|
||||||
@ -2226,18 +2139,22 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t in
|
|||||||
// with scraped metrics in the cache.
|
// with scraped metrics in the cache.
|
||||||
// We have to drop it when building the actual metric.
|
// We have to drop it when building the actual metric.
|
||||||
b.Reset(labels.EmptyLabels())
|
b.Reset(labels.EmptyLabels())
|
||||||
b.Set(labels.MetricName, string(s.name[:len(s.name)-1]))
|
b.Set(model.MetricNameLabel, string(s.name[:len(s.name)-1]))
|
||||||
lset = sl.reportSampleMutator(b.Labels())
|
lset = sl.reportSampleMutator(b.Labels())
|
||||||
}
|
}
|
||||||
|
|
||||||
ref, err := app.Append(ref, lset, t, v)
|
opt := storage.AppendOptions{DiscardOutOfOrder: rejectOOO}
|
||||||
|
sl.SetOptions(&opt)
|
||||||
|
ref, err := sl.Append(ref, lset, t, v)
|
||||||
|
opt.DiscardOutOfOrder = false
|
||||||
|
sl.SetOptions(&opt)
|
||||||
switch {
|
switch {
|
||||||
case err == nil:
|
case err == nil:
|
||||||
if !ok {
|
if !ok {
|
||||||
sl.cache.addRef(s.name, ref, lset, lset.Hash())
|
sl.cache.addRef(s.name, ref, lset, lset.Hash())
|
||||||
// We only need to add metadata once a scrape target appears.
|
// We only need to add metadata once a scrape target appears.
|
||||||
if sl.appendMetadataToWAL {
|
if sl.appendMetadataToWAL {
|
||||||
if _, merr := app.UpdateMetadata(ref, lset, s.Metadata); merr != nil {
|
if _, merr := sl.UpdateMetadata(ref, lset, s.Metadata); merr != nil {
|
||||||
sl.l.Debug("Error when appending metadata in addReportSample", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", s.Metadata), "err", merr)
|
sl.l.Debug("Error when appending metadata in addReportSample", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", s.Metadata), "err", merr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
187 scrape/scrape_append.go (new file)
@ -0,0 +1,187 @@
|
|||||||
|
// Copyright The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package scrape
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/model/timestamp"
|
||||||
|
"github.com/prometheus/prometheus/model/value"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
// appenderWithLimits returns an appender with additional validation.
|
||||||
|
func appenderWithLimits(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Appender {
|
||||||
|
app = &timeLimitAppender{
|
||||||
|
Appender: app,
|
||||||
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||||
|
}
|
||||||
|
|
||||||
|
// The sampleLimit is applied after metrics are potentially dropped via relabeling.
|
||||||
|
if sampleLimit > 0 {
|
||||||
|
app = &limitAppender{
|
||||||
|
Appender: app,
|
||||||
|
limit: sampleLimit,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bucketLimit > 0 {
|
||||||
|
app = &bucketLimitAppender{
|
||||||
|
Appender: app,
|
||||||
|
limit: bucketLimit,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxSchema < histogram.ExponentialSchemaMax {
|
||||||
|
app = &maxSchemaAppender{
|
||||||
|
Appender: app,
|
||||||
|
maxSchema: maxSchema,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return app
|
||||||
|
}
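
// Illustrative sketch only (the example function name is arbitrary and nothing calls it):
// how the wrappers above compose, relying on this file's existing imports. With
// sampleLimit=2, bucketLimit=0 and maxSchema=ExponentialSchemaMax only the time and
// sample limits apply, so the third appended sample is rejected with errSampleLimit.
func exampleAppenderWithLimits(base storage.Appender) error {
	app := appenderWithLimits(base, 2, 0, histogram.ExponentialSchemaMax)
	lset := labels.FromStrings("__name__", "demo_metric")
	now := timestamp.FromTime(time.Now())
	if _, err := app.Append(0, lset, now, 1); err != nil { // first sample, within the limit
		return err
	}
	if _, err := app.Append(0, lset, now, 2); err != nil { // second sample, still within the limit
		return err
	}
	_, err := app.Append(0, lset, now, 3) // third sample, rejected by limitAppender
	return err
}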
|
||||||
|
|
||||||
|
// limitAppender limits the number of total appended samples in a batch.
|
||||||
|
type limitAppender struct {
|
||||||
|
storage.Appender
|
||||||
|
|
||||||
|
limit int
|
||||||
|
i int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
||||||
|
// Bypass sample_limit checks only if we have a staleness marker for a known series (ref value is non-zero).
|
||||||
|
// This ensures that if a series is already in TSDB then we always write the marker.
|
||||||
|
if ref == 0 || !value.IsStaleNaN(v) {
|
||||||
|
app.i++
|
||||||
|
if app.i > app.limit {
|
||||||
|
return 0, errSampleLimit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ref, err := app.Appender.Append(ref, lset, t, v)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
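
// Illustrative sketch only (arbitrary example name, not called anywhere) of the staleness
// bypass described in the comment above: with limit=1 a second ordinary sample would fail,
// but a stale marker for a series the underlying appender already knows (non-zero ref) is
// let through. Assumes this file's imports plus the standard library math package.
func exampleStaleMarkerBypassesSampleLimit(base storage.Appender) error {
	app := &limitAppender{Appender: base, limit: 1}
	lset := labels.FromStrings("__name__", "demo_metric")
	now := timestamp.FromTime(time.Now())
	ref, err := app.Append(0, lset, now, 1) // counts against the limit
	if err != nil {
		return err
	}
	// Provided base returned a non-zero ref, the stale marker skips the limit check entirely.
	_, err = app.Append(ref, lset, now+1, math.Float64frombits(value.StaleNaN))
	return err
}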
|
||||||
|
|
||||||
|
func (app *limitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
// Bypass sample_limit checks only if we have a staleness marker for a known series (ref value is non-zero).
|
||||||
|
// This ensures that if a series is already in TSDB then we always write the marker.
|
||||||
|
if ref == 0 || (h != nil && !value.IsStaleNaN(h.Sum)) || (fh != nil && !value.IsStaleNaN(fh.Sum)) {
|
||||||
|
app.i++
|
||||||
|
if app.i > app.limit {
|
||||||
|
return 0, errSampleLimit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type timeLimitAppender struct {
|
||||||
|
storage.Appender
|
||||||
|
|
||||||
|
maxTime int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
||||||
|
if t > app.maxTime {
|
||||||
|
return 0, storage.ErrOutOfBounds
|
||||||
|
}
|
||||||
|
|
||||||
|
ref, err := app.Appender.Append(ref, lset, t, v)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// bucketLimitAppender limits the number of total appended samples in a batch.
|
||||||
|
type bucketLimitAppender struct {
|
||||||
|
storage.Appender
|
||||||
|
|
||||||
|
limit int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
var err error
|
||||||
|
if h != nil {
|
||||||
|
// Return with an early error if the histogram has too many buckets and the
|
||||||
|
// schema is not exponential, in which case we can't reduce the resolution.
|
||||||
|
if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) {
|
||||||
|
return 0, errBucketLimit
|
||||||
|
}
|
||||||
|
for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
|
||||||
|
if h.Schema <= histogram.ExponentialSchemaMin {
|
||||||
|
return 0, errBucketLimit
|
||||||
|
}
|
||||||
|
if err = h.ReduceResolution(h.Schema - 1); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if fh != nil {
|
||||||
|
// Return with an early error if the histogram has too many buckets and the
|
||||||
|
// schema is not exponential, in which case we can't reduce the resolution.
|
||||||
|
if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) {
|
||||||
|
return 0, errBucketLimit
|
||||||
|
}
|
||||||
|
for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
|
||||||
|
if fh.Schema <= histogram.ExponentialSchemaMin {
|
||||||
|
return 0, errBucketLimit
|
||||||
|
}
|
||||||
|
if err = fh.ReduceResolution(fh.Schema - 1); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type maxSchemaAppender struct {
|
||||||
|
storage.Appender
|
||||||
|
|
||||||
|
maxSchema int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
var err error
|
||||||
|
if h != nil {
|
||||||
|
if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema {
|
||||||
|
if err = h.ReduceResolution(app.maxSchema); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if fh != nil {
|
||||||
|
if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema {
|
||||||
|
if err = fh.ReduceResolution(app.maxSchema); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
260 scrape/scrape_append_test.go (new file)
@ -0,0 +1,260 @@
|
|||||||
|
// Copyright The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package scrape
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/model/timestamp"
|
||||||
|
"github.com/prometheus/prometheus/util/teststorage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBucketLimitAppender(t *testing.T) {
|
||||||
|
example := histogram.Histogram{
|
||||||
|
Schema: 0,
|
||||||
|
Count: 21,
|
||||||
|
Sum: 33,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
ZeroCount: 3,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{3, 0, 0},
|
||||||
|
NegativeSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
},
|
||||||
|
NegativeBuckets: []int64{3, 0, 0},
|
||||||
|
}
|
||||||
|
|
||||||
|
bigGap := histogram.Histogram{
|
||||||
|
Schema: 0,
|
||||||
|
Count: 21,
|
||||||
|
Sum: 33,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
ZeroCount: 3,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 1, Length: 1}, // in (1, 2]
|
||||||
|
{Offset: 2, Length: 1}, // in (8, 16]
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{1, 0}, // 1, 1
|
||||||
|
}
|
||||||
|
|
||||||
|
customBuckets := histogram.Histogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Count: 9,
|
||||||
|
Sum: 33,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{3, 0, 0},
|
||||||
|
CustomValues: []float64{1, 2, 3},
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
h histogram.Histogram
|
||||||
|
limit int
|
||||||
|
expectError bool
|
||||||
|
expectBucketCount int
|
||||||
|
expectSchema int32
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
h: example,
|
||||||
|
limit: 3,
|
||||||
|
expectError: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
h: example,
|
||||||
|
limit: 4,
|
||||||
|
expectError: false,
|
||||||
|
expectBucketCount: 4,
|
||||||
|
expectSchema: -1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
h: example,
|
||||||
|
limit: 10,
|
||||||
|
expectError: false,
|
||||||
|
expectBucketCount: 6,
|
||||||
|
expectSchema: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
h: bigGap,
|
||||||
|
limit: 1,
|
||||||
|
expectError: false,
|
||||||
|
expectBucketCount: 1,
|
||||||
|
expectSchema: -2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
h: customBuckets,
|
||||||
|
limit: 2,
|
||||||
|
expectError: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
h: customBuckets,
|
||||||
|
limit: 3,
|
||||||
|
expectError: false,
|
||||||
|
expectBucketCount: 3,
|
||||||
|
expectSchema: histogram.CustomBucketsSchema,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
appTest := teststorage.NewAppender()
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
for _, floatHisto := range []bool{true, false} {
|
||||||
|
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
|
||||||
|
app := &bucketLimitAppender{Appender: appTest.Appender(t.Context()), limit: c.limit}
|
||||||
|
ts := int64(10 * time.Minute / time.Millisecond)
|
||||||
|
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
|
||||||
|
var err error
|
||||||
|
if floatHisto {
|
||||||
|
fh := c.h.Copy().ToFloat(nil)
|
||||||
|
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
|
||||||
|
if c.expectError {
|
||||||
|
require.Error(t, err)
|
||||||
|
} else {
|
||||||
|
require.Equal(t, c.expectSchema, fh.Schema)
|
||||||
|
require.Equal(t, c.expectBucketCount, len(fh.NegativeBuckets)+len(fh.PositiveBuckets))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
h := c.h.Copy()
|
||||||
|
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
|
||||||
|
if c.expectError {
|
||||||
|
require.Error(t, err)
|
||||||
|
} else {
|
||||||
|
require.Equal(t, c.expectSchema, h.Schema)
|
||||||
|
require.Equal(t, c.expectBucketCount, len(h.NegativeBuckets)+len(h.PositiveBuckets))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMaxSchemaAppender(t *testing.T) {
|
||||||
|
example := histogram.Histogram{
|
||||||
|
Schema: 0,
|
||||||
|
Count: 21,
|
||||||
|
Sum: 33,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
ZeroCount: 3,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{3, 0, 0},
|
||||||
|
NegativeSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
},
|
||||||
|
NegativeBuckets: []int64{3, 0, 0},
|
||||||
|
}
|
||||||
|
|
||||||
|
customBuckets := histogram.Histogram{
|
||||||
|
Schema: histogram.CustomBucketsSchema,
|
||||||
|
Count: 9,
|
||||||
|
Sum: 33,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{3, 0, 0},
|
||||||
|
CustomValues: []float64{1, 2, 3},
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
h histogram.Histogram
|
||||||
|
maxSchema int32
|
||||||
|
expectSchema int32
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
h: example,
|
||||||
|
maxSchema: -1,
|
||||||
|
expectSchema: -1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
h: example,
|
||||||
|
maxSchema: 0,
|
||||||
|
expectSchema: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
h: customBuckets,
|
||||||
|
maxSchema: -1,
|
||||||
|
expectSchema: histogram.CustomBucketsSchema,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
appTest := teststorage.NewAppender()
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
for _, floatHisto := range []bool{true, false} {
|
||||||
|
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
|
||||||
|
app := &maxSchemaAppender{Appender: appTest.Appender(t.Context()), maxSchema: c.maxSchema}
|
||||||
|
ts := int64(10 * time.Minute / time.Millisecond)
|
||||||
|
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
|
||||||
|
var err error
|
||||||
|
if floatHisto {
|
||||||
|
fh := c.h.Copy().ToFloat(nil)
|
||||||
|
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
|
||||||
|
require.Equal(t, c.expectSchema, fh.Schema)
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
h := c.h.Copy()
|
||||||
|
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
|
||||||
|
require.Equal(t, c.expectSchema, h.Schema)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test sample_limit when a scrape contains Native Histograms.
|
||||||
|
func TestAppendWithSampleLimitAndNativeHistogram(t *testing.T) {
|
||||||
|
appTest := teststorage.NewAppender()
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
app := appenderWithLimits(appTest.Appender(t.Context()), 2, 0, 0)
|
||||||
|
|
||||||
|
// sample_limit is set to 2, so the first two appended samples should work.
|
||||||
|
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "foo"), timestamp.FromTime(now), 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Second sample, should be ok.
|
||||||
|
_, err = app.AppendHistogram(
|
||||||
|
0,
|
||||||
|
labels.FromStrings(model.MetricNameLabel, "my_histogram1"),
|
||||||
|
timestamp.FromTime(now),
|
||||||
|
&histogram.Histogram{},
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// This is the third sample with sample_limit=2, so it should trigger errSampleLimit.
|
||||||
|
_, err = app.AppendHistogram(
|
||||||
|
0,
|
||||||
|
labels.FromStrings(model.MetricNameLabel, "my_histogram2"),
|
||||||
|
timestamp.FromTime(now),
|
||||||
|
&histogram.Histogram{},
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
require.ErrorIs(t, err, errSampleLimit)
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
136 scrape/target.go
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2013 The Prometheus Authors
|
// Copyright The Prometheus Authors
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
// you may not use this file except in compliance with the License.
|
// you may not use this file except in compliance with the License.
|
||||||
// You may obtain a copy of the License at
|
// You may obtain a copy of the License at
|
||||||
@ -26,11 +26,8 @@ import (
|
|||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||||
"github.com/prometheus/prometheus/model/histogram"
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/model/relabel"
|
"github.com/prometheus/prometheus/model/relabel"
|
||||||
"github.com/prometheus/prometheus/model/value"
|
|
||||||
"github.com/prometheus/prometheus/storage"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TargetHealth describes the health state of a target.
|
// TargetHealth describes the health state of a target.
|
||||||
@ -323,137 +320,6 @@ var (
|
|||||||
errBucketLimit = errors.New("histogram bucket limit exceeded")
|
errBucketLimit = errors.New("histogram bucket limit exceeded")
|
||||||
)
|
)
|
||||||
|
|
||||||
// limitAppender limits the number of total appended samples in a batch.
|
|
||||||
type limitAppender struct {
|
|
||||||
storage.Appender
|
|
||||||
|
|
||||||
limit int
|
|
||||||
i int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
|
||||||
// Bypass sample_limit checks only if we have a staleness marker for a known series (ref value is non-zero).
|
|
||||||
// This ensures that if a series is already in TSDB then we always write the marker.
|
|
||||||
if ref == 0 || !value.IsStaleNaN(v) {
|
|
||||||
app.i++
|
|
||||||
if app.i > app.limit {
|
|
||||||
return 0, errSampleLimit
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ref, err := app.Appender.Append(ref, lset, t, v)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return ref, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *limitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
|
||||||
// Bypass sample_limit checks only if we have a staleness marker for a known series (ref value is non-zero).
|
|
||||||
// This ensures that if a series is already in TSDB then we always write the marker.
|
|
||||||
if ref == 0 || (h != nil && !value.IsStaleNaN(h.Sum)) || (fh != nil && !value.IsStaleNaN(fh.Sum)) {
|
|
||||||
app.i++
|
|
||||||
if app.i > app.limit {
|
|
||||||
return 0, errSampleLimit
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return ref, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type timeLimitAppender struct {
|
|
||||||
storage.Appender
|
|
||||||
|
|
||||||
maxTime int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
|
||||||
if t > app.maxTime {
|
|
||||||
return 0, storage.ErrOutOfBounds
|
|
||||||
}
|
|
||||||
|
|
||||||
ref, err := app.Appender.Append(ref, lset, t, v)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return ref, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// bucketLimitAppender limits the number of total appended samples in a batch.
|
|
||||||
type bucketLimitAppender struct {
|
|
||||||
storage.Appender
|
|
||||||
|
|
||||||
limit int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
|
||||||
var err error
|
|
||||||
if h != nil {
|
|
||||||
// Return with an early error if the histogram has too many buckets and the
|
|
||||||
// schema is not exponential, in which case we can't reduce the resolution.
|
|
||||||
if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) {
|
|
||||||
return 0, errBucketLimit
|
|
||||||
}
|
|
||||||
for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
|
|
||||||
if h.Schema <= histogram.ExponentialSchemaMin {
|
|
||||||
return 0, errBucketLimit
|
|
||||||
}
|
|
||||||
if err = h.ReduceResolution(h.Schema - 1); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if fh != nil {
|
|
||||||
// Return with an early error if the histogram has too many buckets and the
|
|
||||||
// schema is not exponential, in which case we can't reduce the resolution.
|
|
||||||
if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) {
|
|
||||||
return 0, errBucketLimit
|
|
||||||
}
|
|
||||||
for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
|
|
||||||
if fh.Schema <= histogram.ExponentialSchemaMin {
|
|
||||||
return 0, errBucketLimit
|
|
||||||
}
|
|
||||||
if err = fh.ReduceResolution(fh.Schema - 1); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return ref, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type maxSchemaAppender struct {
|
|
||||||
storage.Appender
|
|
||||||
|
|
||||||
maxSchema int32
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
|
||||||
var err error
|
|
||||||
if h != nil {
|
|
||||||
if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema {
|
|
||||||
if err = h.ReduceResolution(app.maxSchema); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if fh != nil {
|
|
||||||
if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema {
|
|
||||||
if err = fh.ReduceResolution(app.maxSchema); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return ref, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// PopulateDiscoveredLabels sets base labels on lb from target and group labels and scrape configuration, before relabeling.
|
// PopulateDiscoveredLabels sets base labels on lb from target and group labels and scrape configuration, before relabeling.
|
||||||
func PopulateDiscoveredLabels(lb *labels.Builder, cfg *config.ScrapeConfig, tLabels, tgLabels model.LabelSet) {
|
func PopulateDiscoveredLabels(lb *labels.Builder, cfg *config.ScrapeConfig, tLabels, tgLabels model.LabelSet) {
|
||||||
lb.Reset(labels.EmptyLabels())
|
lb.Reset(labels.EmptyLabels())
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2013 The Prometheus Authors
|
// Copyright The Prometheus Authors
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
// you may not use this file except in compliance with the License.
|
// you may not use this file except in compliance with the License.
|
||||||
// You may obtain a copy of the License at
|
// You may obtain a copy of the License at
|
||||||
@ -14,7 +14,6 @@
|
|||||||
package scrape
|
package scrape
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -33,10 +32,7 @@ import (
|
|||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||||
"github.com/prometheus/prometheus/model/histogram"
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/model/timestamp"
|
|
||||||
"github.com/prometheus/prometheus/storage"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -522,240 +518,3 @@ scrape_configs:
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBucketLimitAppender(t *testing.T) {
|
|
||||||
example := histogram.Histogram{
|
|
||||||
Schema: 0,
|
|
||||||
Count: 21,
|
|
||||||
Sum: 33,
|
|
||||||
ZeroThreshold: 0.001,
|
|
||||||
ZeroCount: 3,
|
|
||||||
PositiveSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 3},
|
|
||||||
},
|
|
||||||
PositiveBuckets: []int64{3, 0, 0},
|
|
||||||
NegativeSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 3},
|
|
||||||
},
|
|
||||||
NegativeBuckets: []int64{3, 0, 0},
|
|
||||||
}
|
|
||||||
|
|
||||||
bigGap := histogram.Histogram{
|
|
||||||
Schema: 0,
|
|
||||||
Count: 21,
|
|
||||||
Sum: 33,
|
|
||||||
ZeroThreshold: 0.001,
|
|
||||||
ZeroCount: 3,
|
|
||||||
PositiveSpans: []histogram.Span{
|
|
||||||
{Offset: 1, Length: 1}, // in (1, 2]
|
|
||||||
{Offset: 2, Length: 1}, // in (8, 16]
|
|
||||||
},
|
|
||||||
PositiveBuckets: []int64{1, 0}, // 1, 1
|
|
||||||
}
|
|
||||||
|
|
||||||
customBuckets := histogram.Histogram{
|
|
||||||
Schema: histogram.CustomBucketsSchema,
|
|
||||||
Count: 9,
|
|
||||||
Sum: 33,
|
|
||||||
PositiveSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 3},
|
|
||||||
},
|
|
||||||
PositiveBuckets: []int64{3, 0, 0},
|
|
||||||
CustomValues: []float64{1, 2, 3},
|
|
||||||
}
|
|
||||||
|
|
||||||
cases := []struct {
|
|
||||||
h histogram.Histogram
|
|
||||||
limit int
|
|
||||||
expectError bool
|
|
||||||
expectBucketCount int
|
|
||||||
expectSchema int32
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
h: example,
|
|
||||||
limit: 3,
|
|
||||||
expectError: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
h: example,
|
|
||||||
limit: 4,
|
|
||||||
expectError: false,
|
|
||||||
expectBucketCount: 4,
|
|
||||||
expectSchema: -1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
h: example,
|
|
||||||
limit: 10,
|
|
||||||
expectError: false,
|
|
||||||
expectBucketCount: 6,
|
|
||||||
expectSchema: 0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
h: bigGap,
|
|
||||||
limit: 1,
|
|
||||||
expectError: false,
|
|
||||||
expectBucketCount: 1,
|
|
||||||
expectSchema: -2,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
h: customBuckets,
|
|
||||||
limit: 2,
|
|
||||||
expectError: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
h: customBuckets,
|
|
||||||
limit: 3,
|
|
||||||
expectError: false,
|
|
||||||
expectBucketCount: 3,
|
|
||||||
expectSchema: histogram.CustomBucketsSchema,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
resApp := &collectResultAppender{}
|
|
||||||
|
|
||||||
for _, c := range cases {
|
|
||||||
for _, floatHisto := range []bool{true, false} {
|
|
||||||
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
|
|
||||||
app := &bucketLimitAppender{Appender: resApp, limit: c.limit}
|
|
||||||
ts := int64(10 * time.Minute / time.Millisecond)
|
|
||||||
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
|
|
||||||
var err error
|
|
||||||
if floatHisto {
|
|
||||||
fh := c.h.Copy().ToFloat(nil)
|
|
||||||
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
|
|
||||||
if c.expectError {
|
|
||||||
require.Error(t, err)
|
|
||||||
} else {
|
|
||||||
require.Equal(t, c.expectSchema, fh.Schema)
|
|
||||||
require.Equal(t, c.expectBucketCount, len(fh.NegativeBuckets)+len(fh.PositiveBuckets))
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
h := c.h.Copy()
|
|
||||||
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
|
|
||||||
if c.expectError {
|
|
||||||
require.Error(t, err)
|
|
||||||
} else {
|
|
||||||
require.Equal(t, c.expectSchema, h.Schema)
|
|
||||||
require.Equal(t, c.expectBucketCount, len(h.NegativeBuckets)+len(h.PositiveBuckets))
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
require.NoError(t, app.Commit())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMaxSchemaAppender(t *testing.T) {
|
|
||||||
example := histogram.Histogram{
|
|
||||||
Schema: 0,
|
|
||||||
Count: 21,
|
|
||||||
Sum: 33,
|
|
||||||
ZeroThreshold: 0.001,
|
|
||||||
ZeroCount: 3,
|
|
||||||
PositiveSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 3},
|
|
||||||
},
|
|
||||||
PositiveBuckets: []int64{3, 0, 0},
|
|
||||||
NegativeSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 3},
|
|
||||||
},
|
|
||||||
NegativeBuckets: []int64{3, 0, 0},
|
|
||||||
}
|
|
||||||
|
|
||||||
customBuckets := histogram.Histogram{
|
|
||||||
Schema: histogram.CustomBucketsSchema,
|
|
||||||
Count: 9,
|
|
||||||
Sum: 33,
|
|
||||||
PositiveSpans: []histogram.Span{
|
|
||||||
{Offset: 0, Length: 3},
|
|
||||||
},
|
|
||||||
PositiveBuckets: []int64{3, 0, 0},
|
|
||||||
CustomValues: []float64{1, 2, 3},
|
|
||||||
}
|
|
||||||
|
|
||||||
cases := []struct {
|
|
||||||
h histogram.Histogram
|
|
||||||
maxSchema int32
|
|
||||||
expectSchema int32
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
h: example,
|
|
||||||
maxSchema: -1,
|
|
||||||
expectSchema: -1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
h: example,
|
|
||||||
maxSchema: 0,
|
|
||||||
expectSchema: 0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
h: customBuckets,
|
|
||||||
maxSchema: -1,
|
|
||||||
expectSchema: histogram.CustomBucketsSchema,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
resApp := &collectResultAppender{}
|
|
||||||
|
|
||||||
for _, c := range cases {
|
|
||||||
for _, floatHisto := range []bool{true, false} {
|
|
||||||
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
|
|
||||||
app := &maxSchemaAppender{Appender: resApp, maxSchema: c.maxSchema}
|
|
||||||
ts := int64(10 * time.Minute / time.Millisecond)
|
|
||||||
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
|
|
||||||
var err error
|
|
||||||
if floatHisto {
|
|
||||||
fh := c.h.Copy().ToFloat(nil)
|
|
||||||
_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
|
|
||||||
require.Equal(t, c.expectSchema, fh.Schema)
|
|
||||||
require.NoError(t, err)
|
|
||||||
} else {
|
|
||||||
h := c.h.Copy()
|
|
||||||
_, err = app.AppendHistogram(0, lbls, ts, h, nil)
|
|
||||||
require.Equal(t, c.expectSchema, h.Schema)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
require.NoError(t, app.Commit())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test sample_limit when a scrape contains Native Histograms.
|
|
||||||
func TestAppendWithSampleLimitAndNativeHistogram(t *testing.T) {
|
|
||||||
const sampleLimit = 2
|
|
||||||
resApp := &collectResultAppender{}
|
|
||||||
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
|
|
||||||
return resApp
|
|
||||||
}, 0)
|
|
||||||
sl.sampleLimit = sampleLimit
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
app := appender(sl.appender(context.Background()), sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
|
|
||||||
|
|
||||||
// sample_limit is set to 2, so first two scrapes should work
|
|
||||||
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "foo"), timestamp.FromTime(now), 1)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Second sample, should be ok.
|
|
||||||
_, err = app.AppendHistogram(
|
|
||||||
0,
|
|
||||||
labels.FromStrings(model.MetricNameLabel, "my_histogram1"),
|
|
||||||
timestamp.FromTime(now),
|
|
||||||
&histogram.Histogram{},
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// This is third sample with sample_limit=2, it should trigger errSampleLimit.
|
|
||||||
_, err = app.AppendHistogram(
|
|
||||||
0,
|
|
||||||
labels.FromStrings(model.MetricNameLabel, "my_histogram2"),
|
|
||||||
timestamp.FromTime(now),
|
|
||||||
&histogram.Histogram{},
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
require.ErrorIs(t, err, errSampleLimit)
|
|
||||||
}
|
|
||||||
|
|||||||
346 util/teststorage/appender.go (new file)
@ -0,0 +1,346 @@
|
|||||||
|
// Copyright The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package teststorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/model/metadata"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Sample represents a combined test sample for mocking storage.AppenderV2.
|
||||||
|
type Sample struct {
|
||||||
|
MF string
|
||||||
|
L labels.Labels
|
||||||
|
M metadata.Metadata
|
||||||
|
ST, T int64
|
||||||
|
V float64
|
||||||
|
H *histogram.Histogram
|
||||||
|
FH *histogram.FloatHistogram
|
||||||
|
ES []exemplar.Exemplar
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Sample) String() string {
|
||||||
|
b := bytes.Buffer{}
|
||||||
|
if s.M.Help != "" {
|
||||||
|
_, _ = fmt.Fprintf(&b, "HELP %s\n", s.M.Help)
|
||||||
|
}
|
||||||
|
if s.M.Type != model.MetricTypeUnknown && s.M.Type != "" {
|
||||||
|
_, _ = fmt.Fprintf(&b, "type@%s ", s.M.Type)
|
||||||
|
}
|
||||||
|
if s.M.Unit != "" {
|
||||||
|
_, _ = fmt.Fprintf(&b, "unit@%s ", s.M.Unit)
|
||||||
|
}
|
||||||
|
h := ""
|
||||||
|
if s.H != nil {
|
||||||
|
h = s.H.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
fh := ""
|
||||||
|
if s.FH != nil {
|
||||||
|
fh = s.FH.String()
|
||||||
|
}
|
||||||
|
_, _ = fmt.Fprintf(&b, "%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, h, fh, s.ST, s.T)
|
||||||
|
return b.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Sample) exemplarsEqual(other []exemplar.Exemplar) bool {
|
||||||
|
if len(s.ES) != len(other) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i := range s.ES {
|
||||||
|
if !s.ES[i].Equals(other[i]) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Sample) Equal(other Sample) bool {
|
||||||
|
return strings.Compare(s.MF, other.MF) == 0 &&
|
||||||
|
labels.Equal(s.L, other.L) &&
|
||||||
|
s.M.Equals(other.M) &&
|
||||||
|
s.ST == other.ST &&
|
||||||
|
s.T == other.T &&
|
||||||
|
math.Float64bits(s.V) == math.Float64bits(other.V) && // Compare Float64bits so NaN values which are exactly the same will compare equal.
|
||||||
|
s.H.Equals(other.H) &&
|
||||||
|
s.FH.Equals(other.FH) &&
|
||||||
|
s.exemplarsEqual(other.ES)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Appender is a storage.Appender mock.
|
||||||
|
// It allows:
|
||||||
|
// * recording all samples that were added through the appender.
|
||||||
|
// * optionally being backed by another appender that it writes samples through (Next).
|
||||||
|
// * optionally running another appender before result recording, e.g. to simulate chained validation (Prev). See the usage sketch after the struct definition below.
|
||||||
|
type Appender struct {
|
||||||
|
Prev storage.Appendable // Optional appender to run before the result collection.
|
||||||
|
Next storage.Appendable // Optional appender to run after results are collected (e.g. TestStorage).
|
||||||
|
|
||||||
|
AppendErr error // Inject appender error on every Append, AppendHistogram and ST zero calls.
|
||||||
|
AppendExemplarsError error // Inject exemplar error.
|
||||||
|
CommitErr error // Inject commit error.
|
||||||
|
|
||||||
|
mtx sync.Mutex // mutex for result writes and ResultSamplesGreaterThan read.
|
||||||
|
|
||||||
|
// Recorded results.
|
||||||
|
PendingSamples []Sample
|
||||||
|
ResultSamples []Sample
|
||||||
|
RolledbackSamples []Sample
|
||||||
|
}
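
// A minimal usage sketch (the function name is arbitrary and no test calls it): append
// through the storage.Appender handed out by the mock, commit, then inspect ResultSamples.
// It relies only on imports already present in this file.
func exampleAppenderUsage(ctx context.Context) ([]Sample, error) {
	mock := NewAppender()
	app := mock.Appender(ctx)
	if _, err := app.Append(0, labels.FromStrings("__name__", "demo_total"), 1000, 42); err != nil {
		return nil, err
	}
	if err := app.Commit(); err != nil {
		return nil, err
	}
	// Commit moves everything from PendingSamples to ResultSamples.
	return mock.ResultSamples, nil
}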
|
||||||
|
|
||||||
|
func (a *Appender) ResultReset() {
|
||||||
|
a.PendingSamples = a.PendingSamples[:0]
|
||||||
|
a.ResultSamples = a.ResultSamples[:0]
|
||||||
|
a.RolledbackSamples = a.RolledbackSamples[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Appender) ResultSamplesGreaterThan(than int) bool {
|
||||||
|
a.mtx.Lock()
|
||||||
|
defer a.mtx.Unlock()
|
||||||
|
return len(a.ResultSamples) > than
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResultMetadata returns ResultSamples with samples only containing L and M.
|
||||||
|
// This is for compatibility with tests that only focus on metadata.
|
||||||
|
//
|
||||||
|
// TODO: Rewrite tests to test metadata on ResultSamples instead.
|
||||||
|
func (a *Appender) ResultMetadata() []Sample {
|
||||||
|
var ret []Sample
|
||||||
|
for _, s := range a.ResultSamples {
|
||||||
|
ret = append(ret, Sample{L: s.L, M: s.M})
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Appender) String() string {
|
||||||
|
var sb strings.Builder
|
||||||
|
sb.WriteString("committed:\n")
|
||||||
|
for _, s := range a.ResultSamples {
|
||||||
|
sb.WriteString("\n")
|
||||||
|
sb.WriteString(s.String())
|
||||||
|
}
|
||||||
|
sb.WriteString("pending:\n")
|
||||||
|
for _, s := range a.PendingSamples {
|
||||||
|
sb.WriteString("\n")
|
||||||
|
sb.WriteString(s.String())
|
||||||
|
}
|
||||||
|
sb.WriteString("rolledback:\n")
|
||||||
|
for _, s := range a.RolledbackSamples {
|
||||||
|
sb.WriteString("\n")
|
||||||
|
sb.WriteString(s.String())
|
||||||
|
}
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAppender() *Appender {
|
||||||
|
return &Appender{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type appender struct {
|
||||||
|
prev storage.Appender
|
||||||
|
next storage.Appender
|
||||||
|
|
||||||
|
*Appender
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Appender) Appender(ctx context.Context) storage.Appender {
|
||||||
|
ret := &appender{Appender: a}
|
||||||
|
if a.Prev != nil {
|
||||||
|
ret.prev = a.Prev.Appender(ctx)
|
||||||
|
}
|
||||||
|
if a.Next != nil {
|
||||||
|
ret.next = a.Next.Appender(ctx)
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appender) SetOptions(*storage.AppendOptions) {}
|
||||||
|
|
||||||
|
func (a *appender) Append(ref storage.SeriesRef, ls labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
||||||
|
if a.Prev != nil {
|
||||||
|
if _, err := a.prev.Append(ref, ls, t, v); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.AppendErr != nil {
|
||||||
|
return 0, a.AppendErr
|
||||||
|
}
|
||||||
|
|
||||||
|
a.mtx.Lock()
|
||||||
|
a.PendingSamples = append(a.PendingSamples, Sample{L: ls, T: t, V: v})
|
||||||
|
a.mtx.Unlock()
|
||||||
|
|
||||||
|
if a.next != nil {
|
||||||
|
return a.next.Append(ref, ls, t, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ref == 0 {
|
||||||
|
// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
|
||||||
|
ref = storage.SeriesRef(ls.Hash())
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appender) AppendHistogram(ref storage.SeriesRef, ls labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
if a.Prev != nil {
|
||||||
|
if _, err := a.prev.AppendHistogram(ref, ls, t, h, fh); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.AppendErr != nil {
|
||||||
|
return 0, a.AppendErr
|
||||||
|
}
|
||||||
|
|
||||||
|
a.mtx.Lock()
|
||||||
|
a.PendingSamples = append(a.PendingSamples, Sample{L: ls, T: t, H: h, FH: fh})
|
||||||
|
a.mtx.Unlock()
|
||||||
|
|
||||||
|
if a.next != nil {
|
||||||
|
return a.next.AppendHistogram(ref, ls, t, h, fh)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ref == 0 {
|
||||||
|
// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
|
||||||
|
ref = storage.SeriesRef(ls.Hash())
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||||
|
if a.Prev != nil {
|
||||||
|
if _, err := a.prev.AppendExemplar(ref, l, e); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.AppendExemplarsError != nil {
|
||||||
|
return 0, a.AppendExemplarsError
|
||||||
|
}
|
||||||
|
|
||||||
|
a.mtx.Lock()
|
||||||
|
// NOTE(bwplotka): Eventually exemplar has to be attached to a series and soon
|
||||||
|
// the AppenderV2 will guarantee that for TSDB. Assume this from the mock perspective
|
||||||
|
// with the naive attaching. See: https://github.com/prometheus/prometheus/pull/17610
|
||||||
|
i := 0
|
||||||
|
for range len(a.PendingSamples) {
|
||||||
|
if ref == storage.SeriesRef(a.PendingSamples[i].L.Hash()) {
|
||||||
|
a.PendingSamples[i].ES = append(a.PendingSamples[i].ES, e)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
a.mtx.Unlock()
|
||||||
|
if i >= len(a.PendingSamples) {
|
||||||
|
return 0, fmt.Errorf("teststorage.appender: exemplar appender without series; ref %v; l %v; exemplar: %v", ref, l, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.next != nil {
|
||||||
|
return a.next.AppendExemplar(ref, l, e)
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
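
// A small sketch (arbitrary name, not called by tests): exemplars are attached to a
// pending sample by series ref, so the sample has to be appended first within the same
// batch. Uses only imports already present in this file.
func exampleAppendExemplar(ctx context.Context) error {
	mock := NewAppender()
	app := mock.Appender(ctx)
	lset := labels.FromStrings("__name__", "demo_total")
	ref, err := app.Append(0, lset, 1000, 42)
	if err != nil {
		return err
	}
	// The exemplar lands on the pending sample recorded for ref above.
	if _, err := app.AppendExemplar(ref, lset, exemplar.Exemplar{Value: 1, Ts: 1000}); err != nil {
		return err
	}
	return app.Commit()
}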
|
||||||
|
|
||||||
|
func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64) (storage.SeriesRef, error) {
|
||||||
|
return a.Append(ref, l, st, 0.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64, h *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
if h != nil {
|
||||||
|
return a.AppendHistogram(ref, l, st, &histogram.Histogram{}, nil)
|
||||||
|
}
|
||||||
|
return a.AppendHistogram(ref, l, st, nil, &histogram.FloatHistogram{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
|
if a.Prev != nil {
|
||||||
|
if _, err := a.prev.UpdateMetadata(ref, l, m); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.mtx.Lock()
|
||||||
|
// NOTE(bwplotka): Eventually metadata has to be attached to a series and soon
|
||||||
|
// the AppenderV2 will guarantee that for TSDB. Assume this from the mock perspective
|
||||||
|
// with the naive attaching. See: https://github.com/prometheus/prometheus/pull/17610
|
||||||
|
i := 0
|
||||||
|
for range len(a.PendingSamples) {
|
||||||
|
if ref == storage.SeriesRef(a.PendingSamples[i].L.Hash()) {
|
||||||
|
a.PendingSamples[i].M = m
|
||||||
|
break
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
a.mtx.Unlock()
|
||||||
|
if i >= len(a.PendingSamples) {
|
||||||
|
return 0, fmt.Errorf("teststorage.appender: metadata update without series; ref %v; l %v; m: %v", ref, l, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.next != nil {
|
||||||
|
return a.next.UpdateMetadata(ref, l, m)
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appender) Commit() error {
	if a.prev != nil {
		if err := a.prev.Commit(); err != nil {
			return err
		}
	}

	if a.CommitErr != nil {
		return a.CommitErr
	}

	a.mtx.Lock()
	a.ResultSamples = append(a.ResultSamples, a.PendingSamples...)
	a.PendingSamples = a.PendingSamples[:0]
	a.mtx.Unlock()

	if a.next != nil {
		return a.next.Commit()
	}
	return nil
}

func (a *appender) Rollback() error {
	if a.prev != nil {
		if err := a.prev.Rollback(); err != nil {
			return err
		}
	}

	a.mtx.Lock()
	a.RolledbackSamples = append(a.RolledbackSamples, a.PendingSamples...)
	a.PendingSamples = a.PendingSamples[:0]
	a.mtx.Unlock()

	if a.next != nil {
		return a.next.Rollback()
	}
	return nil
}
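
The exemplar and metadata paths above rely on one trick: when no real series reference exists, the mock keys everything by labels.Labels.Hash(), so a later AppendExemplar or UpdateMetadata call can find the pending sample it belongs to. The standalone sketch below, which is not part of this commit, illustrates that attach logic under the same assumption; the pendingSample type and attachExemplar helper are illustrative names only.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// pendingSample is a stripped-down stand-in for the mock's Sample type.
type pendingSample struct {
	ls labels.Labels
	v  float64
	es []exemplar.Exemplar
}

// attachExemplar mirrors the naive attach used by the mock: the series
// reference is assumed to be the labels hash, so a linear scan over the
// pending samples finds the owning series.
func attachExemplar(pending []pendingSample, ref storage.SeriesRef, e exemplar.Exemplar) error {
	for i := range pending {
		if ref == storage.SeriesRef(pending[i].ls.Hash()) {
			pending[i].es = append(pending[i].es, e)
			return nil
		}
	}
	return fmt.Errorf("exemplar append without a matching series: ref %v", ref)
}

func main() {
	ls := labels.FromStrings("__name__", "http_requests_total", "code", "200")
	pending := []pendingSample{{ls: ls, v: 1}}

	// As in the mock, a zero ref is replaced by the labels hash.
	ref := storage.SeriesRef(ls.Hash())
	e := exemplar.Exemplar{Labels: labels.FromStrings("trace_id", "abc123"), Value: 1}

	if err := attachExemplar(pending, ref, e); err != nil {
		fmt.Println("attach failed:", err)
		return
	}
	fmt.Printf("attached %d exemplar(s) to %s\n", len(pending[0].es), pending[0].ls.String())
}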
59 util/teststorage/appender_test.go Normal file
@ -0,0 +1,59 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package teststorage

import (
	"fmt"
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"
	"github.com/prometheus/prometheus/util/testutil"
)

// TestSample_RequireEqual ensures the standard testutil.RequireEqual is enough for comparisons.
// This works because metadata.Metadata now has an Equals method.
// TODO(bwplotka): Adding a (*labels.Labels) Equals(other *labels.Labels) method would remove the
// need for testutil.RequireEqual entirely. Do it.
func TestSample_RequireEqual(t *testing.T) {
	a := []Sample{
		{},
		{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
		{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
		{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
	}
	testutil.RequireEqual(t, a, a)

	b1 := []Sample{
		{},
		{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
		{L: labels.FromStrings("__name__", "test_metric2_diff", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123}, // test_metric2_diff is different.
		{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
	}
	requireNotEqual(t, a, b1)
}

func requireNotEqual(t testing.TB, a, b any) {
	t.Helper()
	if !cmp.Equal(a, b, cmp.Comparer(labels.Equal)) {
		return
	}
	require.Fail(t, fmt.Sprintf("Equal, but expected not: \n"+
		"a: %s\n"+
		"b: %s", a, b))
}
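
requireNotEqual compares through go-cmp with cmp.Comparer(labels.Equal) because labels.Labels should be compared semantically rather than by its build-dependent internal representation. A minimal illustration of that comparer outside any test might look like the sketch below; the sample struct and its values are made up for the example and are not part of this commit.

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"

	"github.com/prometheus/prometheus/model/labels"
)

// sample is an illustrative struct mixing labels with plain fields,
// similar in spirit to the Sample type exercised by the test above.
type sample struct {
	L labels.Labels
	V float64
}

func main() {
	a := []sample{{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), V: 123.123}}
	b := []sample{{L: labels.FromStrings("__name__", "test_metric2_diff", "foo", "bar"), V: 123.123}}

	// cmp.Comparer(labels.Equal) tells go-cmp to compare labels.Labels
	// semantically instead of descending into its internals.
	opt := cmp.Comparer(labels.Equal)

	fmt.Println("a == a:", cmp.Equal(a, a, opt)) // true
	fmt.Println("a == b:", cmp.Equal(a, b, opt)) // false: metric names differ
}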