mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-05 04:16:15 +02:00
PromQL: use start timestamps for rate()-like calculations (#18344)
* PromQL: use start timestamps for rate() and increase() calculations * implement start timestamps reset detection for `irate()` * add `start_timestamps.test` * add a couple of tests with subqueries * add a test for cumulative with unknown start timestamp * update `enable-features` CLI parameter description * `make cli-documentation` Signed-off-by: vpranckaitis <vpranckaitis@gmail.com> --------- Signed-off-by: Vilius Pranckaitis <vpranckaitis@gmail.com> Signed-off-by: vpranckaitis <vpranckaitis@gmail.com>
This commit is contained in:
parent
e0ea05665a
commit
321fe34aab
@ -214,6 +214,7 @@ type flagConfig struct {
|
||||
// for ease of use.
|
||||
enablePerStepStats bool
|
||||
enableConcurrentRuleEval bool
|
||||
useStartTimestamps bool
|
||||
|
||||
prometheusURL string
|
||||
corsRegexString string
|
||||
@ -293,6 +294,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
|
||||
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
|
||||
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
|
||||
logger.Info("Experimental start timestamp storage enabled. OpenMetrics 1.0 parsing will parse <metric>_created metrics as ST instead of normal sample. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
|
||||
case "use-start-timestamps":
|
||||
c.useStartTimestamps = true
|
||||
logger.Info("Experimental usage of start timestamps in PromQL engine is enabled.")
|
||||
case "delayed-compaction":
|
||||
c.tsdb.EnableDelayedCompaction = true
|
||||
logger.Info("Experimental delayed compaction is enabled.")
|
||||
@ -610,7 +614,7 @@ func main() {
|
||||
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
|
||||
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
|
||||
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, extra-scrape-metrics, memory-snapshot-on-shutdown, metadata-wal-records, old-ui, otlp-deltatocumulative, otlp-native-delta-ingestion, promql-binop-fill-modifiers, promql-delayed-name-removal, promql-duration-expr, promql-experimental-functions, promql-extended-range-selectors, promql-per-step-stats, st-storage, type-and-unit-labels, use-uncached-io, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, extra-scrape-metrics, memory-snapshot-on-shutdown, metadata-wal-records, old-ui, otlp-deltatocumulative, otlp-native-delta-ingestion, promql-binop-fill-modifiers, promql-delayed-name-removal, promql-duration-expr, promql-experimental-functions, promql-extended-range-selectors, promql-per-step-stats, st-storage, type-and-unit-labels, use-start-timestamps, use-uncached-io, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
StringsVar(&cfg.featureList)
|
||||
|
||||
a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode)
|
||||
@ -951,6 +955,7 @@ func main() {
|
||||
EnablePerStepStats: cfg.enablePerStepStats,
|
||||
EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval,
|
||||
EnableTypeAndUnitLabels: cfg.scrape.EnableTypeAndUnitLabels,
|
||||
UseStartTimestamps: cfg.useStartTimestamps,
|
||||
FeatureRegistry: features.DefaultRegistry,
|
||||
Parser: promqlParser,
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ The Prometheus monitoring server
|
||||
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
|
||||
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
|
||||
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
|
||||
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...<code class="text-nowrap"> | Comma separated feature names to enable. Valid options: auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, extra-scrape-metrics, memory-snapshot-on-shutdown, metadata-wal-records, old-ui, otlp-deltatocumulative, otlp-native-delta-ingestion, promql-binop-fill-modifiers, promql-delayed-name-removal, promql-duration-expr, promql-experimental-functions, promql-extended-range-selectors, promql-per-step-stats, st-storage, type-and-unit-labels, use-uncached-io, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
|
||||
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...<code class="text-nowrap"> | Comma separated feature names to enable. Valid options: auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, extra-scrape-metrics, memory-snapshot-on-shutdown, metadata-wal-records, old-ui, otlp-deltatocumulative, otlp-native-delta-ingestion, promql-binop-fill-modifiers, promql-delayed-name-removal, promql-duration-expr, promql-experimental-functions, promql-extended-range-selectors, promql-per-step-stats, st-storage, type-and-unit-labels, use-start-timestamps, use-uncached-io, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
|
||||
| <code class="text-nowrap">--agent</code> | Run Prometheus in 'Agent mode'. | |
|
||||
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
|
||||
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |
|
||||
|
||||
@ -101,6 +101,12 @@ Besides enabling this feature in Prometheus, start timestamps need to be exposed
|
||||
> * ST for native histograms and NHCBs are not yet implemented (see [#18315](https://github.com/prometheus/prometheus/issues/18315)).
|
||||
> * PromQL use of ST is out of scope of this feature.
|
||||
|
||||
## Start timestamp (ST) usage in PromQL functions
|
||||
|
||||
`--enable-feature=use-start-timestamps`
|
||||
|
||||
Enables the use of start timestamps (ST) in PromQL functions such as `rate()`, `irate()`, and `increase()`. This feature doesn't currently work with extended range selectors (`promql-extended-range-selectors`).
|
||||
|
||||
## Concurrent evaluation of independent rules
|
||||
|
||||
`--enable-feature=concurrent-rule-eval`
|
||||
|
||||
@ -333,6 +333,9 @@ type EngineOpts struct {
|
||||
// EnableTypeAndUnitLabels will allow PromQL Engine to make decisions based on the type and unit labels.
|
||||
EnableTypeAndUnitLabels bool
|
||||
|
||||
// UseStartTimestamps enables start timestamp usage in functions such as rate().
|
||||
UseStartTimestamps bool
|
||||
|
||||
// FeatureRegistry is the registry for tracking enabled/disabled features.
|
||||
FeatureRegistry features.Collector
|
||||
|
||||
@ -357,6 +360,7 @@ type Engine struct {
|
||||
enablePerStepStats bool
|
||||
enableDelayedNameRemoval bool
|
||||
enableTypeAndUnitLabels bool
|
||||
useStartTimestamps bool
|
||||
parser parser.Parser
|
||||
}
|
||||
|
||||
@ -486,6 +490,7 @@ func NewEngine(opts EngineOpts) *Engine {
|
||||
enablePerStepStats: opts.EnablePerStepStats,
|
||||
enableDelayedNameRemoval: opts.EnableDelayedNameRemoval,
|
||||
enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels,
|
||||
useStartTimestamps: opts.UseStartTimestamps,
|
||||
parser: opts.Parser,
|
||||
}
|
||||
}
|
||||
@ -801,6 +806,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
|
||||
noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
|
||||
enableDelayedNameRemoval: ng.enableDelayedNameRemoval,
|
||||
enableTypeAndUnitLabels: ng.enableTypeAndUnitLabels,
|
||||
useStartTimestamps: ng.useStartTimestamps,
|
||||
querier: querier,
|
||||
}
|
||||
query.sampleStats.InitStepTracking(start, start, 1)
|
||||
@ -861,6 +867,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
|
||||
noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
|
||||
enableDelayedNameRemoval: ng.enableDelayedNameRemoval,
|
||||
enableTypeAndUnitLabels: ng.enableTypeAndUnitLabels,
|
||||
useStartTimestamps: ng.useStartTimestamps,
|
||||
querier: querier,
|
||||
}
|
||||
query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval)
|
||||
@ -1148,6 +1155,7 @@ type evaluator struct {
|
||||
noStepSubqueryIntervalFn func(rangeMillis int64) int64
|
||||
enableDelayedNameRemoval bool
|
||||
enableTypeAndUnitLabels bool
|
||||
useStartTimestamps bool
|
||||
querier storage.Querier
|
||||
}
|
||||
|
||||
@ -1239,6 +1247,9 @@ type EvalNodeHelper struct {
|
||||
|
||||
// Additional options for the evaluation.
|
||||
enableDelayedNameRemoval bool
|
||||
|
||||
// StartTimestamps optionally provides sample start timestamps aligned with matrix samples.
|
||||
StartTimestamps *StartTimestamps
|
||||
}
|
||||
|
||||
func (enh *EvalNodeHelper) resetSigsPresent() []bool {
|
||||
@ -1733,7 +1744,7 @@ func (ev *evaluator) smoothSeries(series []storage.Series, offset time.Duration)
|
||||
matrixStart := dataTS - lb
|
||||
matrixEnd := dataTS + lb
|
||||
|
||||
floats, hists = ev.matrixIterSlice(it, matrixStart, matrixEnd, floats, hists)
|
||||
floats, hists, _ = ev.matrixIterSlice(it, matrixStart, matrixEnd, floats, hists, nil)
|
||||
if len(floats) == 0 && len(hists) == 0 {
|
||||
continue
|
||||
}
|
||||
@ -2088,6 +2099,12 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
it := storage.NewBuffer(bufferRange)
|
||||
var chkIter chunkenc.Iterator
|
||||
|
||||
var startTimestamps *StartTimestamps
|
||||
if ev.useStartTimestamps && (e.Func.Name == "rate" || e.Func.Name == "irate" || e.Func.Name == "increase") {
|
||||
// TODO: consider pooling this.
|
||||
startTimestamps = &StartTimestamps{}
|
||||
}
|
||||
|
||||
// The last_over_time and first_over_time functions act like
|
||||
// offset; thus, they should keep the metric name. For all the
|
||||
// other range vector functions, the only change needed is to
|
||||
@ -2107,6 +2124,9 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
if histograms != nil {
|
||||
histograms = histograms[:0]
|
||||
}
|
||||
if startTimestamps != nil {
|
||||
startTimestamps.Reset()
|
||||
}
|
||||
chkIter = s.Iterator(chkIter)
|
||||
it.Reset(chkIter)
|
||||
|
||||
@ -2153,7 +2173,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
mint -= durationMilliseconds(ev.lookbackDelta)
|
||||
maxt += durationMilliseconds(ev.lookbackDelta)
|
||||
}
|
||||
floats, histograms = ev.matrixIterSlice(it, mint, maxt, floats, histograms)
|
||||
floats, histograms, startTimestamps = ev.matrixIterSlice(it, mint, maxt, floats, histograms, startTimestamps)
|
||||
}
|
||||
if len(floats)+len(histograms) == 0 {
|
||||
continue
|
||||
@ -2167,6 +2187,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
inMatrix[0].Floats = floats
|
||||
inMatrix[0].Histograms = histograms
|
||||
enh.Ts = ts
|
||||
enh.StartTimestamps = startTimestamps
|
||||
|
||||
// Make the function call.
|
||||
outVec, annos := call(vectorVals, inMatrix, e.Args, enh)
|
||||
@ -2382,6 +2403,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
|
||||
enableDelayedNameRemoval: ev.enableDelayedNameRemoval,
|
||||
enableTypeAndUnitLabels: ev.enableTypeAndUnitLabels,
|
||||
useStartTimestamps: ev.useStartTimestamps,
|
||||
querier: ev.querier,
|
||||
}
|
||||
|
||||
@ -2423,6 +2445,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
|
||||
enableDelayedNameRemoval: ev.enableDelayedNameRemoval,
|
||||
enableTypeAndUnitLabels: ev.enableTypeAndUnitLabels,
|
||||
useStartTimestamps: ev.useStartTimestamps,
|
||||
querier: ev.querier,
|
||||
}
|
||||
res, ws := newEv.eval(ctx, e.Expr)
|
||||
@ -2690,7 +2713,7 @@ func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSele
|
||||
Metric: series[i].Labels(),
|
||||
}
|
||||
|
||||
ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil)
|
||||
ss.Floats, ss.Histograms, _ = ev.matrixIterSlice(it, mint, maxt, nil, nil, nil)
|
||||
switch {
|
||||
case vs.Anchored:
|
||||
if ss.Histograms != nil {
|
||||
@ -2724,10 +2747,17 @@ func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSele
|
||||
// values). Any such points falling before mint are discarded; points that fall
|
||||
// into the [mint, maxt] range are retained; only points with later timestamps
|
||||
// are populated from the iterator.
|
||||
//
|
||||
// This function will also optionally track corresponding StartTimestamps for
|
||||
// each float/histogram datapoint if the passed-in startTimestamps is non-nil.
|
||||
// In this case, the caller must always pass in startTimestamps with fields
|
||||
// whose lengths exactly match those of the "floats" and "histograms" slices.
|
||||
// Typically this is accomplished by passing in either all empty slices or the
|
||||
// values returned by a previous call.
|
||||
func (ev *evaluator) matrixIterSlice(
|
||||
it *storage.BufferedSeriesIterator, mint, maxt int64,
|
||||
floats []FPoint, histograms []HPoint,
|
||||
) ([]FPoint, []HPoint) {
|
||||
floats []FPoint, histograms []HPoint, startTimestamps *StartTimestamps,
|
||||
) ([]FPoint, []HPoint, *StartTimestamps) {
|
||||
mintFloats, mintHistograms := mint, mint
|
||||
|
||||
// First floats...
|
||||
@ -2745,11 +2775,22 @@ func (ev *evaluator) matrixIterSlice(
|
||||
floats = floats[:len(floats)-drop]
|
||||
// Only append points with timestamps after the last timestamp we have.
|
||||
mintFloats = floats[len(floats)-1].T
|
||||
|
||||
if startTimestamps != nil {
|
||||
// Truncate startTimestamps at the same drop point.
|
||||
copy(startTimestamps.Floats, startTimestamps.Floats[drop:])
|
||||
startTimestamps.Floats = startTimestamps.Floats[:len(startTimestamps.Floats)-drop]
|
||||
}
|
||||
} else {
|
||||
ev.currentSamples -= len(floats)
|
||||
if floats != nil {
|
||||
floats = floats[:0]
|
||||
}
|
||||
|
||||
if startTimestamps != nil {
|
||||
// Clear the startTimestamps since we are clearing floats now.
|
||||
startTimestamps.Floats = startTimestamps.Floats[:0]
|
||||
}
|
||||
}
|
||||
|
||||
// ...then the same for histograms. TODO(beorn7): Use generics?
|
||||
@ -2772,16 +2813,27 @@ func (ev *evaluator) matrixIterSlice(
|
||||
ev.currentSamples -= totalHPointSize(histograms)
|
||||
// Only append points with timestamps after the last timestamp we have.
|
||||
mintHistograms = histograms[len(histograms)-1].T
|
||||
|
||||
if startTimestamps != nil {
|
||||
// Truncate startTimestamps at the same drop point.
|
||||
copy(startTimestamps.Histograms, startTimestamps.Histograms[drop:])
|
||||
startTimestamps.Histograms = startTimestamps.Histograms[:len(startTimestamps.Histograms)-drop]
|
||||
}
|
||||
} else {
|
||||
ev.currentSamples -= totalHPointSize(histograms)
|
||||
if histograms != nil {
|
||||
histograms = histograms[:0]
|
||||
}
|
||||
|
||||
if startTimestamps != nil {
|
||||
// Clear the startTimestamps since we are clearing histograms now.
|
||||
startTimestamps.Histograms = startTimestamps.Histograms[:0]
|
||||
}
|
||||
}
|
||||
|
||||
if mint == maxt {
|
||||
// Empty range: return the empty slices.
|
||||
return floats, histograms
|
||||
return floats, histograms, startTimestamps
|
||||
}
|
||||
|
||||
soughtValueType := it.Seek(maxt)
|
||||
@ -2819,6 +2871,10 @@ loop:
|
||||
if ev.currentSamples > ev.maxSamples {
|
||||
ev.error(ErrTooManySamples(env))
|
||||
}
|
||||
|
||||
if startTimestamps != nil {
|
||||
startTimestamps.Histograms = append(startTimestamps.Histograms, buf.AtST())
|
||||
}
|
||||
}
|
||||
case chunkenc.ValFloat:
|
||||
t, f := buf.At()
|
||||
@ -2835,6 +2891,10 @@ loop:
|
||||
floats = getFPointSlice(16)
|
||||
}
|
||||
floats = append(floats, FPoint{T: t, F: f})
|
||||
|
||||
if startTimestamps != nil {
|
||||
startTimestamps.Floats = append(startTimestamps.Floats, buf.AtST())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2868,6 +2928,9 @@ loop:
|
||||
ev.error(ErrTooManySamples(env))
|
||||
}
|
||||
|
||||
if startTimestamps != nil {
|
||||
startTimestamps.Histograms = append(startTimestamps.Histograms, it.AtST())
|
||||
}
|
||||
case chunkenc.ValFloat:
|
||||
t, f := it.At()
|
||||
if t == maxt && !value.IsStaleNaN(f) {
|
||||
@ -2879,10 +2942,14 @@ loop:
|
||||
floats = getFPointSlice(16)
|
||||
}
|
||||
floats = append(floats, FPoint{T: t, F: f})
|
||||
|
||||
if startTimestamps != nil {
|
||||
startTimestamps.Floats = append(startTimestamps.Floats, it.AtST())
|
||||
}
|
||||
}
|
||||
}
|
||||
ev.samplesStats.UpdatePeak(ev.currentSamples)
|
||||
return floats, histograms
|
||||
return floats, histograms, startTimestamps
|
||||
}
|
||||
|
||||
func (*evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
|
||||
|
||||
@ -216,7 +216,12 @@ func extrapolatedRate(vals Matrix, args parser.Expressions, enh *EvalNodeHelper,
|
||||
firstT = samples.Histograms[0].T
|
||||
lastT = samples.Histograms[numSamplesMinusOne].T
|
||||
var newAnnos annotations.Annotations
|
||||
resultHistogram, newAnnos = histogramRate(samples.Histograms, isCounter, samples.Metric, args[0].PositionRange())
|
||||
var startTimestamps []int64
|
||||
if enh.StartTimestamps != nil {
|
||||
startTimestamps = enh.StartTimestamps.Histograms
|
||||
}
|
||||
resultHistogram, newAnnos = histogramRate(samples.Histograms, startTimestamps, isCounter,
|
||||
samples.Metric, args[0].PositionRange())
|
||||
annos.Merge(newAnnos)
|
||||
if resultHistogram == nil {
|
||||
// The histograms are not compatible with each other.
|
||||
@ -231,12 +236,15 @@ func extrapolatedRate(vals Matrix, args parser.Expressions, enh *EvalNodeHelper,
|
||||
break
|
||||
}
|
||||
// Handle counter resets:
|
||||
prevValue := samples.Floats[0].F
|
||||
for _, currPoint := range samples.Floats[1:] {
|
||||
if currPoint.F < prevValue {
|
||||
resultFloat += prevValue
|
||||
var startTimestamps []int64
|
||||
if enh.StartTimestamps != nil {
|
||||
startTimestamps = enh.StartTimestamps.Floats
|
||||
}
|
||||
for i, currPoint := range samples.Floats[1:] {
|
||||
prevPoint := samples.Floats[i]
|
||||
if currPoint.F < prevPoint.F || i+1 < len(startTimestamps) && isStartTimestampReset(startTimestamps[i], prevPoint.T, startTimestamps[i+1], currPoint.T) {
|
||||
resultFloat += prevPoint.F
|
||||
}
|
||||
prevValue = currPoint.F
|
||||
}
|
||||
default:
|
||||
// TODO: add RangeTooShortWarning
|
||||
@ -310,7 +318,13 @@ func extrapolatedRate(vals Matrix, args parser.Expressions, enh *EvalNodeHelper,
|
||||
// points[0] to be a histogram. It returns nil if any other Point in points is
|
||||
// not a histogram, and a warning wrapped in an annotation in that case.
|
||||
// Otherwise, it returns the calculated histogram and an empty annotation.
|
||||
func histogramRate(points []HPoint, isCounter bool, labels labels.Labels, pos posrange.PositionRange) (*histogram.FloatHistogram, annotations.Annotations) {
|
||||
func histogramRate(
|
||||
points []HPoint,
|
||||
startTimestamps []int64,
|
||||
isCounter bool,
|
||||
labels labels.Labels,
|
||||
pos posrange.PositionRange,
|
||||
) (*histogram.FloatHistogram, annotations.Annotations) {
|
||||
var (
|
||||
prev = points[0].H
|
||||
usingCustomBuckets = prev.UsesCustomBuckets()
|
||||
@ -334,7 +348,7 @@ func histogramRate(points []HPoint, isCounter bool, labels labels.Labels, pos po
|
||||
// bucket layout of the 1st sample because we do not need to look at it.
|
||||
if isCounter && len(points) > 1 {
|
||||
second := points[1].H
|
||||
if second != nil && second.DetectReset(prev) {
|
||||
if second != nil && (len(startTimestamps) > 1 && isStartTimestampReset(startTimestamps[0], points[0].T, startTimestamps[1], points[1].T) || second.DetectReset(prev)) {
|
||||
prev = &histogram.FloatHistogram{}
|
||||
prev.Schema = second.Schema
|
||||
prev.CustomValues = second.CustomValues
|
||||
@ -385,9 +399,10 @@ func histogramRate(points []HPoint, isCounter bool, labels labels.Labels, pos po
|
||||
|
||||
if isCounter {
|
||||
// Second iteration to deal with counter resets.
|
||||
for _, currPoint := range points[1:] {
|
||||
for i, currPoint := range points[1:] {
|
||||
curr := currPoint.H
|
||||
if curr.DetectReset(prev) {
|
||||
// Check start timestamps first since it's potentially cheaper.
|
||||
if i+1 < len(startTimestamps) && isStartTimestampReset(startTimestamps[i], points[i].T, startTimestamps[i+1], currPoint.T) || curr.DetectReset(prev) {
|
||||
// Counter reset conflict ignored here for the same reason as above.
|
||||
_, _, nhcbBoundsReconciled, err := h.Add(prev)
|
||||
if err != nil {
|
||||
@ -409,6 +424,32 @@ func histogramRate(points []HPoint, isCounter bool, labels labels.Labels, pos po
|
||||
return h.Compact(0), annos
|
||||
}
|
||||
|
||||
// isStartTimestampReset tells whether there was a counter reset by checking the start timestamp value.
|
||||
func isStartTimestampReset(prevStartTimestamp, prevTimestamp, currStartTimestamp, currTimestamp int64) bool {
|
||||
if currStartTimestamp == 0 || currStartTimestamp > currTimestamp {
|
||||
// No reset if start timestamp is not set (value is 0), or if it is clearly invalid.
|
||||
return false
|
||||
}
|
||||
|
||||
if currStartTimestamp < prevTimestamp {
|
||||
return false
|
||||
}
|
||||
|
||||
if currStartTimestamp > prevTimestamp {
|
||||
return true
|
||||
}
|
||||
|
||||
// If this place is reached, then it means that the current datapoint start timestamp is pointing
|
||||
// to a previous datapoint. In OTel, this should only happen in two cases:
|
||||
// * this is a delta series,
|
||||
// * or this is a cumulative series with unknown start timestamp.
|
||||
//
|
||||
// This should be treated as a reset for deltas, but it is not a reset for cumulative series
|
||||
// with unknown start timestamp. Thus we have to check whether the start timestamp
|
||||
// of the previous datapoint is known.
|
||||
return prevStartTimestamp != 0
|
||||
}
|
||||
|
||||
// === delta(Matrix parser.ValueTypeMatrix) (Vector, Annotations) ===
|
||||
func funcDelta(_ []Vector, matrixVals Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
|
||||
return extrapolatedRate(matrixVals, args, enh, false, false)
|
||||
@ -426,18 +467,25 @@ func funcIncrease(_ []Vector, matrixVals Matrix, args parser.Expressions, enh *E
|
||||
|
||||
// === irate(node parser.ValueTypeMatrix) (Vector, Annotations) ===
|
||||
func funcIrate(_ []Vector, matrixVals Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
|
||||
return instantValue(matrixVals, args, enh.Out, true)
|
||||
return instantValue(matrixVals, args, enh, true)
|
||||
}
|
||||
|
||||
// === idelta(node model.ValMatrix) (Vector, Annotations) ===
|
||||
func funcIdelta(_ []Vector, matrixVals Matrix, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
|
||||
return instantValue(matrixVals, args, enh.Out, false)
|
||||
return instantValue(matrixVals, args, enh, false)
|
||||
}
|
||||
|
||||
func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool) (Vector, annotations.Annotations) {
|
||||
func instantValue(vals Matrix, args parser.Expressions, enh *EvalNodeHelper, isRate bool) (Vector, annotations.Annotations) {
|
||||
type sampleWithSt struct {
|
||||
Sample
|
||||
|
||||
ST int64
|
||||
}
|
||||
|
||||
var (
|
||||
samples = vals[0]
|
||||
ss = make([]Sample, 0, 2)
|
||||
out = enh.Out
|
||||
ss = make([]sampleWithSt, 0, 2)
|
||||
annos annotations.Annotations
|
||||
)
|
||||
|
||||
@ -450,24 +498,35 @@ func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool)
|
||||
|
||||
// Add the last 2 float samples if they exist.
|
||||
for i := max(0, len(samples.Floats)-2); i < len(samples.Floats); i++ {
|
||||
ss = append(ss, Sample{
|
||||
F: samples.Floats[i].F,
|
||||
T: samples.Floats[i].T,
|
||||
})
|
||||
s := sampleWithSt{
|
||||
Sample: Sample{
|
||||
F: samples.Floats[i].F,
|
||||
T: samples.Floats[i].T,
|
||||
},
|
||||
}
|
||||
if sts := enh.StartTimestamps; sts != nil && i < len(sts.Floats) {
|
||||
s.ST = sts.Floats[i]
|
||||
}
|
||||
ss = append(ss, s)
|
||||
}
|
||||
|
||||
// Add the last 2 histogram samples into their correct position if they exist.
|
||||
for i := max(0, len(samples.Histograms)-2); i < len(samples.Histograms); i++ {
|
||||
s := Sample{
|
||||
H: samples.Histograms[i].H,
|
||||
T: samples.Histograms[i].T,
|
||||
s := sampleWithSt{
|
||||
Sample: Sample{
|
||||
H: samples.Histograms[i].H,
|
||||
T: samples.Histograms[i].T,
|
||||
},
|
||||
}
|
||||
if sts := enh.StartTimestamps; sts != nil && i < len(sts.Histograms) {
|
||||
s.ST = sts.Histograms[i]
|
||||
}
|
||||
switch {
|
||||
case len(ss) == 0:
|
||||
ss = append(ss, s)
|
||||
case len(ss) == 1:
|
||||
if s.T < ss[0].T {
|
||||
ss = append([]Sample{s}, ss...)
|
||||
ss = append([]sampleWithSt{s}, ss...)
|
||||
} else {
|
||||
ss = append(ss, s)
|
||||
}
|
||||
@ -493,7 +552,7 @@ func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool)
|
||||
}
|
||||
switch {
|
||||
case ss[1].H == nil && ss[0].H == nil:
|
||||
if !isRate || !(ss[1].F < ss[0].F) {
|
||||
if !isRate || !(ss[1].F < ss[0].F || isStartTimestampReset(ss[0].ST, ss[0].T, ss[1].ST, ss[1].T)) {
|
||||
// Gauge, or counter without reset, or counter with NaN value.
|
||||
resultSample.F = ss[1].F - ss[0].F
|
||||
}
|
||||
@ -510,7 +569,7 @@ func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool)
|
||||
if !isRate && (ss[1].H.CounterResetHint != histogram.GaugeType || ss[0].H.CounterResetHint != histogram.GaugeType) {
|
||||
annos.Add(annotations.NewNativeHistogramNotGaugeWarning(getMetricName(samples.Metric), args.PositionRange()))
|
||||
}
|
||||
if !isRate || !ss[1].H.DetectReset(ss[0].H) {
|
||||
if !isRate || (!isStartTimestampReset(ss[0].ST, ss[0].T, ss[1].ST, ss[1].T) && !ss[1].H.DetectReset(ss[0].H)) {
|
||||
// This subtraction may deliberately include conflicting
|
||||
// counter resets. Counter resets are treated explicitly
|
||||
// in this function, so the information about
|
||||
@ -539,7 +598,7 @@ func instantValue(vals Matrix, args parser.Expressions, out Vector, isRate bool)
|
||||
}
|
||||
}
|
||||
|
||||
return append(out, resultSample), annos
|
||||
return append(out, resultSample.Sample), annos
|
||||
}
|
||||
|
||||
// Calculate the trend value at the given index i in raw data d.
|
||||
|
||||
@ -33,10 +33,10 @@ func TestHistogramRateCounterResetHint(t *testing.T) {
|
||||
{T: 1, H: &histogram.FloatHistogram{CounterResetHint: histogram.UnknownCounterReset, Count: 10, Sum: 10}},
|
||||
}
|
||||
labels := labels.FromMap(map[string]string{model.MetricNameLabel: "foo"})
|
||||
fh, _ := histogramRate(points, false, labels, posrange.PositionRange{})
|
||||
fh, _ := histogramRate(points, nil, false, labels, posrange.PositionRange{})
|
||||
require.Equal(t, histogram.GaugeType, fh.CounterResetHint)
|
||||
|
||||
fh, _ = histogramRate(points, true, labels, posrange.PositionRange{})
|
||||
fh, _ = histogramRate(points, nil, true, labels, posrange.PositionRange{})
|
||||
require.Equal(t, histogram.GaugeType, fh.CounterResetHint)
|
||||
}
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ import (
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/promql/parser/posrange"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/util/almost"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
"github.com/prometheus/prometheus/util/convertnhcb"
|
||||
@ -107,6 +108,7 @@ func NewTestEngine(tb testing.TB, enablePerStepStats bool, lookbackDelta time.Du
|
||||
EnablePerStepStats: enablePerStepStats,
|
||||
LookbackDelta: lookbackDelta,
|
||||
EnableDelayedNameRemoval: true,
|
||||
UseStartTimestamps: true,
|
||||
Parser: parser.NewParser(TestParserOpts),
|
||||
})
|
||||
}
|
||||
@ -156,7 +158,12 @@ func GetBuiltInExprs() ([]string, error) {
|
||||
|
||||
// RunBuiltinTests runs an acceptance test suite against the provided engine.
|
||||
func RunBuiltinTests(t TBRun, engine promql.QueryEngine) {
|
||||
RunBuiltinTestsWithStorage(t, engine, newTestStorage)
|
||||
RunBuiltinTestsWithStorage(t, engine, func(t testing.TB) storage.Storage {
|
||||
return teststorage.New(t, func(opt *tsdb.Options) {
|
||||
opt.EnableSTStorage = true
|
||||
opt.EnableXOR2Encoding = true
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// RunBuiltinTestsWithStorage runs an acceptance test suite against the provided engine and storage.
|
||||
|
||||
162
promql/promqltest/testdata/start_timestamps.test
vendored
Normal file
162
promql/promqltest/testdata/start_timestamps.test
vendored
Normal file
@ -0,0 +1,162 @@
|
||||
# Tests for rate(), irate() and increase() on cumulative.
|
||||
load 1m
|
||||
cumulative{type="no_st"} _ _ 0+60x14
|
||||
cumulative{type="st_resets"}@st _ _ -30s-1mx2 -1m-1mx2 -59999ms-1mx2 -1s-1mx2 -1ms-1mx2
|
||||
cumulative{type="st_resets"} _ _ 0+60x14
|
||||
cumulative{type="normalized_resets"} _ _ 0 60 120 300 360 420 780 840 900 1440 1500 1560 2280 2340 2400
|
||||
|
||||
eval range from 7m to 15m step 1m increase(cumulative[5m])
|
||||
{type="no_st"} 300 300 300 300 300 300 300 300 300
|
||||
{type="st_resets"} 420 780 675 675 1275 900 900 1725 1125
|
||||
{type="normalized_resets"} 420 780 675 675 1275 900 900 1725 1125
|
||||
|
||||
eval range from 7m to 15m step 1m rate(cumulative[5m])
|
||||
{type="no_st"} 1 1 1 1 1 1 1 1 1
|
||||
{type="st_resets"} 1.40 2.60 2.25 2.25 4.25 3.00 3.00 5.75 3.75
|
||||
{type="normalized_resets"} 1.40 2.60 2.25 2.25 4.25 3.00 3.00 5.75 3.75
|
||||
|
||||
eval range from 7m to 15m step 1m irate(cumulative[5m])
|
||||
{type="no_st"} 1 1 1 1 1 1 1 1 1
|
||||
{type="st_resets"} 1 6 1 1 9 1 1 12 1
|
||||
{type="normalized_resets"} 1 6 1 1 9 1 1 12 1
|
||||
|
||||
# Simulate anchored vector selection for easier manual check.
|
||||
eval range from 7m to 15m step 1m round(increase(cumulative[5m1ms]))
|
||||
{type="no_st"} 300 300 300 300 300 300 300 300 300
|
||||
{type="st_resets"} 420 720 720 600 1080 1080 780 1440 1440
|
||||
{type="normalized_resets"} 420 720 720 600 1080 1080 780 1440 1440
|
||||
|
||||
# TODO actual anchored
|
||||
# eval range from 7m to 15m step 1m increase(cumulative[5m] anchored)
|
||||
# {type="no_st"} 300 300 300 300 300 300 300 300 300
|
||||
# {type="st_resets"} 420 720 720 600 1080 1080 780 1440 1440
|
||||
# {type="normalized_resets"} 420 720 720 600 1080 1080 780 1440 1440
|
||||
|
||||
# Subqueries cut the propagation of start timestamps.
|
||||
eval range from 7m to 15m step 1m increase(cumulative[5m:1m])
|
||||
{type="no_st"} 300x8
|
||||
{type="st_resets"} 300x8
|
||||
{type="normalized_resets"} 420 780 675 675 1275 900 900 1725 1125
|
||||
|
||||
clear
|
||||
|
||||
# Tests for rate(), irate() and increase() on deltas. Includes various approaches for start timestamps:
|
||||
# * ST_curr = T_prev (OTel)
|
||||
# * ST_curr = T_prev (GCP)
|
||||
# * ST = T - 1ms
|
||||
# * ST = T
|
||||
load 1m
|
||||
delta{type="no_st"} _ _ 60x15
|
||||
delta{type="otel"}@st _ _ -1mx15
|
||||
delta{type="otel"} _ _ 60x15
|
||||
delta{type="gcp"}@st _ _ -59999msx15
|
||||
delta{type="gcp"} _ _ 60x15
|
||||
delta{type="minimal_range"}@st _ _ -1msx15
|
||||
delta{type="minimal_range"} _ _ 60x15
|
||||
delta{type="empty_range"}@st _ _ -0msx15
|
||||
delta{type="empty_range"} _ _ 60x15
|
||||
delta{type="normalized_resets"} _ _ 60+60x15
|
||||
|
||||
eval range from 10m to 15m step 1m increase(delta[5m])
|
||||
{type="no_st"} 0x5
|
||||
{type="otel"} 300x5
|
||||
{type="gcp"} 300x5
|
||||
{type="minimal_range"} 300x5
|
||||
{type="empty_range"} 300x5
|
||||
{type="normalized_resets"} 300x5
|
||||
|
||||
eval range from 10m to 15m step 1m rate(delta[5m])
|
||||
{type="no_st"} 0x5
|
||||
{type="otel"} 1x5
|
||||
{type="gcp"} 1x5
|
||||
{type="minimal_range"} 1x5
|
||||
{type="empty_range"} 1x5
|
||||
{type="normalized_resets"} 1x5
|
||||
|
||||
eval range from 10m to 15m step 1m irate(delta[5m])
|
||||
{type="no_st"} 0x5
|
||||
{type="otel"} 1x5
|
||||
{type="gcp"} 1x5
|
||||
{type="minimal_range"} 1x5
|
||||
{type="empty_range"} 1x5
|
||||
{type="normalized_resets"} 1x5
|
||||
|
||||
# Subqueries cuts the propagation of start timestamps
|
||||
eval range from 10m to 15m step 1m increase(delta[5m:1m])
|
||||
{type="no_st"} 0x5
|
||||
{type="otel"} 0x5
|
||||
{type="gcp"} 0x5
|
||||
{type="minimal_range"} 0x5
|
||||
{type="empty_range"} 0x5
|
||||
{type="normalized_resets"} 300x5
|
||||
|
||||
clear
|
||||
|
||||
# Test for cumulative with unknow start timestamp, which is described in OTel spec. It is easy to incorrectly treat
|
||||
# second datapoint in such timeseries as delta, producing incorrect result. These can be distinguished by:
|
||||
# * ST_curr == T_prev && ST_prev == 0 -> cumulative with unknown start, no reset
|
||||
# * ST_curr == T_prev && ST_prev != 0 -> delta datapoint, treat as a reset
|
||||
load 1m
|
||||
series{type="otel_cumulative_unknown_start"}@st _ _ _ -1m
|
||||
series{type="otel_cumulative_unknown_start"} _ _ 1 1
|
||||
series{type="otel_delta"}@st _ _ -1m -1m
|
||||
series{type="otel_delta"} _ _ 1 1
|
||||
|
||||
eval instant at 3m round(increase(series[1m1ms]))
|
||||
{type="otel_cumulative_unknown_start"} 0
|
||||
{type="otel_delta"} 1
|
||||
|
||||
clear
|
||||
|
||||
# Tests for rate(), irate() and increase() on cumulative histograms. Includes various approaches for start timestamps:
|
||||
# * ST_curr = T_prev (OTel)
|
||||
# * ST_curr = T_prev (GCP)
|
||||
# * ST = T - 1ms
|
||||
# * ST = T
|
||||
# TODO: start timestamps doesn't work for histograms yet, because the tsdb support is missing.
|
||||
load 1m
|
||||
cumulative@st _ _ -30s-1mx2 -1m-1mx2 -59999ms-1mx2 -1s-1mx2 -1ms-1mx2
|
||||
cumulative _ _ {{schema:0 sum:0 count:0}}+{{schema:0 sum:60 count:60}}x14
|
||||
|
||||
eval range from 7m to 15m step 1m increase(cumulative[5m])
|
||||
{} {{count:300 sum:300}}x8
|
||||
|
||||
eval range from 7m to 15m step 1m rate(cumulative[5m])
|
||||
{} {{count:1 sum:1}}x8
|
||||
|
||||
eval range from 7m to 15m step 1m irate(cumulative[5m])
|
||||
{} {{count:1 sum:1}}x8
|
||||
|
||||
clear
|
||||
|
||||
# Tests for rate(), irate() and increase() on delta histograms.
|
||||
# TODO: start timestamps doesn't work for histograms yet, because the tsdb support is missing.
|
||||
load 1m
|
||||
delta{type="otel"}@st _ _ -1mx15
|
||||
delta{type="otel"} _ _ {{schema:0 sum:60 count:60}}x15
|
||||
delta{type="gcp"}@st _ _ -59999msx15
|
||||
delta{type="gcp"} _ _ {{schema:0 sum:60 count:60}}x15
|
||||
delta{type="minimal_range"}@st _ _ -1msx15
|
||||
delta{type="minimal_range"} _ _ {{schema:0 sum:60 count:60}}x15
|
||||
delta{type="empty_range"}@st _ _ -0msx15
|
||||
delta{type="empty_range"} _ _ {{schema:0 sum:60 count:60}}x15
|
||||
|
||||
eval range from 10m to 15m step 1m increase(delta[5m])
|
||||
{type="otel"} {{sum:0 count:0}}x5
|
||||
{type="gcp"} {{sum:0 count:0}}x5
|
||||
{type="minimal_range"} {{sum:0 count:0}}x5
|
||||
{type="empty_range"} {{sum:0 count:0}}x5
|
||||
|
||||
eval range from 10m to 15m step 1m rate(delta[5m])
|
||||
{type="otel"} {{schema:0 sum:0 count:0}}x5
|
||||
{type="gcp"} {{schema:0 sum:0 count:0}}x5
|
||||
{type="minimal_range"} {{schema:0 sum:0 count:0}}x5
|
||||
{type="empty_range"} {{schema:0 sum:0 count:0}}x5
|
||||
|
||||
eval range from 10m to 15m step 1m irate(delta[5m])
|
||||
{type="otel"} {{schema:0 sum:0 count:0}}x5
|
||||
{type="gcp"} {{schema:0 sum:0 count:0}}x5
|
||||
{type="minimal_range"} {{schema:0 sum:0 count:0}}x5
|
||||
{type="empty_range"} {{schema:0 sum:0 count:0}}x5
|
||||
|
||||
clear
|
||||
@ -397,6 +397,24 @@ func (r *Result) String() string {
|
||||
return r.Value.String()
|
||||
}
|
||||
|
||||
// StartTimestamps stores sample start timestamps aligned with points.
|
||||
type StartTimestamps struct {
|
||||
// Floats stores start timestamps for float samples.
|
||||
Floats []int64
|
||||
// Histograms stores start timestamps for histogram samples.
|
||||
Histograms []int64
|
||||
}
|
||||
|
||||
// Reset clears the start timestamps while keeping the slice capacity for reuse.
|
||||
func (st *StartTimestamps) Reset() {
|
||||
if st.Floats != nil {
|
||||
st.Floats = st.Floats[:0]
|
||||
}
|
||||
if st.Histograms != nil {
|
||||
st.Histograms = st.Histograms[:0]
|
||||
}
|
||||
}
|
||||
|
||||
// StorageSeries simulates promql.Series as storage.Series.
|
||||
type StorageSeries struct {
|
||||
series Series
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user