diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 9b4ba0f4ba..ca908c8b29 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -214,6 +214,7 @@ type flagConfig struct { // for ease of use. enablePerStepStats bool enableConcurrentRuleEval bool + useStartTimestamps bool prometheusURL string corsRegexString string @@ -293,6 +294,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols logger.Info("Experimental start timestamp storage enabled. OpenMetrics 1.0 parsing will parse _created metrics as ST instead of normal sample. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) + case "use-start-timestamps": + c.useStartTimestamps = true + logger.Info("Experimental usage of start timestamps in PromQL engine is enabled.") case "delayed-compaction": c.tsdb.EnableDelayedCompaction = true logger.Info("Experimental delayed compaction is enabled.") @@ -610,7 +614,7 @@ func main() { a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates."). Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, extra-scrape-metrics, memory-snapshot-on-shutdown, metadata-wal-records, old-ui, otlp-deltatocumulative, otlp-native-delta-ingestion, promql-binop-fill-modifiers, promql-delayed-name-removal, promql-duration-expr, promql-experimental-functions, promql-extended-range-selectors, promql-per-step-stats, st-storage, type-and-unit-labels, use-uncached-io, xor2-encoding. 
See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, extra-scrape-metrics, memory-snapshot-on-shutdown, metadata-wal-records, old-ui, otlp-deltatocumulative, otlp-native-delta-ingestion, promql-binop-fill-modifiers, promql-delayed-name-removal, promql-duration-expr, promql-experimental-functions, promql-extended-range-selectors, promql-per-step-stats, st-storage, type-and-unit-labels, use-start-timestamps, use-uncached-io, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). StringsVar(&cfg.featureList) a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode) @@ -951,6 +955,7 @@ func main() { EnablePerStepStats: cfg.enablePerStepStats, EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval, EnableTypeAndUnitLabels: cfg.scrape.EnableTypeAndUnitLabels, + UseStartTimestamps: cfg.useStartTimestamps, FeatureRegistry: features.DefaultRegistry, Parser: promqlParser, } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index ad4763d716..df0425c98d 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -59,7 +59,7 @@ The Prometheus monitoring server | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | -| --enable-feature ... | Comma separated feature names to enable. 
Valid options: auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, extra-scrape-metrics, memory-snapshot-on-shutdown, metadata-wal-records, old-ui, otlp-deltatocumulative, otlp-native-delta-ingestion, promql-binop-fill-modifiers, promql-delayed-name-removal, promql-duration-expr, promql-experimental-functions, promql-extended-range-selectors, promql-per-step-stats, st-storage, type-and-unit-labels, use-uncached-io, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature ... | Comma separated feature names to enable. Valid options: auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, extra-scrape-metrics, memory-snapshot-on-shutdown, metadata-wal-records, old-ui, otlp-deltatocumulative, otlp-native-delta-ingestion, promql-binop-fill-modifiers, promql-delayed-name-removal, promql-duration-expr, promql-experimental-functions, promql-extended-range-selectors, promql-per-step-stats, st-storage, type-and-unit-labels, use-start-timestamps, use-uncached-io, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --agent | Run Prometheus in 'Agent mode'. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/docs/feature_flags.md b/docs/feature_flags.md index ccc3a2bcde..e29cac4a03 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -101,6 +101,12 @@ Besides enabling this feature in Prometheus, start timestamps need to be exposed > * ST for native histograms and NHCBs are not yet implemented (see [#18315](https://github.com/prometheus/prometheus/issues/18315)). > * PromQL use of ST is out of scope of this feature. 
+## Start timestamp (ST) usage in PromQL functions + +`--enable-feature=use-start-timestamps` + +Enables the use of start timestamps (ST) in PromQL functions such as `rate()`, `irate()`, and `increase()`. This feature doesn't currently work with extended range selectors (`promql-extended-range-selectors`). + ## Concurrent evaluation of independent rules `--enable-feature=concurrent-rule-eval` diff --git a/promql/engine.go b/promql/engine.go index 59763dbf77..c4048ae239 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -333,6 +333,9 @@ type EngineOpts struct { // EnableTypeAndUnitLabels will allow PromQL Engine to make decisions based on the type and unit labels. EnableTypeAndUnitLabels bool + // UseStartTimestamps enables start timestamp usage in functions such as rate(). + UseStartTimestamps bool + // FeatureRegistry is the registry for tracking enabled/disabled features. FeatureRegistry features.Collector @@ -357,6 +360,7 @@ type Engine struct { enablePerStepStats bool enableDelayedNameRemoval bool enableTypeAndUnitLabels bool + useStartTimestamps bool parser parser.Parser } @@ -486,6 +490,7 @@ func NewEngine(opts EngineOpts) *Engine { enablePerStepStats: opts.EnablePerStepStats, enableDelayedNameRemoval: opts.EnableDelayedNameRemoval, enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels, + useStartTimestamps: opts.UseStartTimestamps, parser: opts.Parser, } } @@ -801,6 +806,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ng.enableDelayedNameRemoval, enableTypeAndUnitLabels: ng.enableTypeAndUnitLabels, + useStartTimestamps: ng.useStartTimestamps, querier: querier, } query.sampleStats.InitStepTracking(start, start, 1) @@ -861,6 +867,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ng.enableDelayedNameRemoval, 
enableTypeAndUnitLabels: ng.enableTypeAndUnitLabels, + useStartTimestamps: ng.useStartTimestamps, querier: querier, } query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) @@ -1148,6 +1155,7 @@ type evaluator struct { noStepSubqueryIntervalFn func(rangeMillis int64) int64 enableDelayedNameRemoval bool enableTypeAndUnitLabels bool + useStartTimestamps bool querier storage.Querier } @@ -1239,6 +1247,9 @@ type EvalNodeHelper struct { // Additional options for the evaluation. enableDelayedNameRemoval bool + + // StartTimestamps optionally provides sample start timestamps aligned with matrix samples. + StartTimestamps *StartTimestamps } func (enh *EvalNodeHelper) resetSigsPresent() []bool { @@ -1733,7 +1744,7 @@ func (ev *evaluator) smoothSeries(series []storage.Series, offset time.Duration) matrixStart := dataTS - lb matrixEnd := dataTS + lb - floats, hists = ev.matrixIterSlice(it, matrixStart, matrixEnd, floats, hists) + floats, hists, _ = ev.matrixIterSlice(it, matrixStart, matrixEnd, floats, hists, nil) if len(floats) == 0 && len(hists) == 0 { continue } @@ -2088,6 +2099,12 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, it := storage.NewBuffer(bufferRange) var chkIter chunkenc.Iterator + var startTimestamps *StartTimestamps + if ev.useStartTimestamps && (e.Func.Name == "rate" || e.Func.Name == "irate" || e.Func.Name == "increase") { + // TODO: consider pooling this. + startTimestamps = &StartTimestamps{} + } + // The last_over_time and first_over_time functions act like // offset; thus, they should keep the metric name. 
For all the // other range vector functions, the only change needed is to @@ -2107,6 +2124,9 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, if histograms != nil { histograms = histograms[:0] } + if startTimestamps != nil { + startTimestamps.Reset() + } chkIter = s.Iterator(chkIter) it.Reset(chkIter) @@ -2153,7 +2173,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, mint -= durationMilliseconds(ev.lookbackDelta) maxt += durationMilliseconds(ev.lookbackDelta) } - floats, histograms = ev.matrixIterSlice(it, mint, maxt, floats, histograms) + floats, histograms, startTimestamps = ev.matrixIterSlice(it, mint, maxt, floats, histograms, startTimestamps) } if len(floats)+len(histograms) == 0 { continue @@ -2167,6 +2187,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, inMatrix[0].Floats = floats inMatrix[0].Histograms = histograms enh.Ts = ts + enh.StartTimestamps = startTimestamps // Make the function call. 
outVec, annos := call(vectorVals, inMatrix, e.Args, enh) @@ -2382,6 +2403,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ev.enableDelayedNameRemoval, enableTypeAndUnitLabels: ev.enableTypeAndUnitLabels, + useStartTimestamps: ev.useStartTimestamps, querier: ev.querier, } @@ -2423,6 +2445,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ev.enableDelayedNameRemoval, enableTypeAndUnitLabels: ev.enableTypeAndUnitLabels, + useStartTimestamps: ev.useStartTimestamps, querier: ev.querier, } res, ws := newEv.eval(ctx, e.Expr) @@ -2690,7 +2713,7 @@ func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSele Metric: series[i].Labels(), } - ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil) + ss.Floats, ss.Histograms, _ = ev.matrixIterSlice(it, mint, maxt, nil, nil, nil) switch { case vs.Anchored: if ss.Histograms != nil { @@ -2724,10 +2747,17 @@ func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSele // values). Any such points falling before mint are discarded; points that fall // into the [mint, maxt] range are retained; only points with later timestamps // are populated from the iterator. +// +// This function will also optionally track corresponding StartTimestamps for +// each float/histogram datapoint if the passed-in startTimestamps is non-nil. +// In this case, the caller must always pass in startTimestamps with fields +// whose lengths exactly match those of the "floats" and "histograms" slices. +// Typically this is accomplished by passing in either all empty slices or the +// values returned by a previous call. 
func (ev *evaluator) matrixIterSlice( it *storage.BufferedSeriesIterator, mint, maxt int64, - floats []FPoint, histograms []HPoint, -) ([]FPoint, []HPoint) { + floats []FPoint, histograms []HPoint, startTimestamps *StartTimestamps, +) ([]FPoint, []HPoint, *StartTimestamps) { mintFloats, mintHistograms := mint, mint // First floats... @@ -2745,11 +2775,22 @@ func (ev *evaluator) matrixIterSlice( floats = floats[:len(floats)-drop] // Only append points with timestamps after the last timestamp we have. mintFloats = floats[len(floats)-1].T + + if startTimestamps != nil { + // Truncate startTimestamps at the same drop point. + copy(startTimestamps.Floats, startTimestamps.Floats[drop:]) + startTimestamps.Floats = startTimestamps.Floats[:len(startTimestamps.Floats)-drop] + } } else { ev.currentSamples -= len(floats) if floats != nil { floats = floats[:0] } + + if startTimestamps != nil { + // Clear the startTimestamps since we are clearing floats now. + startTimestamps.Floats = startTimestamps.Floats[:0] + } } // ...then the same for histograms. TODO(beorn7): Use generics? @@ -2772,16 +2813,27 @@ func (ev *evaluator) matrixIterSlice( ev.currentSamples -= totalHPointSize(histograms) // Only append points with timestamps after the last timestamp we have. mintHistograms = histograms[len(histograms)-1].T + + if startTimestamps != nil { + // Truncate startTimestamps at the same drop point. + copy(startTimestamps.Histograms, startTimestamps.Histograms[drop:]) + startTimestamps.Histograms = startTimestamps.Histograms[:len(startTimestamps.Histograms)-drop] + } } else { ev.currentSamples -= totalHPointSize(histograms) if histograms != nil { histograms = histograms[:0] } + + if startTimestamps != nil { + // Clear the startTimestamps since we are clearing histograms now. + startTimestamps.Histograms = startTimestamps.Histograms[:0] + } } if mint == maxt { // Empty range: return the empty slices. 
- return floats, histograms + return floats, histograms, startTimestamps } soughtValueType := it.Seek(maxt) @@ -2819,6 +2871,10 @@ loop: if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } + + if startTimestamps != nil { + startTimestamps.Histograms = append(startTimestamps.Histograms, buf.AtST()) + } } case chunkenc.ValFloat: t, f := buf.At() @@ -2835,6 +2891,10 @@ loop: floats = getFPointSlice(16) } floats = append(floats, FPoint{T: t, F: f}) + + if startTimestamps != nil { + startTimestamps.Floats = append(startTimestamps.Floats, buf.AtST()) + } } } } @@ -2868,6 +2928,9 @@ loop: ev.error(ErrTooManySamples(env)) } + if startTimestamps != nil { + startTimestamps.Histograms = append(startTimestamps.Histograms, it.AtST()) + } case chunkenc.ValFloat: t, f := it.At() if t == maxt && !value.IsStaleNaN(f) { @@ -2879,10 +2942,14 @@ loop: floats = getFPointSlice(16) } floats = append(floats, FPoint{T: t, F: f}) + + if startTimestamps != nil { + startTimestamps.Floats = append(startTimestamps.Floats, it.AtST()) + } } } ev.samplesStats.UpdatePeak(ev.currentSamples) - return floats, histograms + return floats, histograms, startTimestamps } func (*evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { diff --git a/promql/functions.go b/promql/functions.go index d8c362d43b..7b3fd2d363 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -216,7 +216,12 @@ func extrapolatedRate(vals Matrix, args parser.Expressions, enh *EvalNodeHelper, firstT = samples.Histograms[0].T lastT = samples.Histograms[numSamplesMinusOne].T var newAnnos annotations.Annotations - resultHistogram, newAnnos = histogramRate(samples.Histograms, isCounter, samples.Metric, args[0].PositionRange()) + var startTimestamps []int64 + if enh.StartTimestamps != nil { + startTimestamps = enh.StartTimestamps.Histograms + } + resultHistogram, newAnnos = histogramRate(samples.Histograms, startTimestamps, isCounter, 
+ samples.Metric, args[0].PositionRange()) annos.Merge(newAnnos) if resultHistogram == nil { // The histograms are not compatible with each other. @@ -231,12 +236,15 @@ func extrapolatedRate(vals Matrix, args parser.Expressions, enh *EvalNodeHelper, break } // Handle counter resets: - prevValue := samples.Floats[0].F - for _, currPoint := range samples.Floats[1:] { - if currPoint.F < prevValue { - resultFloat += prevValue + var startTimestamps []int64 + if enh.StartTimestamps != nil { + startTimestamps = enh.StartTimestamps.Floats + } + for i, currPoint := range samples.Floats[1:] { + prevPoint := samples.Floats[i] + if currPoint.F < prevPoint.F || i+1 < len(startTimestamps) && isStartTimestampReset(startTimestamps[i], prevPoint.T, startTimestamps[i+1], currPoint.T) { + resultFloat += prevPoint.F } - prevValue = currPoint.F } default: // TODO: add RangeTooShortWarning @@ -310,7 +318,13 @@ func extrapolatedRate(vals Matrix, args parser.Expressions, enh *EvalNodeHelper, // points[0] to be a histogram. It returns nil if any other Point in points is // not a histogram, and a warning wrapped in an annotation in that case. // Otherwise, it returns the calculated histogram and an empty annotation. -func histogramRate(points []HPoint, isCounter bool, labels labels.Labels, pos posrange.PositionRange) (*histogram.FloatHistogram, annotations.Annotations) { +func histogramRate( + points []HPoint, + startTimestamps []int64, + isCounter bool, + labels labels.Labels, + pos posrange.PositionRange, +) (*histogram.FloatHistogram, annotations.Annotations) { var ( prev = points[0].H usingCustomBuckets = prev.UsesCustomBuckets() @@ -334,7 +348,7 @@ func histogramRate(points []HPoint, isCounter bool, labels labels.Labels, pos po // bucket layout of the 1st sample because we do not need to look at it. 
if isCounter && len(points) > 1 { second := points[1].H - if second != nil && second.DetectReset(prev) { + if second != nil && (len(startTimestamps) > 1 && isStartTimestampReset(startTimestamps[0], points[0].T, startTimestamps[1], points[1].T) || second.DetectReset(prev)) { prev = &histogram.FloatHistogram{} prev.Schema = second.Schema prev.CustomValues = second.CustomValues @@ -385,9 +399,10 @@ func histogramRate(points []HPoint, isCounter bool, labels labels.Labels, pos po if isCounter { // Second iteration to deal with counter resets. - for _, currPoint := range points[1:] { + for i, currPoint := range points[1:] { curr := currPoint.H - if curr.DetectReset(prev) { + // Check start timestamps first since it's potentially cheaper. + if i+1 < len(startTimestamps) && isStartTimestampReset(startTimestamps[i], points[i].T, startTimestamps[i+1], currPoint.T) || curr.DetectReset(prev) { // Counter reset conflict ignored here for the same reason as above. _, _, nhcbBoundsReconciled, err := h.Add(prev) if err != nil { @@ -409,6 +424,32 @@ func histogramRate(points []HPoint, isCounter bool, labels labels.Labels, pos po return h.Compact(0), annos } +// isStartTimestampReset tells whether there was a counter reset by checking the start timestamp value. +func isStartTimestampReset(prevStartTimestamp, prevTimestamp, currStartTimestamp, currTimestamp int64) bool { + if currStartTimestamp == 0 || currStartTimestamp > currTimestamp { + // No reset if start timestamp is not set (value is 0), or if it is clearly invalid. + return false + } + + if currStartTimestamp < prevTimestamp { + return false + } + + if currStartTimestamp > prevTimestamp { + return true + } + + // If this place is reached, then it means that the current datapoint start timestamp is pointing + // to a previous datapoint. In OTel, this should only happen in two cases: + // * this is a delta series, + // * or this is a cumulative series with unknown start timestamp. 
+ // + // This should be treated as a reset for deltas, but it is not a reset for cumulative series + // with unknown start timestamp. Thus we have to check whether the start timestamp + // of the previous datapoint is known. + return prevStartTimestamp != 0 +} + // === delta(Matrix parser.ValueTypeMatrix) (Vector, Annotations) === func funcDelta(_ []Vector, matrixVals Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return extrapolatedRate(matrixVals, args, enh, false, false) @@ -426,18 +467,25 @@ func funcIncrease(_ []Vector, matrixVals Matrix, args parser.Expressions, enh *E // === irate(node parser.ValueTypeMatrix) (Vector, Annotations) === func funcIrate(_ []Vector, matrixVals Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - return instantValue(matrixVals, args, enh.Out, true) + return instantValue(matrixVals, args, enh, true) } // === idelta(node model.ValMatrix) (Vector, Annotations) === func funcIdelta(_ []Vector, matrixVals Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - return instantValue(matrixVals, args, enh.Out, false) + return instantValue(matrixVals, args, enh, false) } -func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool) (Vector, annotations.Annotations) { +func instantValue(vals Matrix, args parser.Expressions, enh *EvalNodeHelper, isRate bool) (Vector, annotations.Annotations) { + type sampleWithSt struct { + Sample + + ST int64 + } + var ( samples = vals[0] - ss = make([]Sample, 0, 2) + out = enh.Out + ss = make([]sampleWithSt, 0, 2) annos annotations.Annotations ) @@ -450,24 +498,35 @@ func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool) // Add the last 2 float samples if they exist. 
for i := max(0, len(samples.Floats)-2); i < len(samples.Floats); i++ { - ss = append(ss, Sample{ - F: samples.Floats[i].F, - T: samples.Floats[i].T, - }) + s := sampleWithSt{ + Sample: Sample{ + F: samples.Floats[i].F, + T: samples.Floats[i].T, + }, + } + if sts := enh.StartTimestamps; sts != nil && i < len(sts.Floats) { + s.ST = sts.Floats[i] + } + ss = append(ss, s) } // Add the last 2 histogram samples into their correct position if they exist. for i := max(0, len(samples.Histograms)-2); i < len(samples.Histograms); i++ { - s := Sample{ - H: samples.Histograms[i].H, - T: samples.Histograms[i].T, + s := sampleWithSt{ + Sample: Sample{ + H: samples.Histograms[i].H, + T: samples.Histograms[i].T, + }, + } + if sts := enh.StartTimestamps; sts != nil && i < len(sts.Histograms) { + s.ST = sts.Histograms[i] } switch { case len(ss) == 0: ss = append(ss, s) case len(ss) == 1: if s.T < ss[0].T { - ss = append([]Sample{s}, ss...) + ss = append([]sampleWithSt{s}, ss...) } else { ss = append(ss, s) } @@ -493,7 +552,7 @@ func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool) } switch { case ss[1].H == nil && ss[0].H == nil: - if !isRate || !(ss[1].F < ss[0].F) { + if !isRate || !(ss[1].F < ss[0].F || isStartTimestampReset(ss[0].ST, ss[0].T, ss[1].ST, ss[1].T)) { // Gauge, or counter without reset, or counter with NaN value. resultSample.F = ss[1].F - ss[0].F } @@ -510,7 +569,7 @@ func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool) if !isRate && (ss[1].H.CounterResetHint != histogram.GaugeType || ss[0].H.CounterResetHint != histogram.GaugeType) { annos.Add(annotations.NewNativeHistogramNotGaugeWarning(getMetricName(samples.Metric), args.PositionRange())) } - if !isRate || !ss[1].H.DetectReset(ss[0].H) { + if !isRate || (!isStartTimestampReset(ss[0].ST, ss[0].T, ss[1].ST, ss[1].T) && !ss[1].H.DetectReset(ss[0].H)) { // This subtraction may deliberately include conflicting // counter resets. 
Counter resets are treated explicitly // in this function, so the information about @@ -539,7 +598,7 @@ func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool) } } - return append(out, resultSample), annos + return append(out, resultSample.Sample), annos } // Calculate the trend value at the given index i in raw data d. diff --git a/promql/functions_internal_test.go b/promql/functions_internal_test.go index cd170823a8..fc38fb338f 100644 --- a/promql/functions_internal_test.go +++ b/promql/functions_internal_test.go @@ -33,10 +33,10 @@ func TestHistogramRateCounterResetHint(t *testing.T) { {T: 1, H: &histogram.FloatHistogram{CounterResetHint: histogram.UnknownCounterReset, Count: 10, Sum: 10}}, } labels := labels.FromMap(map[string]string{model.MetricNameLabel: "foo"}) - fh, _ := histogramRate(points, false, labels, posrange.PositionRange{}) + fh, _ := histogramRate(points, nil, false, labels, posrange.PositionRange{}) require.Equal(t, histogram.GaugeType, fh.CounterResetHint) - fh, _ = histogramRate(points, true, labels, posrange.PositionRange{}) + fh, _ = histogramRate(points, nil, true, labels, posrange.PositionRange{}) require.Equal(t, histogram.GaugeType, fh.CounterResetHint) } diff --git a/promql/promqltest/test.go b/promql/promqltest/test.go index 2e7b47f103..85c0c4f88a 100644 --- a/promql/promqltest/test.go +++ b/promql/promqltest/test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/almost" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/convertnhcb" @@ -107,6 +108,7 @@ func NewTestEngine(tb testing.TB, enablePerStepStats bool, lookbackDelta time.Du EnablePerStepStats: enablePerStepStats, LookbackDelta: lookbackDelta, EnableDelayedNameRemoval: true, + UseStartTimestamps: true, 
Parser: parser.NewParser(TestParserOpts), }) } @@ -156,7 +158,12 @@ func GetBuiltInExprs() ([]string, error) { // RunBuiltinTests runs an acceptance test suite against the provided engine. func RunBuiltinTests(t TBRun, engine promql.QueryEngine) { - RunBuiltinTestsWithStorage(t, engine, newTestStorage) + RunBuiltinTestsWithStorage(t, engine, func(t testing.TB) storage.Storage { + return teststorage.New(t, func(opt *tsdb.Options) { + opt.EnableSTStorage = true + opt.EnableXOR2Encoding = true + }) + }) } // RunBuiltinTestsWithStorage runs an acceptance test suite against the provided engine and storage. diff --git a/promql/promqltest/testdata/start_timestamps.test b/promql/promqltest/testdata/start_timestamps.test new file mode 100644 index 0000000000..c7148e5cf3 --- /dev/null +++ b/promql/promqltest/testdata/start_timestamps.test @@ -0,0 +1,162 @@ +# Tests for rate(), irate() and increase() on cumulative. +load 1m + cumulative{type="no_st"} _ _ 0+60x14 + cumulative{type="st_resets"}@st _ _ -30s-1mx2 -1m-1mx2 -59999ms-1mx2 -1s-1mx2 -1ms-1mx2 + cumulative{type="st_resets"} _ _ 0+60x14 + cumulative{type="normalized_resets"} _ _ 0 60 120 300 360 420 780 840 900 1440 1500 1560 2280 2340 2400 + +eval range from 7m to 15m step 1m increase(cumulative[5m]) + {type="no_st"} 300 300 300 300 300 300 300 300 300 + {type="st_resets"} 420 780 675 675 1275 900 900 1725 1125 + {type="normalized_resets"} 420 780 675 675 1275 900 900 1725 1125 + +eval range from 7m to 15m step 1m rate(cumulative[5m]) + {type="no_st"} 1 1 1 1 1 1 1 1 1 + {type="st_resets"} 1.40 2.60 2.25 2.25 4.25 3.00 3.00 5.75 3.75 + {type="normalized_resets"} 1.40 2.60 2.25 2.25 4.25 3.00 3.00 5.75 3.75 + +eval range from 7m to 15m step 1m irate(cumulative[5m]) + {type="no_st"} 1 1 1 1 1 1 1 1 1 + {type="st_resets"} 1 6 1 1 9 1 1 12 1 + {type="normalized_resets"} 1 6 1 1 9 1 1 12 1 + +# Simulate anchored vector selection for easier manual check. 
+eval range from 7m to 15m step 1m round(increase(cumulative[5m1ms]))
+    {type="no_st"} 300 300 300 300 300 300 300 300 300
+    {type="st_resets"} 420 720 720 600 1080 1080 780 1440 1440
+    {type="normalized_resets"} 420 720 720 600 1080 1080 780 1440 1440
+
+# TODO actual anchored
+# eval range from 7m to 15m step 1m increase(cumulative[5m] anchored)
+#     {type="no_st"} 300 300 300 300 300 300 300 300 300
+#     {type="st_resets"} 420 720 720 600 1080 1080 780 1440 1440
+#     {type="normalized_resets"} 420 720 720 600 1080 1080 780 1440 1440
+
+# Subqueries cut the propagation of start timestamps.
+eval range from 7m to 15m step 1m increase(cumulative[5m:1m])
+    {type="no_st"} 300x8
+    {type="st_resets"} 300x8
+    {type="normalized_resets"} 420 780 675 675 1275 900 900 1725 1125
+
+clear
+
+# Tests for rate(), irate() and increase() on deltas. Includes various approaches for start timestamps:
+# * ST_curr = T_prev (OTel)
+# * ST_curr = T_prev + 1ms (GCP)
+# * ST = T - 1ms
+# * ST = T
+load 1m
+    delta{type="no_st"} _ _ 60x15
+    delta{type="otel"}@st _ _ -1mx15
+    delta{type="otel"} _ _ 60x15
+    delta{type="gcp"}@st _ _ -59999msx15
+    delta{type="gcp"} _ _ 60x15
+    delta{type="minimal_range"}@st _ _ -1msx15
+    delta{type="minimal_range"} _ _ 60x15
+    delta{type="empty_range"}@st _ _ -0msx15
+    delta{type="empty_range"} _ _ 60x15
+    delta{type="normalized_resets"} _ _ 60+60x15
+
+eval range from 10m to 15m step 1m increase(delta[5m])
+    {type="no_st"} 0x5
+    {type="otel"} 300x5
+    {type="gcp"} 300x5
+    {type="minimal_range"} 300x5
+    {type="empty_range"} 300x5
+    {type="normalized_resets"} 300x5
+
+eval range from 10m to 15m step 1m rate(delta[5m])
+    {type="no_st"} 0x5
+    {type="otel"} 1x5
+    {type="gcp"} 1x5
+    {type="minimal_range"} 1x5
+    {type="empty_range"} 1x5
+    {type="normalized_resets"} 1x5
+
+eval range from 10m to 15m step 1m irate(delta[5m])
+    {type="no_st"} 0x5
+    {type="otel"} 1x5
+    {type="gcp"} 1x5
+    {type="minimal_range"} 1x5
+    {type="empty_range"} 1x5
+    {type="normalized_resets"} 1x5
+
+# 
Subqueries cut the propagation of start timestamps.
+eval range from 10m to 15m step 1m increase(delta[5m:1m])
+    {type="no_st"} 0x5
+    {type="otel"} 0x5
+    {type="gcp"} 0x5
+    {type="minimal_range"} 0x5
+    {type="empty_range"} 0x5
+    {type="normalized_resets"} 300x5
+
+clear
+
+# Test for cumulative with unknown start timestamp, which is described in the OTel spec. It is easy to incorrectly treat
+# the second datapoint in such timeseries as delta, producing incorrect result. These can be distinguished by:
+# * ST_curr == T_prev && ST_prev == 0 -> cumulative with unknown start, no reset
+# * ST_curr == T_prev && ST_prev != 0 -> delta datapoint, treat as a reset
+load 1m
+    series{type="otel_cumulative_unknown_start"}@st _ _ _ -1m
+    series{type="otel_cumulative_unknown_start"} _ _ 1 1
+    series{type="otel_delta"}@st _ _ -1m -1m
+    series{type="otel_delta"} _ _ 1 1
+
+eval instant at 3m round(increase(series[1m1ms]))
+    {type="otel_cumulative_unknown_start"} 0
+    {type="otel_delta"} 1
+
+clear
+
+# Tests for rate(), irate() and increase() on cumulative histograms. Includes various approaches for start timestamps:
+# * ST_curr = T_prev (OTel)
+# * ST_curr = T_prev + 1ms (GCP)
+# * ST = T - 1ms
+# * ST = T
+# TODO: start timestamps don't work for histograms yet, because the tsdb support is missing.
+load 1m
+    cumulative@st _ _ -30s-1mx2 -1m-1mx2 -59999ms-1mx2 -1s-1mx2 -1ms-1mx2
+    cumulative _ _ {{schema:0 sum:0 count:0}}+{{schema:0 sum:60 count:60}}x14
+
+eval range from 7m to 15m step 1m increase(cumulative[5m])
+    {} {{count:300 sum:300}}x8
+
+eval range from 7m to 15m step 1m rate(cumulative[5m])
+    {} {{count:1 sum:1}}x8
+
+eval range from 7m to 15m step 1m irate(cumulative[5m])
+    {} {{count:1 sum:1}}x8
+
+clear
+
+# Tests for rate(), irate() and increase() on delta histograms.
+# TODO: start timestamps don't work for histograms yet, because the tsdb support is missing.
+load 1m + delta{type="otel"}@st _ _ -1mx15 + delta{type="otel"} _ _ {{schema:0 sum:60 count:60}}x15 + delta{type="gcp"}@st _ _ -59999msx15 + delta{type="gcp"} _ _ {{schema:0 sum:60 count:60}}x15 + delta{type="minimal_range"}@st _ _ -1msx15 + delta{type="minimal_range"} _ _ {{schema:0 sum:60 count:60}}x15 + delta{type="empty_range"}@st _ _ -0msx15 + delta{type="empty_range"} _ _ {{schema:0 sum:60 count:60}}x15 + +eval range from 10m to 15m step 1m increase(delta[5m]) + {type="otel"} {{sum:0 count:0}}x5 + {type="gcp"} {{sum:0 count:0}}x5 + {type="minimal_range"} {{sum:0 count:0}}x5 + {type="empty_range"} {{sum:0 count:0}}x5 + +eval range from 10m to 15m step 1m rate(delta[5m]) + {type="otel"} {{schema:0 sum:0 count:0}}x5 + {type="gcp"} {{schema:0 sum:0 count:0}}x5 + {type="minimal_range"} {{schema:0 sum:0 count:0}}x5 + {type="empty_range"} {{schema:0 sum:0 count:0}}x5 + +eval range from 10m to 15m step 1m irate(delta[5m]) + {type="otel"} {{schema:0 sum:0 count:0}}x5 + {type="gcp"} {{schema:0 sum:0 count:0}}x5 + {type="minimal_range"} {{schema:0 sum:0 count:0}}x5 + {type="empty_range"} {{schema:0 sum:0 count:0}}x5 + +clear diff --git a/promql/value.go b/promql/value.go index 17afdfc410..e1ff393bdd 100644 --- a/promql/value.go +++ b/promql/value.go @@ -397,6 +397,24 @@ func (r *Result) String() string { return r.Value.String() } +// StartTimestamps stores sample start timestamps aligned with points. +type StartTimestamps struct { + // Floats stores start timestamps for float samples. + Floats []int64 + // Histograms stores start timestamps for histogram samples. + Histograms []int64 +} + +// Reset clears the start timestamps while keeping the slice capacity for reuse. +func (st *StartTimestamps) Reset() { + if st.Floats != nil { + st.Floats = st.Floats[:0] + } + if st.Histograms != nil { + st.Histograms = st.Histograms[:0] + } +} + // StorageSeries simulates promql.Series as storage.Series. type StorageSeries struct { series Series