From 6d732b3b1d2d2534b0f5bc16c3eb5aed8f6a9f8e Mon Sep 17 00:00:00 2001 From: bwplotka Date: Sat, 15 Mar 2025 21:26:57 +0000 Subject: [PATCH] ff: native-type-and-unit Signed-off-by: bwplotka --- cmd/prometheus/main.go | 4 + model/series/series.go | 108 ++++++++ model/series/series_test.go | 29 +++ scrape/manager.go | 4 + scrape/scrape.go | 477 ++++++++++++++++++++++++++++++++---- scrape/target.go | 119 ++++++--- storage/interface.go | 11 +- storage/v2/interface.go | 69 ++++++ 8 files changed, 743 insertions(+), 78 deletions(-) create mode 100644 model/series/series.go create mode 100644 model/series/series_test.go create mode 100644 storage/v2/interface.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 347bae470c..52816d3694 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -280,6 +280,10 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "otlp-deltatocumulative": c.web.ConvertOTLPDelta = true logger.Info("Converting delta OTLP metrics to cumulative") + case "native-type-and-unit": + c.scrape.EnableNativeTypeAndUnit = true + + logger.Info("Native type and unit enabled") default: logger.Warn("Unknown option for --enable-feature", "option", o) } diff --git a/model/series/series.go b/model/series/series.go new file mode 100644 index 0000000000..bb097df3d2 --- /dev/null +++ b/model/series/series.go @@ -0,0 +1,108 @@ +package series + +import ( + "bytes" + "strconv" + + "github.com/cespare/xxhash/v2" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/model/labels" +) + +const ( + TypeSelector = "__type__" + UnitSelector = "__unit__" + NameSelector = labels.MetricName + + unitSep = '~' + typeSep = '.' +) + +// Series represents identifiable portion of a metric and its labels. +// This is essentially typed version of labels.Labels where metric name +// unit and type are outside of labels. +// +// There are two modes of using Series. 
+// * Native: Labels does not have any of the name, unit or type information
+// (specifically, Labels does not have __name__). This is the recommended mode,
+// gated by the "native-type-and-unit" feature flag.
+// * Compatibility: In this mode Name is empty and Labels include the __name__ label.
+type Series struct {
+	// Metric identity.
+	Name string           // required.
+	Unit string           // optional.
+	Type model.MetricType // required.
+
+	// Labels are additional, optional attributes/dimensions.
+	Labels labels.Labels
+}
+
+// NewCompatibleSeries returns a series with labels that behave as they used to
+// (e.g., name in the __name__ label; unit and type skipped).
+//
+// This is for ease of migration, compatible with the old code.
+func NewCompatibleSeries(lset labels.Labels) Series {
+	return Series{Labels: lset}
+}
+
+// ToCompatibleLabels returns labels.Labels with the old semantics (e.g., name in the __name__
+// label; unit and type skipped). This is for ease of migration, compatible with the old code.
+func (s Series) ToCompatibleLabels() labels.Labels {
+	if s.Name == "" {
+		// If name is empty, it means it's compatibility mode already.
+		return s.Labels
+	}
+	b := labels.NewBuilder(s.Labels)
+	b.Set(labels.MetricName, s.Name)
+	return b.Labels()
+}
+
+func (s Series) Hash() uint64 {
+	var bytea [1024]byte // On stack to avoid memory allocation.
+	b := bytes.NewBuffer(bytea[:0])
+	b.WriteString(s.Name)
+	b.WriteByte(unitSep)
+	b.WriteString(s.Unit)
+	b.WriteByte(typeSep)
+	b.WriteString(string(s.Type))
+	b.Write(s.Labels.Bytes(b.AvailableBuffer()))
+	return xxhash.Sum64(b.Bytes())
+}
+
+func (s Series) String() string {
+	var bytea [1024]byte // On stack to avoid memory allocation while building the output.
+ b := bytes.NewBuffer(bytea[:0]) + + i := 0 + if !model.LabelName(s.Name).IsValidLegacy() { + b.WriteByte('{') + b.Write(strconv.AppendQuote(b.AvailableBuffer(), s.Name)) + i++ + } else { + b.WriteString(s.Name) + b.WriteByte('{') + } + s.Labels.Range(func(l labels.Label) { + if i > 0 { + b.WriteByte(',') + b.WriteByte(' ') + } + if !model.LabelName(l.Name).IsValidLegacy() { + b.Write(strconv.AppendQuote(b.AvailableBuffer(), l.Name)) + } else { + b.WriteString(l.Name) + } + b.WriteByte('=') + b.Write(strconv.AppendQuote(b.AvailableBuffer(), l.Value)) + i++ + }) + b.WriteByte('}') + if len(s.Unit) > 0 { + b.WriteByte(unitSep) + b.WriteString(s.Unit) + } + b.WriteByte(typeSep) + b.WriteString(string(s.Type)) + return b.String() +} diff --git a/model/series/series_test.go b/model/series/series_test.go new file mode 100644 index 0000000000..8889c24592 --- /dev/null +++ b/model/series/series_test.go @@ -0,0 +1,29 @@ +package series + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" +) + +func TestSeries_String(t *testing.T) { + s := Series{ + Name: "my_metric_seconds_total", + Unit: "seconds", + Type: model.MetricTypeCounter, + Labels: labels.FromStrings("foo", "bar"), + } + require.Equal(t, "my_metric_seconds_total{foo=\"bar\"}~seconds.counter", s.String()) + + // UTF-8 + s2 := Series{ + Name: "my.metric.seconds_total", + Unit: "seconds", + Type: model.MetricTypeCounter, + Labels: labels.FromStrings("foo", "bar"), + } + require.Equal(t, "{\"my.metric.seconds_total\", foo=\"bar\"}~seconds.counter", s2.String()) +} diff --git a/scrape/manager.go b/scrape/manager.go index 5ef5dccb99..7d87aebb2f 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -86,6 +86,10 @@ type Options struct { EnableCreatedTimestampZeroIngestion bool // Option to enable the ingestion of native histograms. 
EnableNativeHistogramsIngestion bool + // EnableNativeTypeAndUnit enables storing type and unit (if present) + // in TSDB, unlocking cases mentioned in https://github.com/prometheus/proposals/pull/39. + // e.g. unit & type collision detection and selection. + EnableNativeTypeAndUnit bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption diff --git a/scrape/scrape.go b/scrape/scrape.go index 518103d345..77062397f4 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -37,6 +37,10 @@ import ( "github.com/prometheus/common/promslog" "github.com/prometheus/common/version" + "github.com/prometheus/prometheus/model/series" + + storagev2 "github.com/prometheus/prometheus/storage/v2" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/exemplar" @@ -195,6 +199,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed opts.convertClassicHistToNHCB, options.EnableNativeHistogramsIngestion, options.EnableCreatedTimestampZeroIngestion, + options.EnableNativeTypeAndUnit, options.ExtraMetrics, options.AppendMetadata, opts.target, @@ -742,7 +747,37 @@ func appender(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int3 maxSchema: maxSchema, } } + return app +} +// appenderV2 returns an appender for ingested samples from the target. +func appenderV2(app storagev2.Appender, sampleLimit, bucketLimit int, maxSchema int32) storagev2.Appender { + app = &timeLimitAppender{ + appV2: app, + maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), + } + + // The sampleLimit is applied after metrics are potentially dropped via relabeling. 
+ if sampleLimit > 0 { + app = &limitAppender{ + appV2: app, + limit: sampleLimit, + } + } + + if bucketLimit > 0 { + app = &bucketLimitAppender{ + appV2: app, + limit: bucketLimit, + } + } + + if maxSchema < histogram.ExponentialSchemaMax { + app = &maxSchemaAppender{ + appV2: app, + maxSchema: maxSchema, + } + } return app } @@ -885,7 +920,7 @@ type cacheEntry struct { ref storage.SeriesRef lastIter uint64 hash uint64 - lset labels.Labels + se series.Series } type scrapeLoop struct { @@ -916,6 +951,7 @@ type scrapeLoop struct { // Feature flagged options. enableNativeHistogramIngestion bool enableCTZeroIngestion bool + enableNativeUnitAndType bool appender func(ctx context.Context) storage.Appender symbolTable *labels.SymbolTable @@ -959,8 +995,8 @@ type scrapeCache struct { // seriesCur and seriesPrev store the labels of series that were seen // in the current and previous scrape. // We hold two maps and swap them out to save allocations. - seriesCur map[uint64]labels.Labels - seriesPrev map[uint64]labels.Labels + seriesCur map[uint64]series.Series + seriesPrev map[uint64]series.Series // TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to // avoid locking (using metadata API can block scraping). 
@@ -987,8 +1023,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache { return &scrapeCache{ series: map[string]*cacheEntry{}, droppedSeries: map[string]*uint64{}, - seriesCur: map[uint64]labels.Labels{}, - seriesPrev: map[uint64]labels.Labels{}, + seriesCur: map[uint64]series.Series{}, + seriesPrev: map[uint64]series.Series{}, metadata: map[string]*metaEntry{}, metrics: metrics, } @@ -1057,11 +1093,11 @@ func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) { return e, true, alreadyScraped } -func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) { +func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, se series.Series, hash uint64) { if ref == 0 { return } - c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash} + c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, se: se, hash: hash} } func (c *scrapeCache) addDropped(met []byte) { @@ -1077,11 +1113,11 @@ func (c *scrapeCache) getDropped(met []byte) bool { return ok } -func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) { - c.seriesCur[hash] = lset +func (c *scrapeCache) trackStaleness(hash uint64, se series.Series) { + c.seriesCur[hash] = se } -func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { +func (c *scrapeCache) forEachStale(f func(series.Series) bool) { for h, lset := range c.seriesPrev { if _, ok := c.seriesCur[h]; !ok { if !f(lset) { @@ -1223,6 +1259,7 @@ func newScrapeLoop(ctx context.Context, convertClassicHistToNHCB bool, enableNativeHistogramIngestion bool, enableCTZeroIngestion bool, + enableNativeUnitAndType bool, reportExtraMetrics bool, appendMetadataToWAL bool, target *Target, @@ -1279,6 +1316,7 @@ func newScrapeLoop(ctx context.Context, convertClassicHistToNHCB: convertClassicHistToNHCB, enableNativeHistogramIngestion: enableNativeHistogramIngestion, enableCTZeroIngestion: enableCTZeroIngestion, + enableNativeUnitAndType: enableNativeUnitAndType, 
reportExtraMetrics: reportExtraMetrics, appendMetadataToWAL: appendMetadataToWAL, metrics: metrics, @@ -1577,11 +1615,26 @@ type appendErrors struct { } // Update the stale markers. -func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) { - sl.cache.forEachStale(func(lset labels.Labels) bool { +func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, appV2 storagev2.Appender, defTime int64) (err error) { + if appV2 != nil { + s := storagev2.StaleSample(defTime) + sl.cache.forEachStale(func(se series.Series) bool { + // Series no longer exposed, mark it stale. + _, err = appV2.AppendSample(0, se, s, &storagev2.AppendOptions{DiscardOutOfOrder: true}) + switch { + case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): + // Do not count these in logging, as this is expected if a target + // goes away and comes back again with a new scrape loop. + err = nil + } + return err == nil + }) + return nil + } + sl.cache.forEachStale(func(se series.Series) bool { // Series no longer exposed, mark it stale. 
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) - _, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN)) + _, err = app.Append(0, se.ToCompatibleLabels(), defTime, math.Float64frombits(value.StaleNaN)) app.SetOptions(nil) switch { case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): @@ -1591,15 +1644,57 @@ func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (e } return err == nil }) - return + return nil } func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { + if sl.enableNativeUnitAndType { + if sl.enableCTZeroIngestion { + return 0, 0, 0, errors.New("native-type-and-unit feature flag can't be used when created-timestamp-zero-ingestion is enabled") + } + if sl.appendMetadataToWAL { + return 0, 0, 0, errors.New("native-type-and-unit feature flag can't be used when metadata-in-wal is enabled") + } + appV2, ok := app.(storagev2.Appender) + if !ok { + return 0, 0, 0, errors.New("unexpected bug; appender should have storage V2 interface implemented") + } + return sl.appendV2(appV2, b, contentType, ts) + } + return sl.appendV1(app, b, contentType, ts) +} + +func (sl *scrapeLoop) attachExemplars(p textparse.Parser, sa *storagev2.Sample) { + var e exemplar.Exemplar + + for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) { + if !e.HasTs { + if sa.IsHistogram() { + // We drop exemplars for native histograms if they don't have a timestamp. + // Missing timestamps are deliberately not supported as we want to start + // enforcing timestamps for exemplars as otherwise proper deduplication + // is inefficient and purely based on heuristics: we cannot distinguish + // between repeated exemplars and new instances with the same values. + // This is done silently without logs as it is not an error but out of spec. 
+ // This does not affect classic histograms so that behaviour is unchanged. + e = exemplar.Exemplar{} // Reset for next time round loop. + continue + } + e.Ts = sa.T + } + sa.Exemplars = append(sa.Exemplars, e) + e = exemplar.Exemplar{} // Reset for next time round loop. + } + // Sort so that checking for duplicates / out of order is more efficient during validation. + slices.SortFunc(sa.Exemplars, exemplar.Compare) +} + +func (sl *scrapeLoop) appendV1(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { defTime := timestamp.FromTime(ts) if len(b) == 0 { // Empty scrape. Just update the stale makers and swap the cache (but don't flush it). - err = sl.updateStaleMarkers(app, defTime) + err = sl.updateStaleMarkers(app, nil, defTime) sl.cache.iterDone(false) return } @@ -1629,7 +1724,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, appErrs = appendErrors{} sampleLimitErr error bucketLimitErr error - lset labels.Labels // escapes to heap so hoisted out of loop + se series.Series // escapes to heap so hoisted out of loop e exemplar.Exemplar // escapes to heap so hoisted out of loop lastMeta *metaEntry lastMFName []byte @@ -1711,33 +1806,34 @@ loop: if seriesCached { ref = ce.ref - lset = ce.lset + se = ce.se hash = ce.hash } else { - p.Labels(&lset) - hash = lset.Hash() + // In this mode series.Series only contains labels with __name__ inside. + p.Labels(&se.Labels) + hash = se.Hash() // Hash label set as it is seen local to the target. Then add target labels // and relabeling and store the final label set. - lset = sl.sampleMutator(lset) + se.Labels = sl.sampleMutator(se.Labels) // The label set may be set to empty to indicate dropping. 
- if lset.IsEmpty() { + if se.Labels.IsEmpty() { sl.cache.addDropped(met) continue } - if !lset.Has(labels.MetricName) { + if !se.Labels.Has(labels.MetricName) { err = errNameLabelMandatory break loop } - if !lset.IsValid(sl.validationScheme) { - err = fmt.Errorf("invalid metric name or label names: %s", lset.String()) + if !se.Labels.IsValid(sl.validationScheme) { + err = fmt.Errorf("invalid metric name or label names: %s", se.Labels.String()) break loop } // If any label limits is exceeded the scrape should fail. - if err = verifyLabelLimits(lset, sl.labelLimits); err != nil { + if err = verifyLabelLimits(se.Labels, sl.labelLimits); err != nil { sl.metrics.targetScrapePoolExceededLabelLimits.Inc() break loop } @@ -1750,12 +1846,12 @@ loop: if ctMs := p.CreatedTimestamp(); ctMs != 0 { if isHistogram && sl.enableNativeHistogramIngestion { if h != nil { - ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, ctMs, h, nil) + ref, err = app.AppendHistogramCTZeroSample(ref, se.Labels, t, ctMs, h, nil) } else { - ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, ctMs, nil, fh) + ref, err = app.AppendHistogramCTZeroSample(ref, se.Labels, t, ctMs, nil, fh) } } else { - ref, err = app.AppendCTZeroSample(ref, lset, t, ctMs) + ref, err = app.AppendCTZeroSample(ref, se.Labels, t, ctMs) } if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. // CT is an experimental feature. 
For now, we don't need to fail the @@ -1767,18 +1863,18 @@ loop: if isHistogram && sl.enableNativeHistogramIngestion { if h != nil { - ref, err = app.AppendHistogram(ref, lset, t, h, nil) + ref, err = app.AppendHistogram(ref, se.Labels, t, h, nil) } else { - ref, err = app.AppendHistogram(ref, lset, t, nil, fh) + ref, err = app.AppendHistogram(ref, se.Labels, t, nil, fh) } } else { - ref, err = app.Append(ref, lset, t, val) + ref, err = app.Append(ref, se.Labels, t, val) } } if err == nil { if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil { - sl.cache.trackStaleness(ce.hash, ce.lset) + sl.cache.trackStaleness(ce.hash, ce.se) } } @@ -1793,9 +1889,9 @@ loop: if !seriesCached { if parsedTimestamp == nil || sl.trackTimestampsStaleness { // Bypass staleness logic if there is an explicit timestamp. - sl.cache.trackStaleness(hash, lset) + sl.cache.trackStaleness(hash, se) } - sl.cache.addRef(met, ref, lset, hash) + sl.cache.addRef(met, ref, se, hash) if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil { seriesAdded++ } @@ -1829,7 +1925,7 @@ loop: slices.SortFunc(exemplars, exemplar.Compare) outOfOrderExemplars := 0 for _, e := range exemplars { - _, exemplarErr := app.AppendExemplar(ref, lset, e) + _, exemplarErr := app.AppendExemplar(ref, se.Labels, e) switch { case exemplarErr == nil: // Do nothing. @@ -1854,8 +1950,8 @@ loop: // In majority cases we can trust that the current series/histogram is matching the lastMeta and lastMFName. // However, optional TYPE etc metadata and broken OM text can break this, detect those cases here. // TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. CT and NHCB parsing). 
- if isSeriesPartOfFamily(lset.Get(labels.MetricName), lastMFName, lastMeta.Type) { - if _, merr := app.UpdateMetadata(ref, lset, lastMeta.Metadata); merr != nil { + if isSeriesPartOfFamily(se.Labels.Get(labels.MetricName), lastMFName, lastMeta.Type) { + if _, merr := app.UpdateMetadata(ref, se.Labels, lastMeta.Metadata); merr != nil { // No need to fail the scrape on errors appending metadata. sl.l.Debug("Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", lastMeta.Metadata), "err", merr) } @@ -1890,11 +1986,262 @@ loop: sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) } if err == nil { - err = sl.updateStaleMarkers(app, defTime) + err = sl.updateStaleMarkers(app, nil, defTime) } return } +// appendV2 appends sample when the new storagev2 interface is enabled and available. +func (sl *scrapeLoop) appendV2(app storagev2.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { + defTime := timestamp.FromTime(ts) + + if len(b) == 0 { + // Empty scrape. Just update the stale makers and swap the cache (but don't flush it). 
+ err = sl.updateStaleMarkers(nil, app, defTime) + sl.cache.iterDone(false) + return + } + + p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable) + if p == nil { + sl.l.Error( + "Failed to determine correct type of scrape target.", + "content_type", contentType, + "fallback_media_type", sl.fallbackScrapeProtocol, + "err", err, + ) + return + } + if sl.convertClassicHistToNHCB { + p = textparse.NewNHCBParser(p, sl.symbolTable, sl.alwaysScrapeClassicHist) + } + if err != nil { + sl.l.Debug( + "Invalid content type on scrape, using fallback setting.", + "content_type", contentType, + "fallback_media_type", sl.fallbackScrapeProtocol, + "err", err, + ) + } + var ( + appErrs = appendErrors{} + sampleLimitErr error + bucketLimitErr error + se series.Series // escapes to heap so hoisted out of loop + + lastMeta *metaEntry + lastMFName []byte + exemplars = make([]exemplar.Exemplar, 0, 1) + ) + + // Take an appender with limits. + app = appenderV2(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema) + + defer func() { + if err != nil { + return + } + // Flush and swap the cache as the scrape was non-empty. + sl.cache.iterDone(true) + }() + +loop: + for { + var ( + et textparse.Entry + sampleAdded, isHistogram bool + met []byte + parsedTimestamp *int64 + ) + if et, err = p.Next(); err != nil { + if errors.Is(err, io.EOF) { + err = nil + } + break + } + switch et { + // TODO(bwplotka): Consider changing parser to give metadata at once instead of type, help and unit in separation, ideally on `Series()/Histogram() + // otherwise we can expose metadata without series on metadata API. + case textparse.EntryType: + // TODO(bwplotka): Build meta entry directly instead of locking and updating the map. 
This will + // allow to properly update metadata when e.g unit was added, then removed; + lastMFName, lastMeta = sl.cache.setType(p.Type()) + continue + case textparse.EntryHelp: + lastMFName, lastMeta = sl.cache.setHelp(p.Help()) + continue + case textparse.EntryUnit: + lastMFName, lastMeta = sl.cache.setUnit(p.Unit()) + continue + case textparse.EntryComment: + continue + case textparse.EntryHistogram: + isHistogram = true + default: + } + total++ + + // Reset sa. + sa := storagev2.Sample{T: defTime, Exemplars: exemplars[:0]} + if isHistogram { + met, parsedTimestamp, sa.H, sa.FH = p.Histogram() + } else { + met, parsedTimestamp, sa.V = p.Series() + } + if !sl.honorTimestamps { + parsedTimestamp = nil + } + if parsedTimestamp != nil { + sa.T = *parsedTimestamp + } + + if sl.cache.getDropped(met) { + continue + } + ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met) + var ( + ref storage.SeriesRef + hash uint64 + ) + + if seriesCached { + ref = ce.ref + se = ce.se + hash = ce.hash + } else { + // V2 means we do native identifiable metadata, which means name is native. + p.Labels(&se.Labels) + se.Name = se.Labels.Get(model.MetricNameLabel) + se.Labels = se.Labels.DropMetricName() + if lastMeta != nil { + // In majority cases we can trust that the current series/histogram is matching the lastMeta and lastMFName. + // However, optional TYPE etc metadata and broken OM text can break this, detect those cases here. + // TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. CT and NHCB parsing). + if isSeriesPartOfFamily(se.Name, lastMFName, lastMeta.Type) { + se.Type = lastMeta.Type + se.Unit = lastMeta.Unit + } + } + hash = se.Hash() + + // Hash label set as it is seen local to the target. Then add target labels + // and relabeling and store the final label set. + se.Labels = sl.sampleMutator(se.Labels) + + // The label set may be set to empty to indicate dropping. 
+ if se.Labels.IsEmpty() { + sl.cache.addDropped(met) + continue + } + + if len(se.Name) == 0 { + err = errNameLabelMandatory + break loop + } + if !se.Labels.IsValid(sl.validationScheme) { + err = fmt.Errorf("invalid metric name or label names: %s", se.Labels.String()) + break loop + } + + // If any label limits is exceeded the scrape should fail. + if err = verifyLabelLimits(se.Labels, sl.labelLimits); err != nil { + sl.metrics.targetScrapePoolExceededLabelLimits.Inc() + break loop + } + } + + sl.attachExemplars(p, &sa) + + // TODO(bwplotka): Add special AppendSample error structure that would give this data. + //outOfOrderExemplars := 0 + //for _, e := range exemplars { + // _, exemplarErr := app.AppendExemplar(ref, lset, e) + // switch { + // case exemplarErr == nil: + // // Do nothing. + // case errors.Is(exemplarErr, storage.ErrOutOfOrderExemplar): + // outOfOrderExemplars++ + // default: + // // Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors. + // sl.l.Debug("Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) + // } + //} + //if outOfOrderExemplars > 0 && outOfOrderExemplars == len(exemplars) { + // // Only report out of order exemplars if all are out of order, otherwise this was a partial update + // // to some existing set of exemplars. 
+ // appErrs.numExemplarOutOfOrder += outOfOrderExemplars + // sl.l.Debug("Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", exemplars[len(exemplars)-1])) + // sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars)) + //} + + if seriesAlreadyScraped && parsedTimestamp == nil { + err = storage.ErrDuplicateSampleForTimestamp + } else if !sa.IsHistogram() || (sa.IsHistogram() && sl.enableNativeHistogramIngestion) { + ref, err = app.AppendSample(ref, se, sa, nil) + } + if err == nil { + if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil { + sl.cache.trackStaleness(ce.hash, ce.se) + } + } + + sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + sl.l.Debug("Unexpected error", "series", string(met), "err", err) + } + break loop + } + + if !seriesCached { + if parsedTimestamp == nil || sl.trackTimestampsStaleness { + // Bypass staleness logic if there is an explicit timestamp. + sl.cache.trackStaleness(hash, se) + } + sl.cache.addRef(met, ref, se, hash) + if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil { + seriesAdded++ + } + } + + // Increment added even if there's an error so we correctly report the + // number of samples remaining after relabeling. + // We still report duplicated samples here since this number should be the exact number + // of time series exposed on a scrape after relabelling. + added++ + } + if sampleLimitErr != nil { + if err == nil { + err = sampleLimitErr + } + // We only want to increment this once per scrape, so this is Inc'd outside the loop. + sl.metrics.targetScrapeSampleLimit.Inc() + } + if bucketLimitErr != nil { + if err == nil { + err = bucketLimitErr // If sample limit is hit, that error takes precedence. + } + // We only want to increment this once per scrape, so this is Inc'd outside the loop. 
+ sl.metrics.targetScrapeNativeHistogramBucketLimit.Inc() + } + if appErrs.numOutOfOrder > 0 { + sl.l.Warn("Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder) + } + if appErrs.numDuplicates > 0 { + sl.l.Warn("Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates) + } + if appErrs.numOutOfBounds > 0 { + sl.l.Warn("Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds) + } + if appErrs.numExemplarOutOfOrder > 0 { + sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) + } + if err != nil { + return total, added, seriesAdded, err + } + return total, added, seriesAdded, sl.updateStaleMarkers(nil, app, defTime) +} + func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) bool { mfNameStr := yoloString(mfName) if !strings.HasPrefix(mName, mfNameStr) { // Fast path. @@ -2154,29 +2501,37 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er } func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t int64, v float64, b *labels.Builder) error { + if sl.enableNativeUnitAndType { + appV2, ok := app.(storagev2.Appender) + if !ok { + return errors.New("expected app to implement storagev2.Appender too, bug?") + } + return sl.addReportSampleV2(appV2, s, t, v, b) + } + ce, ok, _ := sl.cache.get(s.name) var ref storage.SeriesRef - var lset labels.Labels + var se series.Series if ok { ref = ce.ref - lset = ce.lset + se = ce.se } else { // The constants are suffixed with the invalid \xff unicode rune to avoid collisions // with scraped metrics in the cache. // We have to drop it when building the actual metric. 
b.Reset(labels.EmptyLabels()) b.Set(labels.MetricName, string(s.name[:len(s.name)-1])) - lset = sl.reportSampleMutator(b.Labels()) + se.Labels = sl.reportSampleMutator(b.Labels()) } - ref, err := app.Append(ref, lset, t, v) + ref, err := app.Append(ref, se.ToCompatibleLabels(), t, v) switch { case err == nil: if !ok { - sl.cache.addRef(s.name, ref, lset, lset.Hash()) + sl.cache.addRef(s.name, ref, se, se.Hash()) // We only need to add metadata once a scrape target appears. if sl.appendMetadataToWAL { - if _, merr := app.UpdateMetadata(ref, lset, s.Metadata); merr != nil { + if _, merr := app.UpdateMetadata(ref, se.ToCompatibleLabels(), s.Metadata); merr != nil { sl.l.Debug("Error when appending metadata in addReportSample", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", s.Metadata), "err", merr) } } @@ -2191,6 +2546,40 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t in } } +func (sl *scrapeLoop) addReportSampleV2(app storagev2.Appender, s reportSample, t int64, v float64, b *labels.Builder) error { + ce, ok, _ := sl.cache.get(s.name) + var ref storage.SeriesRef + var se series.Series + if ok { + ref = ce.ref + se = ce.se + } else { + // The constants are suffixed with the invalid \xff unicode rune to avoid collisions + // with scraped metrics in the cache. + // We have to drop it when building the actual metric. + se.Name = string(s.name[:len(s.name)-1]) + se.Type = s.Type + + b.Reset(labels.EmptyLabels()) + se.Labels = sl.reportSampleMutator(b.Labels()) + } + + ref, err := app.AppendSample(ref, se, storagev2.Sample{T: t, V: v}, nil) + switch { + case err == nil: + if !ok { + sl.cache.addRef(s.name, ref, se, se.Hash()) + } + return nil + case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): + // Do not log here, as this is expected if a target goes away and comes back + // again with a new scrape loop. 
+ return nil + default: + return err + } +} + // zeroConfig returns a new scrape config that only contains configuration items // that alter metrics. func zeroConfig(c *config.ScrapeConfig) *config.ScrapeConfig { diff --git a/scrape/target.go b/scrape/target.go index 4f576504f0..374b07c0be 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -24,6 +24,9 @@ import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/series" + storagev2 "github.com/prometheus/prometheus/storage/v2" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/histogram" @@ -326,11 +329,22 @@ var ( // limitAppender limits the number of total appended samples in a batch. type limitAppender struct { storage.Appender + appV2 storagev2.Appender limit int i int } +func (app *limitAppender) AppendSample(ref storage.SeriesRef, se series.Series, sa storagev2.Sample, opts *storagev2.AppendOptions) (storage.SeriesRef, error) { + if !value.IsStaleNaN(sa.V) { + app.i++ + if app.i > app.limit { + return 0, errSampleLimit + } + } + return app.appV2.AppendSample(ref, se, sa, opts) +} + func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { if !value.IsStaleNaN(v) { app.i++ @@ -347,69 +361,114 @@ func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t in type timeLimitAppender struct { storage.Appender + appV2 storagev2.Appender maxTime int64 } +func (app *timeLimitAppender) AppendSample(ref storage.SeriesRef, se series.Series, sa storagev2.Sample, opts *storagev2.AppendOptions) (storage.SeriesRef, error) { + if sa.T > app.maxTime { + return 0, storage.ErrOutOfBounds + } + + return app.appV2.AppendSample(ref, se, sa, opts) +} + func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { if t > app.maxTime { return 0, 
storage.ErrOutOfBounds } - ref, err := app.Appender.Append(ref, lset, t, v) - if err != nil { - return 0, err - } - return ref, nil + return app.Appender.Append(ref, lset, t, v) } // bucketLimitAppender limits the number of total appended samples in a batch. type bucketLimitAppender struct { storage.Appender + appV2 storagev2.Appender limit int } +func (app *bucketLimitAppender) limitH(h *histogram.Histogram) error { + // Return with an early error if the histogram has too many buckets and the + // schema is not exponential, in which case we can't reduce the resolution. + if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) { + return errBucketLimit + } + for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit { + if h.Schema <= histogram.ExponentialSchemaMin { + return errBucketLimit + } + h = h.ReduceResolution(h.Schema - 1) + } + return nil +} + +func (app *bucketLimitAppender) limitFH(fh *histogram.FloatHistogram) error { + // Return with an early error if the histogram has too many buckets and the + // schema is not exponential, in which case we can't reduce the resolution. 
+ if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) { + return errBucketLimit + } + for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit { + if fh.Schema <= histogram.ExponentialSchemaMin { + return errBucketLimit + } + fh = fh.ReduceResolution(fh.Schema - 1) + } + return nil +} + +func (app *bucketLimitAppender) AppendSample(ref storage.SeriesRef, se series.Series, sa storagev2.Sample, opts *storagev2.AppendOptions) (storage.SeriesRef, error) { + if sa.H != nil { + if err := app.limitH(sa.H); err != nil { + return 0, err + } + } + if sa.FH != nil { + if err := app.limitFH(sa.FH); err != nil { + return 0, err + } + } + return app.appV2.AppendSample(ref, se, sa, opts) +} + func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if h != nil { - // Return with an early error if the histogram has too many buckets and the - // schema is not exponential, in which case we can't reduce the resolution. - if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(h.Schema) { - return 0, errBucketLimit - } - for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit { - if h.Schema <= histogram.ExponentialSchemaMin { - return 0, errBucketLimit - } - h = h.ReduceResolution(h.Schema - 1) + if err := app.limitH(h); err != nil { + return 0, err } } if fh != nil { - // Return with an early error if the histogram has too many buckets and the - // schema is not exponential, in which case we can't reduce the resolution. 
- if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && !histogram.IsExponentialSchema(fh.Schema) { - return 0, errBucketLimit - } - for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit { - if fh.Schema <= histogram.ExponentialSchemaMin { - return 0, errBucketLimit - } - fh = fh.ReduceResolution(fh.Schema - 1) + if err := app.limitFH(fh); err != nil { + return 0, err } } - ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) - if err != nil { - return 0, err - } - return ref, nil + return app.Appender.AppendHistogram(ref, lset, t, h, fh) } type maxSchemaAppender struct { storage.Appender + appV2 storagev2.Appender maxSchema int32 } +func (app *maxSchemaAppender) AppendSample(ref storage.SeriesRef, se series.Series, sa storagev2.Sample, opts *storagev2.AppendOptions) (storage.SeriesRef, error) { + if sa.H != nil { + if histogram.IsExponentialSchema(sa.H.Schema) && sa.H.Schema > app.maxSchema { + sa.H = sa.H.ReduceResolution(app.maxSchema) + } + } + if sa.FH != nil { + if histogram.IsExponentialSchema(sa.FH.Schema) && sa.FH.Schema > app.maxSchema { + sa.FH = sa.FH.ReduceResolution(app.maxSchema) + } + } + return app.appV2.AppendSample(ref, se, sa, opts) +} + func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if h != nil { if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema { diff --git a/storage/interface.go b/storage/interface.go index 32b90cc10a..d2f5c2a170 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -275,16 +275,19 @@ type Appender interface { // Appender has to be discarded after rollback. Rollback() error - // SetOptions configures the appender with specific append options such as - // discarding out-of-order samples even if out-of-order is enabled in the TSDB. 
- SetOptions(opts *AppendOptions) - + SetOptionAppender ExemplarAppender HistogramAppender MetadataUpdater CreatedTimestampAppender } +type SetOptionAppender interface { + // SetOptions configures the appender with specific append options such as + // discarding out-of-order samples even if out-of-order is enabled in the TSDB. + SetOptions(opts *AppendOptions) +} + // GetRef is an extra interface on Appenders used by downstream projects // (e.g. Cortex) to avoid maintaining a parallel set of references. type GetRef interface { diff --git a/storage/v2/interface.go b/storage/v2/interface.go new file mode 100644 index 0000000000..11a49991ce --- /dev/null +++ b/storage/v2/interface.go @@ -0,0 +1,69 @@ +package storagev2 + +import ( + "math" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/series" + "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/storage" +) + +// Sample to be appended. +// TODO(bwplotka): Double check if function arguments would not be faster. +type Sample struct { + // SchemaURL string one day. + // CT int64 // 0 means no created timestamp. + T int64 // required. + + // h, fh, ordered by priority e.g. if h is not nil fh and v is assumed empty. + H *histogram.Histogram + FH *histogram.FloatHistogram + V float64 + + Exemplars []exemplar.Exemplar // optional exemplars to attach. +} + +func StaleSample(t int64) Sample { + return Sample{T: t, V: math.Float64frombits(value.StaleNaN)} +} + +// IsHistogram returns true if the sample is a histogram with a native representation. +func (sa Sample) IsHistogram() bool { + return sa.H != nil || sa.FH != nil +} + +// Appender provides batched appends against a storage. +// It must be completed with a call to Commit or Rollback and must not be reused afterwards. +// +// Operations on the Appender interface are not goroutine-safe. 
+
+// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender.
+// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
+type Appender interface {
+ // Commit submits the collected samples and purges the batch. If Commit
+ // returns a non-nil error, it also rolls back all modifications made in
+ // the appender so far, as Rollback would do. In any case, an Appender
+ // must not be used anymore after Commit has been called.
+ Commit() error
+
+ // Rollback rolls back all modifications made in the appender so far.
+ // Appender has to be discarded after rollback.
+ Rollback() error
+
+ // AppendSample adds a sample for the given series.
+ // An optional series reference can be provided to accelerate calls.
+ // A series reference number is returned which can be used to add further
+ // samples to the given series in the same or later transactions.
+ // Returned reference numbers are ephemeral and may be rejected in calls
+ // to AppendSample() at any point. Adding the sample via AppendSample()
+ // returns a new reference number.
+ // If the reference is 0 it must not be used for caching.
+ AppendSample(ref storage.SeriesRef, se series.Series, sa Sample, opts *AppendOptions) (storage.SeriesRef, error)
+ // Append(ref storage.SeriesRef, se series.Series, sa Sample, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, v float64, exemplars []exemplar.Exemplar, opts *AppendOptions) (storage.SeriesRef, error)
+}
+
+type AppendOptions struct {
+ DiscardOutOfOrder bool
+}