mirror of
https://github.com/prometheus/prometheus.git
synced 2025-08-06 06:07:11 +02:00
Add anchored and smoothed to vector selectors.
**This "PoC" is not finished not optimized. It currently also returns data when there is only one point in the selector. It is not ready for use and just there to play around** This adds "anchored" and "smoothed" keywords that can be used following a matrix selector. "Anchored" selects the last point before the range (or the first one after the range) and adds it at the boundary of the matrix selector. "Smoothed" applies linear interpolation at the edges using the points around the edges. In the absence of a point before or after the edge, the first or the last point is added to the edge, without interpolation. A complete design doc will follow. *Exemple usage* * `increase(caddy_http_requests_total[5m] anchored)` (equivalent of *caddy_http_requests_total - caddy_http_requests_total offset 5m* but takes counter reset into consideration) * `rate(caddy_http_requests_total[5m] smoothed)` Signed-off-by: Julien Pivotto <291750+roidelapluie@users.noreply.github.com>
This commit is contained in:
parent
fe67b30e9e
commit
47fdeb9a03
@ -275,6 +275,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
|
||||
case "promql-delayed-name-removal":
|
||||
c.promqlEnableDelayedNameRemoval = true
|
||||
logger.Info("Experimental PromQL delayed name removal enabled.")
|
||||
case "promql-extended-range-selectors":
|
||||
parser.EnableExtendedRangeSelectors = true
|
||||
logger.Info("Experimental PromQL extended range selectors enabled.")
|
||||
case "":
|
||||
continue
|
||||
case "old-ui":
|
||||
@ -561,7 +564,7 @@ func main() {
|
||||
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
|
||||
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
|
||||
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io, promql-extended-range-selectors. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
Default("").StringsVar(&cfg.featureList)
|
||||
|
||||
a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode)
|
||||
|
@ -302,3 +302,21 @@ memory in response to misleading cache growth.
|
||||
This is currently implemented using direct I/O.
|
||||
|
||||
For more details, see the [proposal](https://github.com/prometheus/proposals/pull/45).
|
||||
|
||||
## Extended Range Selectors
|
||||
|
||||
`--enable-feature=promql-extended-range-selectors`
|
||||
|
||||
Enables experimental `anchored` and `smoothed` modifiers for PromQL range and instant selectors. These modifiers provide more control over how range boundaries are handled in functions like `rate` and `increase`, especially with missing or irregular data.
|
||||
|
||||
- `anchored`: Treats the range as closed on both ends, using the most recent sample before each boundary or duplicating the first/last sample if needed. Useful for precise counter calculations and improved composability. Use with **increase** and **delta**.
|
||||
- `smoothed`: Linearly interpolates values at the range boundaries, using all available data for more accurate results with irregular scrapes or missing samples. Use with **rate** and **deriv**.
|
||||
|
||||
Example queries:
|
||||
`increase(http_requests_total[5m] anchored)`
|
||||
`rate(http_requests_total[5m] smoothed)`
|
||||
|
||||
> **Note for alerting and recording rules:**
|
||||
> The `smoothed` modifier requires samples after the evaluation interval, so using it directly in alerting or recording rules will typically *under-estimate* the result, as future samples are not available at evaluation time.
|
||||
> To use `smoothed` safely in rules, you **must** apply a `query_offset` (e.g., `offset 1m`) to ensure the calculation window is fully in the past and all needed samples are available.
|
||||
> For critical alerting, set the offset to at least one scrape interval; for less critical or more resilient use cases, consider a larger offset (multiple scrape intervals) to tolerate missed scrapes.
|
||||
|
251
promql/engine.go
251
promql/engine.go
@ -926,13 +926,27 @@ func getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorSelector, path
|
||||
// because wo want to exclude samples that are precisely the
|
||||
// lookback delta before the eval time.
|
||||
start -= durationMilliseconds(s.LookbackDelta) - 1
|
||||
if n.Smoothed {
|
||||
end += durationMilliseconds(s.LookbackDelta)
|
||||
}
|
||||
} else {
|
||||
// For all matrix queries we want to ensure that we have
|
||||
// (end-start) + range selected this way we have `range` data
|
||||
// before the start time. We subtract one from the range to
|
||||
// exclude samples positioned directly at the lower boundary of
|
||||
// the range.
|
||||
start -= durationMilliseconds(evalRange) - 1
|
||||
// For matrix queries, adjust the start and end times to ensure the
|
||||
// correct range of data is selected. For "anchored" selectors, extend
|
||||
// the start time backwards by the lookback delta plus the evaluation
|
||||
// range. For "smoothed" selectors, extend both the start and end times
|
||||
// by the lookback delta, and also extend the start time by the
|
||||
// evaluation range to cover the smoothing window. For standard range
|
||||
// queries, extend the start time backwards by the range (minus one
|
||||
// millisecond) to exclude samples exactly at the lower boundary.
|
||||
switch {
|
||||
case n.Anchored:
|
||||
start -= durationMilliseconds(s.LookbackDelta + evalRange)
|
||||
case n.Smoothed:
|
||||
start -= durationMilliseconds(s.LookbackDelta + evalRange)
|
||||
end += durationMilliseconds(s.LookbackDelta)
|
||||
default:
|
||||
start -= durationMilliseconds(evalRange) - 1
|
||||
}
|
||||
}
|
||||
|
||||
offsetMilliseconds := durationMilliseconds(n.OriginalOffset)
|
||||
@ -979,7 +993,6 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
|
||||
evalRange = 0
|
||||
hints.By, hints.Grouping = extractGroupsFromPath(path)
|
||||
n.UnexpandedSeriesSet = querier.Select(ctx, false, hints, n.LabelMatchers...)
|
||||
|
||||
case *parser.MatrixSelector:
|
||||
evalRange = n.Range
|
||||
}
|
||||
@ -1516,6 +1529,57 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate
|
||||
return result, annos
|
||||
}
|
||||
|
||||
func (ev *evaluator) smoothSeries(series []storage.Series, offset time.Duration, recordOrigT bool) Matrix {
|
||||
dur := ev.endTimestamp - ev.startTimestamp
|
||||
it := storage.NewBuffer(dur + 2*durationMilliseconds(ev.lookbackDelta))
|
||||
var chkIter chunkenc.Iterator
|
||||
mat := make(Matrix, 0, len(series))
|
||||
for _, s := range series {
|
||||
ss := Series{
|
||||
Metric: s.Labels(),
|
||||
}
|
||||
chkIter := s.Iterator(chkIter)
|
||||
it.Reset(chkIter)
|
||||
|
||||
var floats []FPoint
|
||||
|
||||
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
|
||||
matrixStart := ts - durationMilliseconds(ev.lookbackDelta)
|
||||
matrixEnd := ts + durationMilliseconds(ev.lookbackDelta)
|
||||
floats, _ = ev.matrixIterSlice(it, matrixStart, matrixEnd, floats, nil)
|
||||
|
||||
step++
|
||||
var havePrev bool
|
||||
var found bool
|
||||
var prevF float64
|
||||
var prevT int64
|
||||
for _, f := range floats {
|
||||
if f.T == ts {
|
||||
ss.Floats = append(ss.Floats, f)
|
||||
found = true
|
||||
break
|
||||
} else if f.T < ts {
|
||||
prevT = f.T
|
||||
prevF = f.F
|
||||
havePrev = true
|
||||
continue
|
||||
}
|
||||
if havePrev {
|
||||
value := linear(prevF, f.F, prevT, f.T, ts)
|
||||
ss.Floats = append(ss.Floats, FPoint{F: value, T: ts})
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found && havePrev {
|
||||
ss.Floats = append(ss.Floats, FPoint{F: prevF, T: ts})
|
||||
}
|
||||
}
|
||||
mat = append(mat, ss)
|
||||
}
|
||||
return mat
|
||||
}
|
||||
|
||||
// evalSeries generates a Matrix between ev.startTimestamp and ev.endTimestamp (inclusive), each point spaced ev.interval apart, from series given offset.
|
||||
// For every storage.Series iterator in series, the method iterates in ev.interval sized steps from ev.startTimestamp until and including ev.endTimestamp,
|
||||
// collecting every corresponding sample (obtained via ev.vectorSelectorSingle) into a Series.
|
||||
@ -1786,6 +1850,15 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
sel := arg.(*parser.MatrixSelector)
|
||||
selVS := sel.VectorSelector.(*parser.VectorSelector)
|
||||
|
||||
// Anchored works with: increase, and delta.
|
||||
if selVS.Anchored && e.Func.Name != "increase" && e.Func.Name != "delta" {
|
||||
warnings.Add(annotations.NewAnchoredWithUnsupportedFunctionWarning(e.Func.Name, e.Args[matrixArgIndex].PositionRange()))
|
||||
}
|
||||
// Smoothed works with: deriv, and rate.
|
||||
if selVS.Smoothed && e.Func.Name != "deriv" && e.Func.Name != "rate" {
|
||||
warnings.Add(annotations.NewSmoothedWithUnsupportedFunctionWarning(e.Func.Name, e.Args[matrixArgIndex].PositionRange()))
|
||||
}
|
||||
|
||||
ws, err := checkAndExpandSeriesSet(ctx, sel)
|
||||
warnings.Merge(ws)
|
||||
if err != nil {
|
||||
@ -1795,7 +1868,13 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
offset := durationMilliseconds(selVS.Offset)
|
||||
selRange := durationMilliseconds(sel.Range)
|
||||
stepRange := selRange
|
||||
if stepRange > ev.interval {
|
||||
switch {
|
||||
case selVS.Anchored:
|
||||
stepRange += durationMilliseconds(ev.lookbackDelta)
|
||||
case selVS.Smoothed:
|
||||
stepRange += durationMilliseconds(2 * ev.lookbackDelta)
|
||||
}
|
||||
if stepRange > ev.interval || selVS.Smoothed || selVS.Anchored {
|
||||
stepRange = ev.interval
|
||||
}
|
||||
// Reuse objects across steps to save memory allocations.
|
||||
@ -1805,7 +1884,18 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
inMatrix := make(Matrix, 1)
|
||||
enh := &EvalNodeHelper{Out: make(Vector, 0, 1), enableDelayedNameRemoval: ev.enableDelayedNameRemoval}
|
||||
// Process all the calls for one time series at a time.
|
||||
it := storage.NewBuffer(selRange)
|
||||
// For anchored and smoothed selectors, we need to iterate over a
|
||||
// larger range than the query range to account for the lookback delta.
|
||||
// For standard range queries, we iterate over the query range.
|
||||
bufferRange := selRange
|
||||
switch {
|
||||
case selVS.Anchored:
|
||||
bufferRange += durationMilliseconds(ev.lookbackDelta)
|
||||
case selVS.Smoothed:
|
||||
bufferRange += durationMilliseconds(2 * ev.lookbackDelta)
|
||||
}
|
||||
|
||||
it := storage.NewBuffer(bufferRange)
|
||||
var chkIter chunkenc.Iterator
|
||||
|
||||
// The last_over_time function acts like offset; thus, it
|
||||
@ -1836,6 +1926,7 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
DropName: dropName,
|
||||
}
|
||||
inMatrix[0].Metric = selVS.Series[i].Labels()
|
||||
var mint, maxt int64
|
||||
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
|
||||
step++
|
||||
// Set the non-matrix arguments.
|
||||
@ -1852,14 +1943,30 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
// for this step, but only if this is the 1st
|
||||
// iteration or no @ modifier has been used.
|
||||
if ts == ev.startTimestamp || selVS.Timestamp == nil {
|
||||
maxt := ts - offset
|
||||
mint := maxt - selRange
|
||||
floats, histograms = ev.matrixIterSlice(it, mint, maxt, floats, histograms)
|
||||
maxt = ts - offset
|
||||
mint = maxt - selRange
|
||||
matrixMaxt := maxt
|
||||
matrixMint := mint
|
||||
if selVS.Anchored || selVS.Smoothed {
|
||||
matrixMint -= durationMilliseconds(ev.lookbackDelta)
|
||||
}
|
||||
if selVS.Smoothed {
|
||||
matrixMaxt += durationMilliseconds(ev.lookbackDelta)
|
||||
}
|
||||
floats, histograms = ev.matrixIterSlice(it, matrixMint, matrixMaxt, floats, histograms)
|
||||
}
|
||||
if len(floats)+len(histograms) == 0 {
|
||||
continue
|
||||
}
|
||||
inMatrix[0].Floats = floats
|
||||
switch {
|
||||
case selVS.Anchored:
|
||||
inMatrix[0].Floats = anchorFloats(floats, mint, maxt)
|
||||
case selVS.Smoothed:
|
||||
counterReset := e.Func.Name == "rate" || e.Func.Name == "increase" || e.Func.Name == "irate"
|
||||
inMatrix[0].Floats = smoothFloats(floats, mint, maxt, counterReset)
|
||||
default:
|
||||
inMatrix[0].Floats = floats
|
||||
}
|
||||
inMatrix[0].Histograms = histograms
|
||||
enh.Ts = ts
|
||||
|
||||
@ -2057,6 +2164,10 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
||||
if err != nil {
|
||||
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
|
||||
}
|
||||
if e.Smoothed {
|
||||
mat := ev.smoothSeries(e.Series, e.Offset, false)
|
||||
return mat, ws
|
||||
}
|
||||
mat := ev.evalSeries(ctx, e.Series, e.Offset, false)
|
||||
return mat, ws
|
||||
|
||||
@ -2355,13 +2466,24 @@ func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSele
|
||||
var (
|
||||
vs = node.VectorSelector.(*parser.VectorSelector)
|
||||
|
||||
offset = durationMilliseconds(vs.Offset)
|
||||
maxt = ev.startTimestamp - offset
|
||||
mint = maxt - durationMilliseconds(node.Range)
|
||||
matrix = make(Matrix, 0, len(vs.Series))
|
||||
|
||||
it = storage.NewBuffer(durationMilliseconds(node.Range))
|
||||
offset = durationMilliseconds(vs.Offset)
|
||||
maxt = ev.startTimestamp - offset
|
||||
mint = maxt - durationMilliseconds(node.Range)
|
||||
matrixMint = mint
|
||||
matrixMaxt = maxt
|
||||
matrix = make(Matrix, 0, len(vs.Series))
|
||||
bufferRange = durationMilliseconds(node.Range)
|
||||
)
|
||||
switch {
|
||||
case vs.Anchored:
|
||||
bufferRange += durationMilliseconds(ev.lookbackDelta)
|
||||
mint -= durationMilliseconds(ev.lookbackDelta)
|
||||
case vs.Smoothed:
|
||||
bufferRange += 2 * durationMilliseconds(ev.lookbackDelta)
|
||||
mint -= durationMilliseconds(ev.lookbackDelta)
|
||||
maxt += durationMilliseconds(ev.lookbackDelta)
|
||||
}
|
||||
it := storage.NewBuffer(bufferRange)
|
||||
ws, err := checkAndExpandSeriesSet(ctx, node)
|
||||
if err != nil {
|
||||
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
|
||||
@ -2380,6 +2502,12 @@ func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSele
|
||||
}
|
||||
|
||||
ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil)
|
||||
switch {
|
||||
case vs.Anchored:
|
||||
ss.Floats = anchorFloats(ss.Floats, matrixMint, matrixMaxt)
|
||||
case vs.Smoothed:
|
||||
ss.Floats = smoothFloats(ss.Floats, matrixMint, matrixMaxt, false)
|
||||
}
|
||||
totalSize := int64(len(ss.Floats)) + int64(totalHPointSize(ss.Histograms))
|
||||
ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, totalSize)
|
||||
|
||||
@ -4010,3 +4138,88 @@ func (ev *evaluator) gatherVector(ts int64, input Matrix, output Vector, bufHelp
|
||||
|
||||
return output, bufHelpers
|
||||
}
|
||||
|
||||
func anchorFloats(floats []FPoint, mint, maxt int64) []FPoint {
|
||||
anchor := make([]FPoint, 0)
|
||||
for i, f := range floats {
|
||||
// <= mint because the interval is left-closed.
|
||||
if f.T <= mint {
|
||||
continue
|
||||
}
|
||||
if i > 0 {
|
||||
anchor = append(anchor, FPoint{T: mint, F: floats[i-1].F})
|
||||
} else {
|
||||
anchor = append(anchor, FPoint{T: mint, F: f.F})
|
||||
}
|
||||
anchor = append(anchor, floats[i:]...)
|
||||
if lastF := anchor[len(anchor)-1]; lastF.T != maxt {
|
||||
anchor = append(anchor, FPoint{T: maxt, F: lastF.F})
|
||||
}
|
||||
break
|
||||
}
|
||||
return anchor
|
||||
}
|
||||
|
||||
// linear interpolates between two points at a given time.
|
||||
func linear(f1, f2 float64, t1, t2, t int64) float64 {
|
||||
if f1 == f2 {
|
||||
return f1
|
||||
}
|
||||
ratio := float64(t-t1) / float64(t2-t1)
|
||||
return (1.0-ratio)*f1 + ratio*f2
|
||||
}
|
||||
|
||||
func smoothFloats(floats []FPoint, mint, maxt int64, counterReset bool) []FPoint {
|
||||
var out []FPoint
|
||||
n := len(floats)
|
||||
if n == 0 {
|
||||
return out
|
||||
}
|
||||
|
||||
// Find first point after mint.
|
||||
i := 0
|
||||
for i < n && floats[i].T <= mint {
|
||||
i++
|
||||
}
|
||||
|
||||
// If all points are before mint or first after maxt, nothing to do.
|
||||
if i == n {
|
||||
return out
|
||||
}
|
||||
|
||||
// Add interpolated/anchored point at mint.
|
||||
if i == 0 {
|
||||
out = append(out, FPoint{T: mint, F: floats[0].F})
|
||||
} else {
|
||||
prev, next := floats[i-1], floats[i]
|
||||
prevF := prev.F
|
||||
if counterReset && prevF > next.F {
|
||||
prevF = 0
|
||||
}
|
||||
f := linear(prevF, next.F, prev.T, next.T, mint)
|
||||
out = append(out, FPoint{T: mint, F: f})
|
||||
}
|
||||
|
||||
// Add all points strictly between mint and maxt.
|
||||
for ; i < n && floats[i].T < maxt; i++ {
|
||||
out = append(out, floats[i])
|
||||
}
|
||||
|
||||
// Add interpolated/anchored point at maxt.
|
||||
if i == n {
|
||||
// If last point is before maxt, anchor at last value.
|
||||
out = append(out, FPoint{T: maxt, F: floats[n-1].F})
|
||||
} else if floats[i].T == maxt {
|
||||
out = append(out, floats[i])
|
||||
} else if i > 0 {
|
||||
prev, next := floats[i-1], floats[i]
|
||||
prevF := prev.F
|
||||
if counterReset && prevF > next.F {
|
||||
prevF = 0
|
||||
}
|
||||
f := linear(prevF, next.F, prev.T, next.T, maxt)
|
||||
out = append(out, FPoint{T: maxt, F: f})
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
@ -1513,6 +1513,139 @@ load 10s
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtendedRangeSelectors(t *testing.T) {
|
||||
parser.EnableExtendedRangeSelectors = true
|
||||
t.Cleanup(func() {
|
||||
parser.EnableExtendedRangeSelectors = false
|
||||
})
|
||||
|
||||
engine := newTestEngine(t)
|
||||
storage := promqltest.LoadedStorage(t, `
|
||||
load 10s
|
||||
metric 1+1x10
|
||||
withreset 1+1x4 1+1x5
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
tc := []struct {
|
||||
query string
|
||||
t time.Time
|
||||
expected promql.Matrix
|
||||
}{
|
||||
{
|
||||
query: "metric[10s] smoothed",
|
||||
t: time.Unix(10, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 1, T: 0}, {F: 2, T: 10000}},
|
||||
Metric: labels.FromStrings("__name__", "metric"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "metric[10s] smoothed",
|
||||
t: time.Unix(15, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 1.5, T: 5000}, {F: 2, T: 10000}, {F: 2.5, T: 15000}},
|
||||
Metric: labels.FromStrings("__name__", "metric"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "metric[10s] smoothed",
|
||||
t: time.Unix(5, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 1, T: -5000}, {F: 1, T: 0}, {F: 1.5, T: 5000}},
|
||||
Metric: labels.FromStrings("__name__", "metric"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "metric[10s] smoothed",
|
||||
t: time.Unix(105, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 10.5, T: 95000}, {F: 11, T: 100000}, {F: 11, T: 105000}},
|
||||
Metric: labels.FromStrings("__name__", "metric"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "withreset[10s] smoothed",
|
||||
t: time.Unix(45, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 4.5, T: 35000}, {F: 5, T: 40000}, {F: 3, T: 45000}},
|
||||
Metric: labels.FromStrings("__name__", "withreset"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "metric[10s] anchored",
|
||||
t: time.Unix(10, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 1, T: 0}, {F: 2, T: 10000}},
|
||||
Metric: labels.FromStrings("__name__", "metric"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "metric[10s] anchored",
|
||||
t: time.Unix(15, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 1, T: 5000}, {F: 2, T: 10000}, {F: 2, T: 15000}},
|
||||
Metric: labels.FromStrings("__name__", "metric"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "metric[10s] anchored",
|
||||
t: time.Unix(5, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 1, T: -5000}, {F: 1, T: 0}, {F: 1, T: 5000}},
|
||||
Metric: labels.FromStrings("__name__", "metric"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "metric[10s] anchored",
|
||||
t: time.Unix(105, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 10, T: 95000}, {F: 11, T: 100000}, {F: 11, T: 105000}},
|
||||
Metric: labels.FromStrings("__name__", "metric"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: "withreset[10s] anchored",
|
||||
t: time.Unix(45, 0),
|
||||
expected: promql.Matrix{
|
||||
promql.Series{
|
||||
Floats: []promql.FPoint{{F: 4, T: 35000}, {F: 5, T: 40000}, {F: 5, T: 45000}},
|
||||
Metric: labels.FromStrings("__name__", "withreset"),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tc {
|
||||
t.Run(tc.query, func(t *testing.T) {
|
||||
engine = promqltest.NewTestEngine(t, false, 0, 100)
|
||||
qry, err := engine.NewInstantQuery(context.Background(), storage, nil, tc.query, tc.t)
|
||||
require.NoError(t, err)
|
||||
res := qry.Exec(context.Background())
|
||||
require.NoError(t, res.Err)
|
||||
require.Equal(t, tc.expected, res.Value)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAtModifier(t *testing.T) {
|
||||
engine := newTestEngine(t)
|
||||
storage := promqltest.LoadedStorage(t, `
|
||||
|
@ -226,6 +226,11 @@ type VectorSelector struct {
|
||||
// This is the case when VectorSelector is used to represent the info function's second argument.
|
||||
BypassEmptyMatcherCheck bool
|
||||
|
||||
// Anchored is true when the VectorSelector is anchored.
|
||||
Anchored bool
|
||||
// Smoothed is true when the VectorSelector is smoothed.
|
||||
Smoothed bool
|
||||
|
||||
PosRange posrange.PositionRange
|
||||
}
|
||||
|
||||
|
@ -141,6 +141,8 @@ GROUP_LEFT
|
||||
GROUP_RIGHT
|
||||
IGNORING
|
||||
OFFSET
|
||||
SMOOTHED
|
||||
ANCHORED
|
||||
ON
|
||||
WITHOUT
|
||||
%token keywordsEnd
|
||||
@ -187,7 +189,7 @@ START_METRIC_SELECTOR
|
||||
%type <int> int
|
||||
%type <uint> uint
|
||||
%type <float> number series_value signed_number signed_or_unsigned_number
|
||||
%type <node> step_invariant_expr aggregate_expr aggregate_modifier bin_modifier binary_expr bool_modifier expr function_call function_call_args function_call_body group_modifiers label_matchers matrix_selector number_duration_literal offset_expr on_or_ignoring paren_expr string_literal subquery_expr unary_expr vector_selector duration_expr paren_duration_expr positive_duration_expr offset_duration_expr
|
||||
%type <node> step_invariant_expr aggregate_expr aggregate_modifier bin_modifier binary_expr bool_modifier expr function_call function_call_args function_call_body group_modifiers label_matchers matrix_selector number_duration_literal offset_expr anchored_expr smoothed_expr on_or_ignoring paren_expr string_literal subquery_expr unary_expr vector_selector duration_expr paren_duration_expr positive_duration_expr offset_duration_expr
|
||||
|
||||
%start start
|
||||
|
||||
@ -230,6 +232,8 @@ expr :
|
||||
| matrix_selector
|
||||
| number_duration_literal
|
||||
| offset_expr
|
||||
| anchored_expr
|
||||
| smoothed_expr
|
||||
| paren_expr
|
||||
| string_literal
|
||||
| subquery_expr
|
||||
@ -482,6 +486,20 @@ offset_expr: expr OFFSET offset_duration_expr
|
||||
{ yylex.(*parser).unexpected("offset", "number, duration, or step()"); $$ = $1 }
|
||||
;
|
||||
|
||||
/*
|
||||
* Anchored and smoothed modifiers
|
||||
*/
|
||||
|
||||
anchored_expr: expr ANCHORED
|
||||
{
|
||||
yylex.(*parser).setAnchored($1)
|
||||
}
|
||||
|
||||
smoothed_expr: expr SMOOTHED
|
||||
{
|
||||
yylex.(*parser).setSmoothed($1)
|
||||
}
|
||||
|
||||
/*
|
||||
* @ modifiers.
|
||||
*/
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -129,6 +129,8 @@ var key = map[string]ItemType{
|
||||
|
||||
// Keywords.
|
||||
"offset": OFFSET,
|
||||
"smoothed": SMOOTHED,
|
||||
"anchored": ANCHORED,
|
||||
"by": BY,
|
||||
"without": WITHOUT,
|
||||
"on": ON,
|
||||
|
@ -42,6 +42,9 @@ var parserPool = sync.Pool{
|
||||
// ExperimentalDurationExpr is a flag to enable experimental duration expression parsing.
|
||||
var ExperimentalDurationExpr bool
|
||||
|
||||
// EnableExtendedRangeSelectors is a flag to enable experimental extended range selectors.
|
||||
var EnableExtendedRangeSelectors bool
|
||||
|
||||
type Parser interface {
|
||||
ParseExpr() (Expr, error)
|
||||
Close()
|
||||
@ -1037,6 +1040,48 @@ func (p *parser) addOffsetExpr(e Node, expr *DurationExpr) {
|
||||
*endPosp = p.lastClosing
|
||||
}
|
||||
|
||||
func (p *parser) setAnchored(e Node) {
|
||||
if !EnableExtendedRangeSelectors {
|
||||
p.addParseErrf(e.PositionRange(), "anchored modifier is experimental and not enabled")
|
||||
return
|
||||
}
|
||||
switch s := e.(type) {
|
||||
case *VectorSelector:
|
||||
s.Anchored = true
|
||||
if s.Smoothed {
|
||||
p.addParseErrf(e.PositionRange(), "anchored and smoothed modifiers cannot be used together")
|
||||
}
|
||||
case *MatrixSelector:
|
||||
s.VectorSelector.(*VectorSelector).Anchored = true
|
||||
if s.VectorSelector.(*VectorSelector).Smoothed {
|
||||
p.addParseErrf(e.PositionRange(), "anchored and smoothed modifiers cannot be used together")
|
||||
}
|
||||
default:
|
||||
p.addParseErrf(e.PositionRange(), "anchored modifier not implemented")
|
||||
}
|
||||
}
|
||||
|
||||
func (p *parser) setSmoothed(e Node) {
|
||||
if !EnableExtendedRangeSelectors {
|
||||
p.addParseErrf(e.PositionRange(), "smoothed modifier is experimental and not enabled")
|
||||
return
|
||||
}
|
||||
switch s := e.(type) {
|
||||
case *VectorSelector:
|
||||
s.Smoothed = true
|
||||
if s.Anchored {
|
||||
p.addParseErrf(e.PositionRange(), "anchored and smoothed modifiers cannot be used together")
|
||||
}
|
||||
case *MatrixSelector:
|
||||
s.VectorSelector.(*VectorSelector).Smoothed = true
|
||||
if s.VectorSelector.(*VectorSelector).Anchored {
|
||||
p.addParseErrf(e.PositionRange(), "anchored and smoothed modifiers cannot be used together")
|
||||
}
|
||||
default:
|
||||
p.addParseErrf(e.PositionRange(), "smoothed modifier not implemented")
|
||||
}
|
||||
}
|
||||
|
||||
// setTimestamp is used to set the timestamp from the @ modifier in the generated parser.
|
||||
func (p *parser) setTimestamp(e Node, ts float64) {
|
||||
if math.IsInf(ts, -1) || math.IsInf(ts, 1) || math.IsNaN(ts) ||
|
||||
|
@ -222,11 +222,17 @@ func (node *MatrixSelector) String() string {
|
||||
vecSelector.Timestamp = nil
|
||||
vecSelector.StartOrEnd = 0
|
||||
|
||||
extendedAttribute := ""
|
||||
if vecSelector.Anchored {
|
||||
extendedAttribute = " anchored"
|
||||
} else if vecSelector.Smoothed {
|
||||
extendedAttribute = " smoothed"
|
||||
}
|
||||
rangeStr := model.Duration(node.Range).String()
|
||||
if node.RangeExpr != nil {
|
||||
rangeStr = node.RangeExpr.String()
|
||||
}
|
||||
str := fmt.Sprintf("%s[%s]%s%s", vecSelector.String(), rangeStr, at, offset)
|
||||
str := fmt.Sprintf("%s[%s]%s%s%s", vecSelector.String(), rangeStr, extendedAttribute, at, offset)
|
||||
|
||||
vecSelector.OriginalOffset, vecSelector.OriginalOffsetExpr, vecSelector.Timestamp, vecSelector.StartOrEnd = offsetVal, offsetExprVal, atVal, preproc
|
||||
|
||||
@ -321,6 +327,12 @@ func (node *VectorSelector) String() string {
|
||||
}
|
||||
labelStrings = append(labelStrings, matcher.String())
|
||||
}
|
||||
extendedAttribute := ""
|
||||
if node.Anchored {
|
||||
extendedAttribute = " anchored"
|
||||
} else if node.Smoothed {
|
||||
extendedAttribute = " smoothed"
|
||||
}
|
||||
offset := ""
|
||||
switch {
|
||||
case node.OriginalOffsetExpr != nil:
|
||||
@ -344,5 +356,5 @@ func (node *VectorSelector) String() string {
|
||||
return fmt.Sprintf("%s%s%s", node.Name, at, offset)
|
||||
}
|
||||
sort.Strings(labelStrings)
|
||||
return fmt.Sprintf("%s{%s}%s%s", node.Name, strings.Join(labelStrings, ","), at, offset)
|
||||
return fmt.Sprintf("%s{%s}%s%s%s", node.Name, strings.Join(labelStrings, ","), extendedAttribute, at, offset)
|
||||
}
|
||||
|
@ -123,6 +123,7 @@ func RunBuiltinTestsWithStorage(t TBRun, engine promql.QueryEngine, newStorage f
|
||||
})
|
||||
parser.EnableExperimentalFunctions = true
|
||||
parser.ExperimentalDurationExpr = true
|
||||
parser.EnableExtendedRangeSelectors = true
|
||||
|
||||
files, err := fs.Glob(testsFs, "*/*.test")
|
||||
require.NoError(t, err)
|
||||
|
216
promql/promqltest/testdata/extended_vectors.test
vendored
Normal file
216
promql/promqltest/testdata/extended_vectors.test
vendored
Normal file
@ -0,0 +1,216 @@
|
||||
# Reference from PROM-52: Complete dataset
|
||||
|
||||
load 15s
|
||||
metric 1+1x4 9+1x4
|
||||
|
||||
eval instant at 5s increase(metric[1m])
|
||||
|
||||
eval instant at 20s increase(metric[1m])
|
||||
{} 1.833333333
|
||||
|
||||
eval instant at 35s increase(metric[1m])
|
||||
{} 2.833333333
|
||||
|
||||
eval instant at 50s increase(metric[1m])
|
||||
{} 4
|
||||
|
||||
eval instant at 65s increase(metric[1m])
|
||||
{} 4
|
||||
|
||||
eval instant at 80s increase(metric[1m])
|
||||
{} 8
|
||||
|
||||
eval instant at 95s increase(metric[1m])
|
||||
{} 8
|
||||
|
||||
eval instant at 110s increase(metric[1m])
|
||||
{} 8
|
||||
|
||||
eval instant at 125s increase(metric[1m])
|
||||
{} 4
|
||||
|
||||
eval instant at 5s increase(metric[1m] anchored)
|
||||
{} 0
|
||||
|
||||
eval instant at 20s increase(metric[1m] anchored)
|
||||
{} 1
|
||||
|
||||
eval instant at 35s increase(metric[1m] anchored)
|
||||
{} 2
|
||||
|
||||
eval instant at 50s increase(metric[1m] anchored)
|
||||
{} 3
|
||||
|
||||
eval instant at 65s increase(metric[1m] anchored)
|
||||
{} 4
|
||||
|
||||
eval instant at 80s increase(metric[1m] anchored)
|
||||
{} 7
|
||||
|
||||
eval instant at 95s increase(metric[1m] anchored)
|
||||
{} 7
|
||||
|
||||
eval instant at 110s increase(metric[1m] anchored)
|
||||
{} 7
|
||||
|
||||
eval instant at 125s increase(metric[1m] anchored)
|
||||
{} 7
|
||||
|
||||
eval instant at 5s increase(metric[1m] smoothed)
|
||||
{} 0.333333333
|
||||
|
||||
eval instant at 20s increase(metric[1m] smoothed)
|
||||
{} 1.333333333
|
||||
|
||||
eval instant at 35s increase(metric[1m] smoothed)
|
||||
{} 2.333333333
|
||||
|
||||
eval instant at 50s increase(metric[1m] smoothed)
|
||||
{} 3.333333333
|
||||
|
||||
eval instant at 65s increase(metric[1m] smoothed)
|
||||
{} 5
|
||||
|
||||
eval instant at 80s increase(metric[1m] smoothed)
|
||||
{} 7
|
||||
|
||||
eval instant at 95s increase(metric[1m] smoothed)
|
||||
{} 7
|
||||
|
||||
eval instant at 110s increase(metric[1m] smoothed)
|
||||
{} 7
|
||||
|
||||
eval instant at 125s increase(metric[1m] smoothed)
|
||||
{} 6
|
||||
|
||||
# Reference from PROM-52: Partial dataset
|
||||
|
||||
clear
|
||||
load 15s
|
||||
metric 1+1x2 _ _ 9+1x4
|
||||
|
||||
eval instant at 5s increase(metric[1m])
|
||||
|
||||
eval instant at 20s increase(metric[1m])
|
||||
{} 1.833333333
|
||||
|
||||
eval instant at 35s increase(metric[1m])
|
||||
{} 2.833333333
|
||||
|
||||
eval instant at 50s increase(metric[1m])
|
||||
{} 3.166666666
|
||||
|
||||
eval instant at 65s increase(metric[1m])
|
||||
{} 2.166666666
|
||||
|
||||
eval instant at 80s increase(metric[1m])
|
||||
{} 8
|
||||
|
||||
eval instant at 95s increase(metric[1m])
|
||||
{} 1.833333333
|
||||
|
||||
eval instant at 110s increase(metric[1m])
|
||||
{} 2.833333333
|
||||
|
||||
eval instant at 125s increase(metric[1m])
|
||||
{} 4
|
||||
|
||||
eval instant at 5s increase(metric[1m] anchored)
|
||||
{} 0
|
||||
|
||||
eval instant at 20s increase(metric[1m] anchored)
|
||||
{} 1
|
||||
|
||||
eval instant at 35s increase(metric[1m] anchored)
|
||||
{} 2
|
||||
|
||||
eval instant at 50s increase(metric[1m] anchored)
|
||||
{} 2
|
||||
|
||||
eval instant at 65s increase(metric[1m] anchored)
|
||||
{} 2
|
||||
|
||||
eval instant at 80s increase(metric[1m] anchored)
|
||||
{} 7
|
||||
|
||||
eval instant at 95s increase(metric[1m] anchored)
|
||||
{} 7
|
||||
|
||||
eval instant at 110s increase(metric[1m] anchored)
|
||||
{} 8
|
||||
|
||||
eval instant at 125s increase(metric[1m] anchored)
|
||||
{} 9
|
||||
|
||||
eval instant at 5s increase(metric[1m] smoothed)
|
||||
{} 0.333333333
|
||||
|
||||
eval instant at 20s increase(metric[1m] smoothed)
|
||||
{} 1.333333333
|
||||
|
||||
eval instant at 35s increase(metric[1m] smoothed)
|
||||
{} 2.666666666
|
||||
|
||||
eval instant at 50s increase(metric[1m] smoothed)
|
||||
{} 4.666666666
|
||||
|
||||
eval instant at 65s increase(metric[1m] smoothed)
|
||||
{} 6.333333333
|
||||
|
||||
eval instant at 80s increase(metric[1m] smoothed)
|
||||
{} 7
|
||||
|
||||
eval instant at 95s increase(metric[1m] smoothed)
|
||||
{} 6.666666666
|
||||
|
||||
eval instant at 110s increase(metric[1m] smoothed)
|
||||
{} 5.666666666
|
||||
|
||||
eval instant at 125s increase(metric[1m] smoothed)
|
||||
{} 4.666666666
|
||||
|
||||
# Test that inverval is left-open.
|
||||
|
||||
clear
|
||||
load 1m
|
||||
metric 1 2 _ 4 5
|
||||
|
||||
eval instant at 2m increase(metric[1m] smoothed)
|
||||
{} 1
|
||||
|
||||
eval instant at 2m increase(metric[1m] anchored)
|
||||
|
||||
# Basic test with counter resets
|
||||
|
||||
clear
|
||||
load 1m
|
||||
metric{id="1"} 1+1x4 1+1x4
|
||||
metric{id="2"} 3 2+2x9
|
||||
metric{id="3"} 5+3x2 3+3x6
|
||||
|
||||
eval instant at 1m30s increase(metric[1m])
|
||||
|
||||
eval instant at 1m30s increase(metric[1m] smoothed)
|
||||
{id="1"} 1
|
||||
{id="2"} 2
|
||||
{id="3"} 3
|
||||
|
||||
eval instant at 1m30s increase(metric[1m] anchored)
|
||||
{id="1"} 1
|
||||
{id="2"} 2
|
||||
{id="3"} 3
|
||||
|
||||
eval instant at 6m increase(metric[5m])
|
||||
{id="1"} 5
|
||||
{id="2"} 10
|
||||
{id="3"} 15
|
||||
|
||||
eval instant at 6m increase(metric[5m] smoothed)
|
||||
{id="1"} 5
|
||||
{id="2"} 10
|
||||
{id="3"} 15
|
||||
|
||||
eval instant at 5m increase(metric[5m] anchored)
|
||||
{id="1"} 5
|
||||
{id="2"} 10
|
||||
{id="3"} 15
|
@ -145,6 +145,8 @@ var (
|
||||
MixedExponentialCustomHistogramsWarning = fmt.Errorf("%w: vector contains a mix of histograms with exponential and custom buckets schemas for metric name", PromQLWarning)
|
||||
IncompatibleCustomBucketsHistogramsWarning = fmt.Errorf("%w: vector contains histograms with incompatible custom buckets for metric name", PromQLWarning)
|
||||
IncompatibleBucketLayoutInBinOpWarning = fmt.Errorf("%w: incompatible bucket layout encountered for binary operator", PromQLWarning)
|
||||
AnchoredWithUnsupportedFunctionWarning = fmt.Errorf("%w: anchored vector with unsupported function:", PromQLWarning)
|
||||
SmoothedWithUnsupportedFunctionWarning = fmt.Errorf("%w: smoothed vector with unsupported function:", PromQLWarning)
|
||||
|
||||
PossibleNonCounterInfo = fmt.Errorf("%w: metric might not be a counter, name does not end in _total/_sum/_count/_bucket:", PromQLInfo)
|
||||
PossibleNonCounterLabelInfo = fmt.Errorf("%w: metric might not be a counter, __type__ label is not set to %q or %q", PromQLInfo, model.MetricTypeCounter, model.MetricTypeHistogram)
|
||||
@ -348,3 +350,17 @@ func NewNativeHistogramFractionNaNsInfo(metricName string, pos posrange.Position
|
||||
Err: fmt.Errorf("%w %q", NativeHistogramFractionNaNsInfo, metricName),
|
||||
}
|
||||
}
|
||||
|
||||
func NewAnchoredWithUnsupportedFunctionWarning(function string, pos posrange.PositionRange) error {
|
||||
return annoErr{
|
||||
PositionRange: pos,
|
||||
Err: fmt.Errorf("%w %s", AnchoredWithUnsupportedFunctionWarning, function),
|
||||
}
|
||||
}
|
||||
|
||||
func NewSmoothedWithUnsupportedFunctionWarning(function string, pos posrange.PositionRange) error {
|
||||
return annoErr{
|
||||
PositionRange: pos,
|
||||
Err: fmt.Errorf("%w %s", SmoothedWithUnsupportedFunctionWarning, function),
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user