ff: native-type-and-unit

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-03-15 21:26:57 +00:00
parent b0227d1f16
commit 6d732b3b1d
8 changed files with 743 additions and 78 deletions

View File

@ -280,6 +280,10 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "otlp-deltatocumulative":
c.web.ConvertOTLPDelta = true
logger.Info("Converting delta OTLP metrics to cumulative")
case "native-type-and-unit":
c.scrape.EnableNativeTypeAndUnit = true
logger.Info("Native type and unit enabled")
default:
logger.Warn("Unknown option for --enable-feature", "option", o)
}

108
model/series/series.go Normal file
View File

@ -0,0 +1,108 @@
package series
import (
"bytes"
"strconv"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
const (
	// TypeSelector is the special selector label name for the metric type.
	TypeSelector = "__type__"
	// UnitSelector is the special selector label name for the metric unit.
	UnitSelector = "__unit__"
	// NameSelector is the special selector label name for the metric name.
	NameSelector = labels.MetricName

	// Separator bytes used by Hash and String to delimit unit and type.
	unitSep = '~'
	typeSep = '.'
)

// Series represents the identifiable portion of a metric and its labels.
// It is essentially a typed version of labels.Labels where the metric name,
// unit and type live outside of the labels.
//
// There are two modes of using Series:
//   - Native: Labels does not have any of the name, unit or type information
//     (specifically, Labels does not have __name__). This is the recommended
//     mode, gated by the "native-type-and-unit" feature flag.
//   - Compatibility: In this mode Name is empty and Labels includes the
//     __name__ label.
type Series struct {
	// Metric identity.
	Name string           // required.
	Unit string           // optional.
	Type model.MetricType // required.

	// Labels holds the additional, optional attributes/dimensions.
	Labels labels.Labels
}
// NewCompatibleSeries returns a Series in compatibility mode: the metric name
// stays inside lset as __name__, and unit and type are left empty.
//
// This eases migration from code that works with plain labels.Labels.
func NewCompatibleSeries(lset labels.Labels) Series {
	var s Series
	s.Labels = lset
	return s
}
// ToCompatibleLabels returns labels.Labels with the old semantics (the metric
// name in the __name__ label; unit and type skipped). This eases migration
// for code that still expects plain labels.Labels.
func (s Series) ToCompatibleLabels() labels.Labels {
	if s.Name != "" {
		// Native mode: fold the metric name back into the __name__ label.
		b := labels.NewBuilder(s.Labels)
		b.Set(labels.MetricName, s.Name)
		return b.Labels()
	}
	// Name is empty, so this Series is in compatibility mode already and
	// Labels carries __name__ itself.
	return s.Labels
}
// Hash returns a hash over the full series identity: name, unit, type and
// labels. The unitSep/typeSep bytes delimit the name/unit/type parts to
// reduce collisions between adjacent fields.
func (s Series) Hash() uint64 {
	var bytea [1024]byte // On stack to avoid memory allocation.
	b := bytes.NewBuffer(bytea[:0])
	b.WriteString(s.Name)
	b.WriteByte(unitSep)
	b.WriteString(s.Unit)
	b.WriteByte(typeSep)
	b.WriteString(string(s.Type))
	// Labels.Bytes serializes the labels, using the buffer's spare capacity
	// as scratch space; Write then makes the result part of the hashed bytes.
	// NOTE(review): assumes Labels.Bytes treats the passed slice as an
	// append target/scratch buffer — confirm against labels.Labels.Bytes.
	b.Write(s.Labels.Bytes(b.AvailableBuffer()))
	return xxhash.Sum64(b.Bytes())
}
// String renders the series in the form name{labels}~unit.type, quoting the
// metric name and any label names that are not valid legacy identifiers.
func (s Series) String() string {
	var scratch [1024]byte // Stack scratch to avoid heap allocation while building the output.
	out := bytes.NewBuffer(scratch[:0])

	written := 0 // Number of items emitted inside the braces so far.
	if model.LabelName(s.Name).IsValidLegacy() {
		out.WriteString(s.Name)
		out.WriteByte('{')
	} else {
		// UTF-8 (non-legacy) names go quoted inside the braces, like label names.
		out.WriteByte('{')
		out.WriteString(strconv.Quote(s.Name))
		written++
	}
	s.Labels.Range(func(l labels.Label) {
		if written > 0 {
			out.WriteString(", ")
		}
		if model.LabelName(l.Name).IsValidLegacy() {
			out.WriteString(l.Name)
		} else {
			out.WriteString(strconv.Quote(l.Name))
		}
		out.WriteByte('=')
		out.WriteString(strconv.Quote(l.Value))
		written++
	})
	out.WriteByte('}')
	if s.Unit != "" {
		out.WriteByte(unitSep)
		out.WriteString(s.Unit)
	}
	out.WriteByte(typeSep)
	out.WriteString(string(s.Type))
	return out.String()
}

View File

@ -0,0 +1,29 @@
package series
import (
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
)
// TestSeries_String checks the rendered form for both legacy and UTF-8 metric names.
func TestSeries_String(t *testing.T) {
	for _, tc := range []struct {
		s    Series
		want string
	}{
		{
			s: Series{
				Name:   "my_metric_seconds_total",
				Unit:   "seconds",
				Type:   model.MetricTypeCounter,
				Labels: labels.FromStrings("foo", "bar"),
			},
			want: `my_metric_seconds_total{foo="bar"}~seconds.counter`,
		},
		{
			// UTF-8 metric name is quoted inside the braces.
			s: Series{
				Name:   "my.metric.seconds_total",
				Unit:   "seconds",
				Type:   model.MetricTypeCounter,
				Labels: labels.FromStrings("foo", "bar"),
			},
			want: `{"my.metric.seconds_total", foo="bar"}~seconds.counter`,
		},
	} {
		require.Equal(t, tc.want, tc.s.String())
	}
}

View File

@ -86,6 +86,10 @@ type Options struct {
EnableCreatedTimestampZeroIngestion bool
// Option to enable the ingestion of native histograms.
EnableNativeHistogramsIngestion bool
// EnableNativeTypeAndUnit enables storing type and unit (if present)
// in TSDB, unlocking cases mentioned in https://github.com/prometheus/proposals/pull/39.
// e.g. unit & type collision detection and selection.
EnableNativeTypeAndUnit bool
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption

View File

@ -37,6 +37,10 @@ import (
"github.com/prometheus/common/promslog"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/model/series"
storagev2 "github.com/prometheus/prometheus/storage/v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/exemplar"
@ -195,6 +199,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.convertClassicHistToNHCB,
options.EnableNativeHistogramsIngestion,
options.EnableCreatedTimestampZeroIngestion,
options.EnableNativeTypeAndUnit,
options.ExtraMetrics,
options.AppendMetadata,
opts.target,
@ -742,7 +747,37 @@ func appender(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int3
maxSchema: maxSchema,
}
}
return app
}
// appenderV2 returns an appender for ingested samples from the target,
// wrapped with the configured limits (max-future time bound, sample limit,
// bucket limit and max histogram schema), mirroring appender for the V1 path.
//
// NOTE(review): each wrapper is constructed with only appV2 set; the embedded
// storage.Appender stays nil — confirm callers only invoke the storagev2
// methods on the returned appender.
func appenderV2(app storagev2.Appender, sampleLimit, bucketLimit int, maxSchema int32) storagev2.Appender {
	app = &timeLimitAppender{
		appV2:   app,
		maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
	}

	// The sampleLimit is applied after metrics are potentially dropped via relabeling.
	if sampleLimit > 0 {
		app = &limitAppender{
			appV2: app,
			limit: sampleLimit,
		}
	}

	if bucketLimit > 0 {
		app = &bucketLimitAppender{
			appV2: app,
			limit: bucketLimit,
		}
	}

	if maxSchema < histogram.ExponentialSchemaMax {
		app = &maxSchemaAppender{
			appV2:     app,
			maxSchema: maxSchema,
		}
	}

	return app
}
@ -885,7 +920,7 @@ type cacheEntry struct {
ref storage.SeriesRef
lastIter uint64
hash uint64
lset labels.Labels
se series.Series
}
type scrapeLoop struct {
@ -916,6 +951,7 @@ type scrapeLoop struct {
// Feature flagged options.
enableNativeHistogramIngestion bool
enableCTZeroIngestion bool
enableNativeUnitAndType bool
appender func(ctx context.Context) storage.Appender
symbolTable *labels.SymbolTable
@ -959,8 +995,8 @@ type scrapeCache struct {
// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
seriesCur map[uint64]labels.Labels
seriesPrev map[uint64]labels.Labels
seriesCur map[uint64]series.Series
seriesPrev map[uint64]series.Series
// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
// avoid locking (using metadata API can block scraping).
@ -987,8 +1023,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
return &scrapeCache{
series: map[string]*cacheEntry{},
droppedSeries: map[string]*uint64{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
seriesCur: map[uint64]series.Series{},
seriesPrev: map[uint64]series.Series{},
metadata: map[string]*metaEntry{},
metrics: metrics,
}
@ -1057,11 +1093,11 @@ func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
return e, true, alreadyScraped
}
func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, se series.Series, hash uint64) {
if ref == 0 {
return
}
c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, se: se, hash: hash}
}
func (c *scrapeCache) addDropped(met []byte) {
@ -1077,11 +1113,11 @@ func (c *scrapeCache) getDropped(met []byte) bool {
return ok
}
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
c.seriesCur[hash] = lset
func (c *scrapeCache) trackStaleness(hash uint64, se series.Series) {
c.seriesCur[hash] = se
}
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
func (c *scrapeCache) forEachStale(f func(series.Series) bool) {
for h, lset := range c.seriesPrev {
if _, ok := c.seriesCur[h]; !ok {
if !f(lset) {
@ -1223,6 +1259,7 @@ func newScrapeLoop(ctx context.Context,
convertClassicHistToNHCB bool,
enableNativeHistogramIngestion bool,
enableCTZeroIngestion bool,
enableNativeUnitAndType bool,
reportExtraMetrics bool,
appendMetadataToWAL bool,
target *Target,
@ -1279,6 +1316,7 @@ func newScrapeLoop(ctx context.Context,
convertClassicHistToNHCB: convertClassicHistToNHCB,
enableNativeHistogramIngestion: enableNativeHistogramIngestion,
enableCTZeroIngestion: enableCTZeroIngestion,
enableNativeUnitAndType: enableNativeUnitAndType,
reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL,
metrics: metrics,
@ -1577,11 +1615,26 @@ type appendErrors struct {
}
// Update the stale markers.
func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
sl.cache.forEachStale(func(lset labels.Labels) bool {
func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, appV2 storagev2.Appender, defTime int64) (err error) {
if appV2 != nil {
s := storagev2.StaleSample(defTime)
sl.cache.forEachStale(func(se series.Series) bool {
// Series no longer exposed, mark it stale.
_, err = appV2.AppendSample(0, se, s, &storagev2.AppendOptions{DiscardOutOfOrder: true})
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
}
return err == nil
})
return nil
}
sl.cache.forEachStale(func(se series.Series) bool {
// Series no longer exposed, mark it stale.
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
_, err = app.Append(0, se.ToCompatibleLabels(), defTime, math.Float64frombits(value.StaleNaN))
app.SetOptions(nil)
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
@ -1591,15 +1644,57 @@ func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (e
}
return err == nil
})
return
return nil
}
// append routes a scrape payload to the V1 or V2 append path depending on the
// native-type-and-unit feature flag, validating flag compatibility first.
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
	if !sl.enableNativeUnitAndType {
		return sl.appendV1(app, b, contentType, ts)
	}
	// The V2 path owns type/unit handling natively, so it is incompatible
	// with these feature flags.
	switch {
	case sl.enableCTZeroIngestion:
		return 0, 0, 0, errors.New("native-type-and-unit feature flag can't be used when created-timestamp-zero-ingestion is enabled")
	case sl.appendMetadataToWAL:
		return 0, 0, 0, errors.New("native-type-and-unit feature flag can't be used when metadata-in-wal is enabled")
	}
	appV2, ok := app.(storagev2.Appender)
	if !ok {
		return 0, 0, 0, errors.New("unexpected bug; appender should have storage V2 interface implemented")
	}
	return sl.appendV2(appV2, b, contentType, ts)
}
// attachExemplars drains all exemplars the parser currently holds for the
// just-parsed sample, attaches them to sa.Exemplars (defaulting missing
// timestamps to the sample's timestamp for non-histogram samples), and sorts
// them for efficient downstream validation.
func (sl *scrapeLoop) attachExemplars(p textparse.Parser, sa *storagev2.Sample) {
	var e exemplar.Exemplar
	for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
		if !e.HasTs {
			if sa.IsHistogram() {
				// We drop exemplars for native histograms if they don't have a timestamp.
				// Missing timestamps are deliberately not supported as we want to start
				// enforcing timestamps for exemplars as otherwise proper deduplication
				// is inefficient and purely based on heuristics: we cannot distinguish
				// between repeated exemplars and new instances with the same values.
				// This is done silently without logs as it is not an error but out of spec.
				// This does not affect classic histograms so that behaviour is unchanged.
				e = exemplar.Exemplar{} // Reset for next time round loop.
				continue
			}
			// Non-histogram exemplar without its own timestamp inherits the sample's.
			e.Ts = sa.T
		}
		sa.Exemplars = append(sa.Exemplars, e)
		e = exemplar.Exemplar{} // Reset for next time round loop.
	}
	// Sort so that checking for duplicates / out of order is more efficient during validation.
	slices.SortFunc(sa.Exemplars, exemplar.Compare)
}
func (sl *scrapeLoop) appendV1(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
defTime := timestamp.FromTime(ts)
if len(b) == 0 {
// Empty scrape. Just update the stale makers and swap the cache (but don't flush it).
err = sl.updateStaleMarkers(app, defTime)
err = sl.updateStaleMarkers(app, nil, defTime)
sl.cache.iterDone(false)
return
}
@ -1629,7 +1724,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
appErrs = appendErrors{}
sampleLimitErr error
bucketLimitErr error
lset labels.Labels // escapes to heap so hoisted out of loop
se series.Series // escapes to heap so hoisted out of loop
e exemplar.Exemplar // escapes to heap so hoisted out of loop
lastMeta *metaEntry
lastMFName []byte
@ -1711,33 +1806,34 @@ loop:
if seriesCached {
ref = ce.ref
lset = ce.lset
se = ce.se
hash = ce.hash
} else {
p.Labels(&lset)
hash = lset.Hash()
// In this mode series.Series only contains labels with __name__ inside.
p.Labels(&se.Labels)
hash = se.Hash()
// Hash label set as it is seen local to the target. Then add target labels
// and relabeling and store the final label set.
lset = sl.sampleMutator(lset)
se.Labels = sl.sampleMutator(se.Labels)
// The label set may be set to empty to indicate dropping.
if lset.IsEmpty() {
if se.Labels.IsEmpty() {
sl.cache.addDropped(met)
continue
}
if !lset.Has(labels.MetricName) {
if !se.Labels.Has(labels.MetricName) {
err = errNameLabelMandatory
break loop
}
if !lset.IsValid(sl.validationScheme) {
err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
if !se.Labels.IsValid(sl.validationScheme) {
err = fmt.Errorf("invalid metric name or label names: %s", se.Labels.String())
break loop
}
// If any label limits is exceeded the scrape should fail.
if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
if err = verifyLabelLimits(se.Labels, sl.labelLimits); err != nil {
sl.metrics.targetScrapePoolExceededLabelLimits.Inc()
break loop
}
@ -1750,12 +1846,12 @@ loop:
if ctMs := p.CreatedTimestamp(); ctMs != 0 {
if isHistogram && sl.enableNativeHistogramIngestion {
if h != nil {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, ctMs, h, nil)
ref, err = app.AppendHistogramCTZeroSample(ref, se.Labels, t, ctMs, h, nil)
} else {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, ctMs, nil, fh)
ref, err = app.AppendHistogramCTZeroSample(ref, se.Labels, t, ctMs, nil, fh)
}
} else {
ref, err = app.AppendCTZeroSample(ref, lset, t, ctMs)
ref, err = app.AppendCTZeroSample(ref, se.Labels, t, ctMs)
}
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
@ -1767,18 +1863,18 @@ loop:
if isHistogram && sl.enableNativeHistogramIngestion {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
ref, err = app.AppendHistogram(ref, se.Labels, t, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
ref, err = app.AppendHistogram(ref, se.Labels, t, nil, fh)
}
} else {
ref, err = app.Append(ref, lset, t, val)
ref, err = app.Append(ref, se.Labels, t, val)
}
}
if err == nil {
if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
sl.cache.trackStaleness(ce.hash, ce.lset)
sl.cache.trackStaleness(ce.hash, ce.se)
}
}
@ -1793,9 +1889,9 @@ loop:
if !seriesCached {
if parsedTimestamp == nil || sl.trackTimestampsStaleness {
// Bypass staleness logic if there is an explicit timestamp.
sl.cache.trackStaleness(hash, lset)
sl.cache.trackStaleness(hash, se)
}
sl.cache.addRef(met, ref, lset, hash)
sl.cache.addRef(met, ref, se, hash)
if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
seriesAdded++
}
@ -1829,7 +1925,7 @@ loop:
slices.SortFunc(exemplars, exemplar.Compare)
outOfOrderExemplars := 0
for _, e := range exemplars {
_, exemplarErr := app.AppendExemplar(ref, lset, e)
_, exemplarErr := app.AppendExemplar(ref, se.Labels, e)
switch {
case exemplarErr == nil:
// Do nothing.
@ -1854,8 +1950,8 @@ loop:
// In majority cases we can trust that the current series/histogram is matching the lastMeta and lastMFName.
// However, optional TYPE etc metadata and broken OM text can break this, detect those cases here.
// TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. CT and NHCB parsing).
if isSeriesPartOfFamily(lset.Get(labels.MetricName), lastMFName, lastMeta.Type) {
if _, merr := app.UpdateMetadata(ref, lset, lastMeta.Metadata); merr != nil {
if isSeriesPartOfFamily(se.Labels.Get(labels.MetricName), lastMFName, lastMeta.Type) {
if _, merr := app.UpdateMetadata(ref, se.Labels, lastMeta.Metadata); merr != nil {
// No need to fail the scrape on errors appending metadata.
sl.l.Debug("Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", lastMeta.Metadata), "err", merr)
}
@ -1890,11 +1986,262 @@ loop:
sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
}
if err == nil {
err = sl.updateStaleMarkers(app, defTime)
err = sl.updateStaleMarkers(app, nil, defTime)
}
return
}
// appendV2 appends samples when the new storagev2 interface is enabled and available.
//
// It parses the scrape payload b (of the given contentType, scraped at ts)
// and appends the resulting samples via the storagev2.Appender, carrying the
// metric name, unit and type natively on each series.Series. It returns the
// total number of parsed samples, the number remaining after relabelling
// (added), the number of newly cached series (seriesAdded) and the first
// fatal error encountered.
func (sl *scrapeLoop) appendV2(app storagev2.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
	defTime := timestamp.FromTime(ts)
	if len(b) == 0 {
		// Empty scrape. Just update the stale markers and swap the cache (but don't flush it).
		err = sl.updateStaleMarkers(nil, app, defTime)
		sl.cache.iterDone(false)
		return
	}
	p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable)
	if p == nil {
		sl.l.Error(
			"Failed to determine correct type of scrape target.",
			"content_type", contentType,
			"fallback_media_type", sl.fallbackScrapeProtocol,
			"err", err,
		)
		return
	}
	if sl.convertClassicHistToNHCB {
		p = textparse.NewNHCBParser(p, sl.symbolTable, sl.alwaysScrapeClassicHist)
	}
	if err != nil {
		// A parser was still constructed; log the content-type problem and continue.
		sl.l.Debug(
			"Invalid content type on scrape, using fallback setting.",
			"content_type", contentType,
			"fallback_media_type", sl.fallbackScrapeProtocol,
			"err", err,
		)
	}
	var (
		appErrs        = appendErrors{}
		sampleLimitErr error
		bucketLimitErr error
		se             series.Series // escapes to heap so hoisted out of loop
		lastMeta       *metaEntry
		lastMFName     []byte

		exemplars = make([]exemplar.Exemplar, 0, 1)
	)

	// Take an appender with limits.
	app = appenderV2(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)

	defer func() {
		if err != nil {
			return
		}
		// Flush and swap the cache as the scrape was non-empty.
		sl.cache.iterDone(true)
	}()

loop:
	for {
		var (
			et                       textparse.Entry
			sampleAdded, isHistogram bool
			met                      []byte
			parsedTimestamp          *int64
		)
		if et, err = p.Next(); err != nil {
			if errors.Is(err, io.EOF) {
				err = nil
			}
			break
		}
		switch et {
		// TODO(bwplotka): Consider changing parser to give metadata at once instead of type, help and unit in separation, ideally on `Series()/Histogram()
		// otherwise we can expose metadata without series on metadata API.
		case textparse.EntryType:
			// TODO(bwplotka): Build meta entry directly instead of locking and updating the map. This will
			// allow to properly update metadata when e.g unit was added, then removed;
			lastMFName, lastMeta = sl.cache.setType(p.Type())
			continue
		case textparse.EntryHelp:
			lastMFName, lastMeta = sl.cache.setHelp(p.Help())
			continue
		case textparse.EntryUnit:
			lastMFName, lastMeta = sl.cache.setUnit(p.Unit())
			continue
		case textparse.EntryComment:
			continue
		case textparse.EntryHistogram:
			isHistogram = true
		default:
		}
		total++

		// Reset sa, reusing the exemplars slice's backing array across iterations.
		sa := storagev2.Sample{T: defTime, Exemplars: exemplars[:0]}
		if isHistogram {
			met, parsedTimestamp, sa.H, sa.FH = p.Histogram()
		} else {
			met, parsedTimestamp, sa.V = p.Series()
		}
		if !sl.honorTimestamps {
			parsedTimestamp = nil
		}
		if parsedTimestamp != nil {
			sa.T = *parsedTimestamp
		}

		if sl.cache.getDropped(met) {
			continue
		}
		ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met)
		var (
			ref  storage.SeriesRef
			hash uint64
		)

		if seriesCached {
			ref = ce.ref
			se = ce.se
			hash = ce.hash
		} else {
			// V2 means we do native identifiable metadata, which means name is native.
			p.Labels(&se.Labels)
			se.Name = se.Labels.Get(model.MetricNameLabel)
			se.Labels = se.Labels.DropMetricName()
			if lastMeta != nil {
				// In majority cases we can trust that the current series/histogram is matching the lastMeta and lastMFName.
				// However, optional TYPE etc metadata and broken OM text can break this, detect those cases here.
				// TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. CT and NHCB parsing).
				if isSeriesPartOfFamily(se.Name, lastMFName, lastMeta.Type) {
					se.Type = lastMeta.Type
					se.Unit = lastMeta.Unit
				}
			}
			hash = se.Hash()

			// Hash label set as it is seen local to the target. Then add target labels
			// and relabeling and store the final label set.
			se.Labels = sl.sampleMutator(se.Labels)

			// The label set may be set to empty to indicate dropping.
			if se.Labels.IsEmpty() {
				sl.cache.addDropped(met)
				continue
			}

			if len(se.Name) == 0 {
				err = errNameLabelMandatory
				break loop
			}
			if !se.Labels.IsValid(sl.validationScheme) {
				err = fmt.Errorf("invalid metric name or label names: %s", se.Labels.String())
				break loop
			}

			// If any label limits is exceeded the scrape should fail.
			if err = verifyLabelLimits(se.Labels, sl.labelLimits); err != nil {
				sl.metrics.targetScrapePoolExceededLabelLimits.Inc()
				break loop
			}
		}

		sl.attachExemplars(p, &sa)

		// TODO(bwplotka): Add special AppendSample error structure that would give this data.
		//outOfOrderExemplars := 0
		//for _, e := range exemplars {
		//	_, exemplarErr := app.AppendExemplar(ref, lset, e)
		//	switch {
		//	case exemplarErr == nil:
		//		// Do nothing.
		//	case errors.Is(exemplarErr, storage.ErrOutOfOrderExemplar):
		//		outOfOrderExemplars++
		//	default:
		//		// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
		//		sl.l.Debug("Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
		//	}
		//}
		//if outOfOrderExemplars > 0 && outOfOrderExemplars == len(exemplars) {
		//	// Only report out of order exemplars if all are out of order, otherwise this was a partial update
		//	// to some existing set of exemplars.
		//	appErrs.numExemplarOutOfOrder += outOfOrderExemplars
		//	sl.l.Debug("Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", exemplars[len(exemplars)-1]))
		//	sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars))
		//}

		if seriesAlreadyScraped && parsedTimestamp == nil {
			err = storage.ErrDuplicateSampleForTimestamp
		} else if !sa.IsHistogram() || (sa.IsHistogram() && sl.enableNativeHistogramIngestion) {
			ref, err = app.AppendSample(ref, se, sa, nil)
		}

		if err == nil {
			if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
				sl.cache.trackStaleness(ce.hash, ce.se)
			}
		}

		sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
		if err != nil {
			if !errors.Is(err, storage.ErrNotFound) {
				sl.l.Debug("Unexpected error", "series", string(met), "err", err)
			}
			break loop
		}

		if !seriesCached {
			if parsedTimestamp == nil || sl.trackTimestampsStaleness {
				// Bypass staleness logic if there is an explicit timestamp.
				sl.cache.trackStaleness(hash, se)
			}
			sl.cache.addRef(met, ref, se, hash)
			if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
				seriesAdded++
			}
		}

		// Increment added even if there's an error so we correctly report the
		// number of samples remaining after relabeling.
		// We still report duplicated samples here since this number should be the exact number
		// of time series exposed on a scrape after relabelling.
		added++
	}
	if sampleLimitErr != nil {
		if err == nil {
			err = sampleLimitErr
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		sl.metrics.targetScrapeSampleLimit.Inc()
	}
	if bucketLimitErr != nil {
		if err == nil {
			err = bucketLimitErr // If sample limit is hit, that error takes precedence.
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		sl.metrics.targetScrapeNativeHistogramBucketLimit.Inc()
	}
	if appErrs.numOutOfOrder > 0 {
		sl.l.Warn("Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
	}
	if appErrs.numDuplicates > 0 {
		sl.l.Warn("Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
	}
	if appErrs.numOutOfBounds > 0 {
		sl.l.Warn("Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
	}
	if appErrs.numExemplarOutOfOrder > 0 {
		sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
	}
	if err != nil {
		return total, added, seriesAdded, err
	}
	return total, added, seriesAdded, sl.updateStaleMarkers(nil, app, defTime)
}
func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) bool {
mfNameStr := yoloString(mfName)
if !strings.HasPrefix(mName, mfNameStr) { // Fast path.
@ -2154,29 +2501,37 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t int64, v float64, b *labels.Builder) error {
if sl.enableNativeUnitAndType {
appV2, ok := app.(storagev2.Appender)
if !ok {
return errors.New("expected app to implement storagev2.Appender too, bug?")
}
return sl.addReportSampleV2(appV2, s, t, v, b)
}
ce, ok, _ := sl.cache.get(s.name)
var ref storage.SeriesRef
var lset labels.Labels
var se series.Series
if ok {
ref = ce.ref
lset = ce.lset
se = ce.se
} else {
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
// with scraped metrics in the cache.
// We have to drop it when building the actual metric.
b.Reset(labels.EmptyLabels())
b.Set(labels.MetricName, string(s.name[:len(s.name)-1]))
lset = sl.reportSampleMutator(b.Labels())
se.Labels = sl.reportSampleMutator(b.Labels())
}
ref, err := app.Append(ref, lset, t, v)
ref, err := app.Append(ref, se.ToCompatibleLabels(), t, v)
switch {
case err == nil:
if !ok {
sl.cache.addRef(s.name, ref, lset, lset.Hash())
sl.cache.addRef(s.name, ref, se, se.Hash())
// We only need to add metadata once a scrape target appears.
if sl.appendMetadataToWAL {
if _, merr := app.UpdateMetadata(ref, lset, s.Metadata); merr != nil {
if _, merr := app.UpdateMetadata(ref, se.ToCompatibleLabels(), s.Metadata); merr != nil {
sl.l.Debug("Error when appending metadata in addReportSample", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", s.Metadata), "err", merr)
}
}
@ -2191,6 +2546,40 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t in
}
}
// addReportSampleV2 appends a synthetic report sample (e.g. up,
// scrape_duration_seconds) via the storagev2 path, carrying the metric name
// and type natively on series.Series instead of inside the label set.
func (sl *scrapeLoop) addReportSampleV2(app storagev2.Appender, s reportSample, t int64, v float64, b *labels.Builder) error {
	ce, ok, _ := sl.cache.get(s.name)
	var ref storage.SeriesRef

	var se series.Series
	if ok {
		ref = ce.ref
		se = ce.se
	} else {
		// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
		// with scraped metrics in the cache.
		// We have to drop it when building the actual metric.
		se.Name = string(s.name[:len(s.name)-1])
		se.Type = s.Type
		b.Reset(labels.EmptyLabels())
		se.Labels = sl.reportSampleMutator(b.Labels())
	}
	ref, err := app.AppendSample(ref, se, storagev2.Sample{T: t, V: v}, nil)
	switch {
	case err == nil:
		if !ok {
			// First time this report metric appears for the target; cache it.
			sl.cache.addRef(s.name, ref, se, se.Hash())
		}
		return nil
	case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
		// Do not log here, as this is expected if a target goes away and comes back
		// again with a new scrape loop.
		return nil
	default:
		return err
	}
}
// zeroConfig returns a new scrape config that only contains configuration items
// that alter metrics.
func zeroConfig(c *config.ScrapeConfig) *config.ScrapeConfig {

View File

@ -24,6 +24,9 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/series"
storagev2 "github.com/prometheus/prometheus/storage/v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
@ -326,11 +329,22 @@ var (
// limitAppender limits the number of total appended samples in a batch.
type limitAppender struct {
storage.Appender
appV2 storagev2.Appender
limit int
i int
}
// AppendSample enforces the per-batch sample limit on the storagev2 path.
func (app *limitAppender) AppendSample(ref storage.SeriesRef, se series.Series, sa storagev2.Sample, opts *storagev2.AppendOptions) (storage.SeriesRef, error) {
	// Stale markers are exempt from the limit, mirroring Append.
	if value.IsStaleNaN(sa.V) {
		return app.appV2.AppendSample(ref, se, sa, opts)
	}
	app.i++
	if app.i > app.limit {
		return 0, errSampleLimit
	}
	return app.appV2.AppendSample(ref, se, sa, opts)
}
func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if !value.IsStaleNaN(v) {
app.i++
@ -347,69 +361,114 @@ func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t in
type timeLimitAppender struct {
storage.Appender
appV2 storagev2.Appender
maxTime int64
}
// AppendSample rejects samples with timestamps too far into the future.
func (app *timeLimitAppender) AppendSample(ref storage.SeriesRef, se series.Series, sa storagev2.Sample, opts *storagev2.AppendOptions) (storage.SeriesRef, error) {
	if sa.T <= app.maxTime {
		return app.appV2.AppendSample(ref, se, sa, opts)
	}
	return 0, storage.ErrOutOfBounds
}
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if t > app.maxTime {
return 0, storage.ErrOutOfBounds
}
ref, err := app.Appender.Append(ref, lset, t, v)
if err != nil {
return 0, err
}
return ref, nil
return app.Appender.Append(ref, lset, t, v)
}
// bucketLimitAppender limits the number of total appended samples in a batch.
type bucketLimitAppender struct {
storage.Appender
appV2 storagev2.Appender
limit int
}
// limitH enforces the bucket limit on a native integer histogram: if the
// bucket count exceeds the limit and the schema is exponential, resolution is
// repeatedly reduced until the histogram fits; otherwise errBucketLimit is
// returned.
//
// NOTE(review): this relies on Histogram.ReduceResolution mutating the
// receiver in place so the caller's histogram is reduced too — the local
// rebinding of h alone would not propagate the reduction. Confirm against
// the histogram package.
func (app *bucketLimitAppender) limitH(h *histogram.Histogram) error {
	// Return with an early error if the histogram has too many buckets and the
	// schema is not exponential, in which case we can't reduce the resolution.
	if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) {
		return errBucketLimit
	}
	for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
		if h.Schema <= histogram.ExponentialSchemaMin {
			return errBucketLimit
		}
		h = h.ReduceResolution(h.Schema - 1)
	}
	return nil
}
// limitFH enforces the bucket limit on a native float histogram, reducing the
// resolution of exponential schemas until the histogram fits or returning
// errBucketLimit when it cannot.
//
// NOTE(review): like limitH, this relies on FloatHistogram.ReduceResolution
// mutating the receiver in place so the caller sees the reduction — confirm.
func (app *bucketLimitAppender) limitFH(fh *histogram.FloatHistogram) error {
	// Return with an early error if the histogram has too many buckets and the
	// schema is not exponential, in which case we can't reduce the resolution.
	if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) {
		return errBucketLimit
	}
	for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
		if fh.Schema <= histogram.ExponentialSchemaMin {
			return errBucketLimit
		}
		fh = fh.ReduceResolution(fh.Schema - 1)
	}
	return nil
}
// AppendSample enforces the bucket limit on native histograms before delegating.
func (app *bucketLimitAppender) AppendSample(ref storage.SeriesRef, se series.Series, sa storagev2.Sample, opts *storagev2.AppendOptions) (storage.SeriesRef, error) {
	if h := sa.H; h != nil {
		if err := app.limitH(h); err != nil {
			return 0, err
		}
	}
	if fh := sa.FH; fh != nil {
		if err := app.limitFH(fh); err != nil {
			return 0, err
		}
	}
	return app.appV2.AppendSample(ref, se, sa, opts)
}
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
// Return with an early error if the histogram has too many buckets and the
// schema is not exponential, in which case we can't reduce the resolution.
if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) {
return 0, errBucketLimit
}
for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
if h.Schema <= histogram.ExponentialSchemaMin {
return 0, errBucketLimit
}
h = h.ReduceResolution(h.Schema - 1)
if err := app.limitH(h); err != nil {
return 0, err
}
}
if fh != nil {
// Return with an early error if the histogram has too many buckets and the
// schema is not exponential, in which case we can't reduce the resolution.
if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) {
return 0, errBucketLimit
}
for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
if fh.Schema <= histogram.ExponentialSchemaMin {
return 0, errBucketLimit
}
fh = fh.ReduceResolution(fh.Schema - 1)
if err := app.limitFH(fh); err != nil {
return 0, err
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
if err != nil {
return 0, err
}
return ref, nil
return app.Appender.AppendHistogram(ref, lset, t, h, fh)
}
// maxSchemaAppender caps the schema (resolution) of appended exponential
// native histograms at maxSchema, reducing their resolution when needed.
type maxSchemaAppender struct {
	storage.Appender // Wrapped v1 appender; used by AppendHistogram.

	appV2     storagev2.Appender // V2 appender; used by AppendSample.
	maxSchema int32              // Highest exponential schema allowed through.
}
// AppendSample reduces the resolution of any exponential native histogram in
// the sample down to maxSchema, then forwards it to the wrapped v2 appender.
func (app *maxSchemaAppender) AppendSample(ref storage.SeriesRef, se series.Series, sa storagev2.Sample, opts *storagev2.AppendOptions) (storage.SeriesRef, error) {
	if h := sa.H; h != nil && histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema {
		sa.H = h.ReduceResolution(app.maxSchema)
	}
	if fh := sa.FH; fh != nil && histogram.IsExponentialSchema(fh.Schema) && fh.Schema > app.maxSchema {
		sa.FH = fh.ReduceResolution(app.maxSchema)
	}
	return app.appV2.AppendSample(ref, se, sa, opts)
}
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema {

View File

@ -275,16 +275,19 @@ type Appender interface {
// Appender has to be discarded after rollback.
Rollback() error
// SetOptions configures the appender with specific append options such as
// discarding out-of-order samples even if out-of-order is enabled in the TSDB.
SetOptions(opts *AppendOptions)
SetOptionAppender
ExemplarAppender
HistogramAppender
MetadataUpdater
CreatedTimestampAppender
}
type SetOptionAppender interface {
// SetOptions configures the appender with specific append options such as
// discarding out-of-order samples even if out-of-order is enabled in the TSDB.
SetOptions(opts *AppendOptions)
}
// GetRef is an extra interface on Appenders used by downstream projects
// (e.g. Cortex) to avoid maintaining a parallel set of references.
type GetRef interface {

69
storage/v2/interface.go Normal file
View File

@ -0,0 +1,69 @@
package storagev2
import (
"math"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/series"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
)
// Sample is a single value to be appended, carried in exactly one of three
// representations: integer histogram (H), float histogram (FH) or float (V).
// TODO(bwplotka): Double check if function arguments would not be faster.
type Sample struct {
	// SchemaURL string one day.
	// CT int64 // 0 means no created timestamp.

	T int64 // required; sample timestamp.

	// Value fields, ordered by priority: if H is not nil, FH and V are
	// assumed empty; if FH is not nil, V is assumed empty.
	H  *histogram.Histogram
	FH *histogram.FloatHistogram
	V  float64

	Exemplars []exemplar.Exemplar // optional exemplars to attach.
}
// StaleSample returns a float Sample at timestamp t carrying the staleness
// marker (StaleNaN) as its value.
func StaleSample(t int64) Sample {
	sa := Sample{T: t}
	sa.V = math.Float64frombits(value.StaleNaN)
	return sa
}
// IsHistogram reports whether the sample carries a native histogram, in
// either the integer (H) or float (FH) representation.
func (sa Sample) IsHistogram() bool {
	return !(sa.H == nil && sa.FH == nil)
}
// Appender provides batched appends against a storage.
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
//
// Operations on the Appender interface are not goroutine-safe.
//
// The type of samples (float64, histogram, etc.) appended for a given series must remain the same within an Appender.
// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
type Appender interface {
	// Commit submits the collected samples and purges the batch. If Commit
	// returns a non-nil error, it also rolls back all modifications made in
	// the appender so far, as Rollback would do. In any case, an Appender
	// must not be used anymore after Commit has been called.
	Commit() error

	// Rollback rolls back all modifications made in the appender so far.
	// Appender has to be discarded after rollback.
	Rollback() error

	// AppendSample adds a sample for the given series.
	// An optional series reference can be provided to accelerate calls.
	// A series reference number is returned which can be used to add further
	// samples to the given series in the same or later transactions.
	// Returned reference numbers are ephemeral and may be rejected in calls
	// to AppendSample() at any point. Adding the sample via AppendSample()
	// then returns a new reference number.
	// If the reference is 0 it must not be used for caching.
	AppendSample(ref storage.SeriesRef, se series.Series, sa Sample, opts *AppendOptions) (storage.SeriesRef, error)

	// Alternative flattened signature considered during design:
	// Append(ref storage.SeriesRef, se series.Series, sa Sample, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, v float64, exemplars []exemplar.Exemplar, opts *AppendOptions) (storage.SeriesRef, error)
}
// AppendOptions carries per-append options for Appender.AppendSample.
type AppendOptions struct {
	// DiscardOutOfOrder requests that out-of-order samples be discarded even
	// if out-of-order ingestion is enabled in the TSDB (mirrors the v1
	// storage.AppendOptions semantics — TODO confirm against implementations).
	DiscardOutOfOrder bool
}