mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-04 12:01:06 +02:00
storage: introduce search interface with scoring and filtering
Signed-off-by: Julien Pivotto <291750+roidelapluie@users.noreply.github.com>
This commit is contained in:
parent
3b9caf6564
commit
f69db5bc54
@ -17,12 +17,18 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"slices"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
||||
// querierAdapter must implement the Searcher interface.
|
||||
var _ Searcher = &querierAdapter{}
|
||||
|
||||
type genericQuerier interface {
|
||||
LabelQuerier
|
||||
Select(context.Context, bool, *SelectHints, ...*labels.Matcher) genericSeriesSet
|
||||
@ -132,6 +138,382 @@ func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels {
|
||||
return a.VerticalChunkSeriesMergeFunc(buf...)
|
||||
}
|
||||
|
||||
// searcherFromGenericQuerier extracts a Searcher from a genericQuerierAdapter.
|
||||
// Falls back to a direct Searcher assertion if the querier is not a
|
||||
// genericQuerierAdapter.
|
||||
func searcherFromGenericQuerier(gq genericQuerier) (Searcher, bool) {
|
||||
if a, ok := gq.(*genericQuerierAdapter); ok {
|
||||
s, ok := a.q.(Searcher)
|
||||
return s, ok
|
||||
}
|
||||
s, ok := gq.(Searcher)
|
||||
return s, ok
|
||||
}
|
||||
|
||||
// collectSearchers extracts Searcher implementations from a genericQuerier tree.
|
||||
func collectSearchers(gq genericQuerier) []Searcher {
|
||||
if m, ok := gq.(*mergeGenericQuerier); ok {
|
||||
var searchers []Searcher
|
||||
for _, q := range m.queriers {
|
||||
searchers = append(searchers, collectSearchers(q)...)
|
||||
}
|
||||
return searchers
|
||||
}
|
||||
if s, ok := searcherFromGenericQuerier(gq); ok {
|
||||
return []Searcher{s}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sliceSearchResultSet is a SearchResultSet backed by a pre-built slice.
|
||||
type sliceSearchResultSet struct {
|
||||
results []SearchResult
|
||||
warnings annotations.Annotations
|
||||
idx int
|
||||
}
|
||||
|
||||
func (s *sliceSearchResultSet) Next() bool {
|
||||
s.idx++
|
||||
return s.idx < len(s.results)
|
||||
}
|
||||
|
||||
func (s *sliceSearchResultSet) At() SearchResult { return s.results[s.idx] }
|
||||
|
||||
func (s *sliceSearchResultSet) Warnings() annotations.Annotations { return s.warnings }
|
||||
|
||||
func (*sliceSearchResultSet) Err() error { return nil }
|
||||
|
||||
func (*sliceSearchResultSet) Close() error { return nil }
|
||||
|
||||
// NewSearchResultSetFromSlice returns a SearchResultSet that iterates over the given slice.
|
||||
func NewSearchResultSetFromSlice(results []SearchResult, warns annotations.Annotations) SearchResultSet {
|
||||
return &sliceSearchResultSet{results: results, warnings: warns, idx: -1}
|
||||
}
|
||||
|
||||
// ApplySearchHints filters, sorts, and limits a slice of string values according to hints,
|
||||
// returning scored SearchResult entries. A nil hints value is treated as the zero value.
|
||||
// The input values slice is assumed to be ordered ascending by value; the function only
|
||||
// performs extra work for orderings that differ from this.
|
||||
func ApplySearchHints(values []string, hints *SearchHints) []SearchResult {
|
||||
if hints == nil {
|
||||
hints = &SearchHints{}
|
||||
}
|
||||
results := make([]SearchResult, 0, len(values))
|
||||
for _, v := range values {
|
||||
if hints.Filter != nil {
|
||||
accepted, score := hints.Filter.Accept(v)
|
||||
if accepted {
|
||||
results = append(results, SearchResult{Value: v, Score: score})
|
||||
}
|
||||
} else {
|
||||
results = append(results, SearchResult{Value: v, Score: 1.0})
|
||||
}
|
||||
}
|
||||
switch hints.OrderBy {
|
||||
case OrderByValueAsc:
|
||||
// Input is already ascending; nothing to do.
|
||||
case OrderByValueDesc:
|
||||
slices.Reverse(results)
|
||||
case OrderByScoreDesc:
|
||||
slices.SortFunc(results, compareSearchResults(OrderByScoreDesc))
|
||||
}
|
||||
if hints.Limit > 0 && len(results) > hints.Limit {
|
||||
results = results[:hints.Limit]
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
// compareSearchResults returns the total-order comparison function for the
|
||||
// given Ordering. For OrderByValueAsc and OrderByValueDesc the order is on
|
||||
// Value alone. For OrderByScoreDesc the order is (Score desc, Value asc),
|
||||
// which is a total order and defines the position at which a duplicate value
|
||||
// is first emitted by the streaming merge.
|
||||
func compareSearchResults(o Ordering) func(a, b SearchResult) int {
|
||||
switch o {
|
||||
case OrderByValueDesc:
|
||||
return func(a, b SearchResult) int { return cmp.Compare(b.Value, a.Value) }
|
||||
case OrderByScoreDesc:
|
||||
return func(a, b SearchResult) int {
|
||||
if c := cmp.Compare(b.Score, a.Score); c != 0 {
|
||||
return c
|
||||
}
|
||||
return cmp.Compare(a.Value, b.Value)
|
||||
}
|
||||
default:
|
||||
return func(a, b SearchResult) int { return cmp.Compare(a.Value, b.Value) }
|
||||
}
|
||||
}
|
||||
|
||||
// mergeSearchSets merges results from multiple Searcher calls using a streaming
|
||||
// pairwise k-way merge. Each searcher is required to emit results in the order
|
||||
// requested by hints.OrderBy; the merge deduplicates by value so that a value
|
||||
// appearing in several sources is emitted once, carrying its highest score.
|
||||
func mergeSearchSets(hints *SearchHints, fn func(Searcher) SearchResultSet, searchers []Searcher) SearchResultSet {
|
||||
if len(searchers) == 0 {
|
||||
return EmptySearchResultSet()
|
||||
}
|
||||
|
||||
sets := make([]SearchResultSet, len(searchers))
|
||||
for i, s := range searchers {
|
||||
sets[i] = &lazySearchResultSet{init: func() SearchResultSet { return fn(s) }}
|
||||
}
|
||||
var (
|
||||
order Ordering
|
||||
limit int
|
||||
)
|
||||
if hints != nil {
|
||||
order = hints.OrderBy
|
||||
limit = hints.Limit
|
||||
}
|
||||
|
||||
// Duplicate Values collapse in place inside mergingSearchResultSet.
|
||||
// Under value-based orderings they are trivially adjacent. Under
|
||||
// OrderByScoreDesc the Searcher contract requires identical scores
|
||||
// for a given Value, so duplicates tie on (Score, Value) and are
|
||||
// adjacent there too.
|
||||
return pairwiseMergeSearchSets(sets, order, limit)
|
||||
}
|
||||
|
||||
// pairwiseMergeSearchSets recursively merges SearchResultSets in a balanced
|
||||
// binary tree. Each merge node respects the requested ordering and stops after
|
||||
// emitting limit results, enabling early termination that avoids consuming the
|
||||
// full input from child nodes.
|
||||
func pairwiseMergeSearchSets(sets []SearchResultSet, order Ordering, limit int) SearchResultSet {
|
||||
switch len(sets) {
|
||||
case 0:
|
||||
return EmptySearchResultSet()
|
||||
case 1:
|
||||
if limit > 0 {
|
||||
return &limitSearchResultSet{rs: sets[0], limit: limit}
|
||||
}
|
||||
return sets[0]
|
||||
default:
|
||||
mid := len(sets) / 2
|
||||
left := pairwiseMergeSearchSets(sets[:mid], order, limit)
|
||||
right := pairwiseMergeSearchSets(sets[mid:], order, limit)
|
||||
return newMergingSearchResultSet(left, right, order, limit)
|
||||
}
|
||||
}
|
||||
|
||||
// lazySearchResultSet defers the creation of a SearchResultSet until the first
|
||||
// call to Next. This avoids invoking searchers whose results are never consumed.
|
||||
type lazySearchResultSet struct {
|
||||
init func() SearchResultSet
|
||||
rs SearchResultSet
|
||||
}
|
||||
|
||||
func (s *lazySearchResultSet) ensure() {
|
||||
if s.rs == nil {
|
||||
s.rs = s.init()
|
||||
s.init = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *lazySearchResultSet) Next() bool {
|
||||
s.ensure()
|
||||
return s.rs.Next()
|
||||
}
|
||||
|
||||
func (s *lazySearchResultSet) At() SearchResult {
|
||||
if s.rs == nil {
|
||||
return SearchResult{}
|
||||
}
|
||||
return s.rs.At()
|
||||
}
|
||||
|
||||
func (s *lazySearchResultSet) Warnings() annotations.Annotations {
|
||||
if s.rs == nil {
|
||||
return nil
|
||||
}
|
||||
return s.rs.Warnings()
|
||||
}
|
||||
|
||||
func (s *lazySearchResultSet) Err() error {
|
||||
if s.rs == nil {
|
||||
return nil
|
||||
}
|
||||
return s.rs.Err()
|
||||
}
|
||||
|
||||
func (s *lazySearchResultSet) Close() error {
|
||||
if s.rs == nil {
|
||||
return nil
|
||||
}
|
||||
return s.rs.Close()
|
||||
}
|
||||
|
||||
// limitSearchResultSet wraps a SearchResultSet and stops after limit results.
|
||||
type limitSearchResultSet struct {
|
||||
rs SearchResultSet
|
||||
limit int
|
||||
emitted int
|
||||
}
|
||||
|
||||
func (s *limitSearchResultSet) Next() bool {
|
||||
if s.limit > 0 && s.emitted >= s.limit {
|
||||
return false
|
||||
}
|
||||
if s.rs.Next() {
|
||||
s.emitted++
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *limitSearchResultSet) At() SearchResult { return s.rs.At() }
|
||||
func (s *limitSearchResultSet) Warnings() annotations.Annotations { return s.rs.Warnings() }
|
||||
func (s *limitSearchResultSet) Err() error { return s.rs.Err() }
|
||||
func (s *limitSearchResultSet) Close() error { return s.rs.Close() }
|
||||
|
||||
// mergingSearchResultSet lazily merges two pre-sorted SearchResultSets using
|
||||
// the comparison function defined by order. Both inputs must yield results in
|
||||
// that order. Equal entries (same Value under value orderings, same
|
||||
// (Score, Value) under OrderByScoreDesc) collapse in place; under value
|
||||
// orderings the higher score wins.
|
||||
type mergingSearchResultSet struct {
|
||||
a, b SearchResultSet
|
||||
cmpFn func(a, b SearchResult) int
|
||||
valueOrder bool // true when order collapses adjacent duplicates by Value.
|
||||
limit int
|
||||
emitted int
|
||||
curr SearchResult
|
||||
aVal, bVal SearchResult
|
||||
aOk, bOk bool // Whether aVal/bVal hold a buffered value.
|
||||
aInit, bInit bool // Whether a/b have been advanced at least once.
|
||||
done bool
|
||||
}
|
||||
|
||||
func newMergingSearchResultSet(a, b SearchResultSet, order Ordering, limit int) *mergingSearchResultSet {
|
||||
return &mergingSearchResultSet{
|
||||
a: a,
|
||||
b: b,
|
||||
cmpFn: compareSearchResults(order),
|
||||
valueOrder: order == OrderByValueAsc || order == OrderByValueDesc,
|
||||
limit: limit,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *mergingSearchResultSet) Next() bool {
|
||||
if s.done {
|
||||
return false
|
||||
}
|
||||
if s.limit > 0 && s.emitted >= s.limit {
|
||||
s.done = true
|
||||
return false
|
||||
}
|
||||
|
||||
// Prime both sides on first call.
|
||||
if !s.aInit {
|
||||
s.aOk = s.a.Next()
|
||||
if s.aOk {
|
||||
s.aVal = s.a.At()
|
||||
}
|
||||
s.aInit = true
|
||||
}
|
||||
if !s.bInit {
|
||||
s.bOk = s.b.Next()
|
||||
if s.bOk {
|
||||
s.bVal = s.b.At()
|
||||
}
|
||||
s.bInit = true
|
||||
}
|
||||
|
||||
// Check for errors from either side after priming or after the
|
||||
// previous advance. An error means we should stop iteration.
|
||||
if s.a.Err() != nil || s.b.Err() != nil {
|
||||
s.done = true
|
||||
return false
|
||||
}
|
||||
|
||||
switch {
|
||||
case !s.aOk && !s.bOk:
|
||||
s.done = true
|
||||
return false
|
||||
case !s.aOk:
|
||||
s.curr = s.bVal
|
||||
s.bOk = s.b.Next()
|
||||
if s.bOk {
|
||||
s.bVal = s.b.At()
|
||||
}
|
||||
case !s.bOk:
|
||||
s.curr = s.aVal
|
||||
s.aOk = s.a.Next()
|
||||
if s.aOk {
|
||||
s.aVal = s.a.At()
|
||||
}
|
||||
default:
|
||||
// Under value-based orderings, equal-Value entries collapse in
|
||||
// place and keep the higher score. Under OrderByScoreDesc the
|
||||
// comparator tie-breaks on Value, so equal cmp means equal
|
||||
// (Score, Value) — collapsing is safe there too.
|
||||
c := s.cmpFn(s.aVal, s.bVal)
|
||||
switch {
|
||||
case c < 0:
|
||||
s.curr = s.aVal
|
||||
s.aOk = s.a.Next()
|
||||
if s.aOk {
|
||||
s.aVal = s.a.At()
|
||||
}
|
||||
case c > 0:
|
||||
s.curr = s.bVal
|
||||
s.bOk = s.b.Next()
|
||||
if s.bOk {
|
||||
s.bVal = s.b.At()
|
||||
}
|
||||
default:
|
||||
if s.valueOrder && s.bVal.Score > s.aVal.Score {
|
||||
s.curr = s.bVal
|
||||
} else {
|
||||
s.curr = s.aVal
|
||||
}
|
||||
s.aOk = s.a.Next()
|
||||
if s.aOk {
|
||||
s.aVal = s.a.At()
|
||||
}
|
||||
s.bOk = s.b.Next()
|
||||
if s.bOk {
|
||||
s.bVal = s.b.At()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.emitted++
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *mergingSearchResultSet) At() SearchResult { return s.curr }
|
||||
|
||||
func (s *mergingSearchResultSet) Warnings() annotations.Annotations {
|
||||
var ws annotations.Annotations
|
||||
ws.Merge(s.a.Warnings())
|
||||
ws.Merge(s.b.Warnings())
|
||||
return ws
|
||||
}
|
||||
|
||||
func (s *mergingSearchResultSet) Err() error {
|
||||
return errors.Join(s.a.Err(), s.b.Err())
|
||||
}
|
||||
|
||||
func (s *mergingSearchResultSet) Close() error {
|
||||
return errors.Join(s.a.Close(), s.b.Close())
|
||||
}
|
||||
|
||||
// SearchLabelNames implements Searcher by merging results from all underlying queriers
|
||||
// that support the Searcher interface.
|
||||
func (q *querierAdapter) SearchLabelNames(ctx context.Context, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet {
|
||||
return mergeSearchSets(hints, func(s Searcher) SearchResultSet {
|
||||
return s.SearchLabelNames(ctx, hints, matchers...)
|
||||
}, collectSearchers(q.genericQuerier))
|
||||
}
|
||||
|
||||
// SearchLabelValues implements Searcher by merging results from all underlying queriers
|
||||
// that support the Searcher interface.
|
||||
func (q *querierAdapter) SearchLabelValues(ctx context.Context, name string, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet {
|
||||
return mergeSearchSets(hints, func(s Searcher) SearchResultSet {
|
||||
return s.SearchLabelValues(ctx, name, hints, matchers...)
|
||||
}, collectSearchers(q.genericQuerier))
|
||||
}
|
||||
|
||||
type noopGenericSeriesSet struct{}
|
||||
|
||||
func (noopGenericSeriesSet) Next() bool { return false }
|
||||
|
||||
@ -249,13 +249,131 @@ type SelectHints struct {
|
||||
ProjectionInclude bool
|
||||
}
|
||||
|
||||
// Filter determines whether a value should be included in results.
|
||||
// Returns (accepted, score) where score is used for relevance ranking.
|
||||
// Score should be in range [0.0, 1.0] where 1.0 is perfect match.
|
||||
type Filter interface {
|
||||
Accept(value string) (accepted bool, score float64)
|
||||
}
|
||||
|
||||
// Ordering is a closed set of result orderings that searchers may natively
|
||||
// produce and that the merge layer can exploit for streaming k-way merges.
|
||||
// New orderings must be added explicitly here so that wire protocols,
|
||||
// downstream implementations, and merge strategies stay in sync.
|
||||
type Ordering uint8
|
||||
|
||||
const (
|
||||
// OrderByValueAsc orders results ascending by Value. This matches the
|
||||
// natural index order and is the zero value of Ordering.
|
||||
OrderByValueAsc Ordering = iota
|
||||
// OrderByValueDesc orders results descending by Value.
|
||||
OrderByValueDesc
|
||||
// OrderByScoreDesc orders results descending by Score, breaking ties
|
||||
// ascending by Value for determinism.
|
||||
OrderByScoreDesc
|
||||
)
|
||||
|
||||
// SearchResultSet is an iterator over search results.
|
||||
// Callers must call Close when done, regardless of whether all results were consumed.
|
||||
type SearchResultSet interface {
|
||||
// Next advances the iterator. Returns false when exhausted or on error.
|
||||
Next() bool
|
||||
// At returns the current search result. Must only be called after a successful Next.
|
||||
At() SearchResult
|
||||
// Warnings returns warnings accumulated so far.
|
||||
Warnings() annotations.Annotations
|
||||
// Err returns any error that caused iteration to stop.
|
||||
Err() error
|
||||
// Close releases resources associated with this result set.
|
||||
Close() error
|
||||
}
|
||||
|
||||
type emptySearchResultSet struct{}
|
||||
|
||||
func (emptySearchResultSet) Next() bool { return false }
|
||||
func (emptySearchResultSet) At() SearchResult { return SearchResult{} }
|
||||
func (emptySearchResultSet) Warnings() annotations.Annotations { return nil }
|
||||
func (emptySearchResultSet) Err() error { return nil }
|
||||
func (emptySearchResultSet) Close() error { return nil }
|
||||
|
||||
// EmptySearchResultSet returns a SearchResultSet that contains no results.
|
||||
func EmptySearchResultSet() SearchResultSet { return emptySearchResultSet{} }
|
||||
|
||||
type errSearchResultSet struct {
|
||||
err error
|
||||
warnings annotations.Annotations
|
||||
}
|
||||
|
||||
func (errSearchResultSet) Next() bool { return false }
|
||||
func (errSearchResultSet) At() SearchResult { return SearchResult{} }
|
||||
func (s errSearchResultSet) Warnings() annotations.Annotations { return s.warnings }
|
||||
func (s errSearchResultSet) Err() error { return s.err }
|
||||
func (errSearchResultSet) Close() error { return nil }
|
||||
|
||||
// ErrSearchResultSet returns a SearchResultSet that immediately returns the given error.
|
||||
// Any supplied annotations.Annotations are merged and exposed via Warnings, allowing
|
||||
// callers to surface warnings accumulated before the error occurred.
|
||||
func ErrSearchResultSet(err error, warnings ...annotations.Annotations) SearchResultSet {
|
||||
var warns annotations.Annotations
|
||||
for _, w := range warnings {
|
||||
warns.Merge(w)
|
||||
}
|
||||
return errSearchResultSet{err: err, warnings: warns}
|
||||
}
|
||||
|
||||
// LabelHints specifies hints passed for label reads.
|
||||
// This is used only as an option for implementation to use.
|
||||
// Results are returned in natural (alphabetical) order.
|
||||
type LabelHints struct {
|
||||
// Maximum number of results returned. Use a value of 0 to disable.
|
||||
Limit int
|
||||
}
|
||||
|
||||
// SearchHints configures search operations with filtering and scoring.
|
||||
// Unlike LabelHints, SearchHints is specifically designed for search APIs
|
||||
// that need relevance scoring and ranking.
|
||||
type SearchHints struct {
|
||||
// Filter determines which values to include and their relevance scores.
|
||||
// A nil Filter accepts all values and each result has a score of 1.0.
|
||||
Filter Filter
|
||||
|
||||
// Limit is the maximum number of results to return.
|
||||
// Use 0 to disable limiting.
|
||||
Limit int
|
||||
|
||||
// OrderBy selects the ordering of results. The zero value is
|
||||
// OrderByValueAsc, which matches the natural index order.
|
||||
OrderBy Ordering
|
||||
}
|
||||
|
||||
// SearchResult represents a single search result with its relevance score.
|
||||
type SearchResult struct {
|
||||
// Value is the label name or label value.
|
||||
Value string
|
||||
|
||||
// Score represents relevance, with 1.0 being a perfect match.
|
||||
// Score range is [0.0, 1.0].
|
||||
Score float64
|
||||
}
|
||||
|
||||
// Searcher provides search capabilities with relevance scoring.
|
||||
// This interface is designed for autocomplete and search UIs that need
|
||||
// to rank results by relevance rather than just filter them.
|
||||
//
|
||||
// For a given Value and SearchHints.Filter, Score must be deterministic
|
||||
// and identical across Searcher implementations.
|
||||
type Searcher interface {
|
||||
// SearchLabelNames returns an iterator over label names matching the search criteria.
|
||||
// Results include relevance scores based on the Filter.
|
||||
// The caller must call Close on the returned SearchResultSet when done.
|
||||
SearchLabelNames(ctx context.Context, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet
|
||||
|
||||
// SearchLabelValues returns an iterator over label values for the given label name.
|
||||
// Results include relevance scores based on the Filter.
|
||||
// The caller must call Close on the returned SearchResultSet when done.
|
||||
SearchLabelValues(ctx context.Context, name string, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet
|
||||
}
|
||||
|
||||
// QueryableFunc is an adapter to allow the use of ordinary functions as
|
||||
// Queryables. It follows the idea of http.HandlerFunc.
|
||||
// TODO(bwplotka): Move to promql/engine_test.go?
|
||||
|
||||
@ -1723,3 +1723,491 @@ func (errIterator) AtST() int64 {
|
||||
func (e errIterator) Err() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
// searchQuerier is a Querier that also implements Searcher, for testing merge behaviour.
|
||||
type searchQuerier struct {
|
||||
mockQuerier
|
||||
names []SearchResult
|
||||
values []SearchResult
|
||||
searchWarns annotations.Annotations
|
||||
err error
|
||||
}
|
||||
|
||||
func (q *searchQuerier) SearchLabelNames(_ context.Context, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet {
|
||||
if q.err != nil {
|
||||
return ErrSearchResultSet(q.err)
|
||||
}
|
||||
return NewSearchResultSetFromSlice(q.names, q.searchWarns)
|
||||
}
|
||||
|
||||
func (q *searchQuerier) SearchLabelValues(_ context.Context, _ string, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet {
|
||||
if q.err != nil {
|
||||
return ErrSearchResultSet(q.err)
|
||||
}
|
||||
return NewSearchResultSetFromSlice(q.values, q.searchWarns)
|
||||
}
|
||||
|
||||
// partialErrSearchQuerier is a Querier+Searcher that yields partial results
|
||||
// then returns an error, for testing secondary querier mid-stream failure.
|
||||
type partialErrSearchQuerier struct {
|
||||
mockQuerier
|
||||
names []SearchResult
|
||||
values []SearchResult
|
||||
err error
|
||||
}
|
||||
|
||||
func (q *partialErrSearchQuerier) SearchLabelNames(_ context.Context, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet {
|
||||
return newErrAfterResultsSet(q.names, q.err)
|
||||
}
|
||||
|
||||
func (q *partialErrSearchQuerier) SearchLabelValues(_ context.Context, _ string, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet {
|
||||
return newErrAfterResultsSet(q.values, q.err)
|
||||
}
|
||||
|
||||
// errAfterResultsSet is a SearchResultSet that yields results then returns an error.
|
||||
type errAfterResultsSet struct {
|
||||
results []SearchResult
|
||||
idx int // starts at -1; incremented by Next.
|
||||
err error
|
||||
}
|
||||
|
||||
func newErrAfterResultsSet(results []SearchResult, err error) *errAfterResultsSet {
|
||||
return &errAfterResultsSet{results: results, err: err, idx: -1}
|
||||
}
|
||||
|
||||
func (s *errAfterResultsSet) Next() bool {
|
||||
s.idx++
|
||||
return s.idx < len(s.results)
|
||||
}
|
||||
|
||||
func (s *errAfterResultsSet) At() SearchResult { return s.results[s.idx] }
|
||||
|
||||
func (*errAfterResultsSet) Warnings() annotations.Annotations { return nil }
|
||||
|
||||
func (s *errAfterResultsSet) Err() error {
|
||||
if s.idx >= len(s.results) {
|
||||
return s.err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*errAfterResultsSet) Close() error { return nil }
|
||||
|
||||
func collectSearchResults(t *testing.T, rs SearchResultSet) []SearchResult {
|
||||
t.Helper()
|
||||
var got []SearchResult
|
||||
for rs.Next() {
|
||||
got = append(got, rs.At())
|
||||
}
|
||||
require.NoError(t, rs.Err())
|
||||
require.NoError(t, rs.Close())
|
||||
return got
|
||||
}
|
||||
|
||||
func TestMergeQuerierSearch(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
// newMerged creates a merged querier from two queriers, ensuring the querierAdapter
|
||||
// path is always taken (NewMergeQuerier returns the single querier directly if only
|
||||
// one is provided).
|
||||
newMerged := func(a, b Querier) Querier {
|
||||
return NewMergeQuerier([]Querier{a, b}, nil, ChainedSeriesMerge)
|
||||
}
|
||||
|
||||
t.Run("results from both searchers are merged", func(t *testing.T) {
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 0.9}},
|
||||
values: []SearchResult{{Value: "prod", Score: 1.0}},
|
||||
}
|
||||
q2 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "job", Score: 0.5}},
|
||||
values: []SearchResult{{Value: "dev", Score: 0.8}},
|
||||
}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil))
|
||||
require.Len(t, got, 2)
|
||||
|
||||
got = collectSearchResults(t, merged.(Searcher).SearchLabelValues(ctx, "env", nil))
|
||||
require.Len(t, got, 2)
|
||||
})
|
||||
|
||||
t.Run("duplicate values keep max score", func(t *testing.T) {
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 0.6}, {Value: "job", Score: 0.9}},
|
||||
}
|
||||
q2 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 0.8}, {Value: "region", Score: 0.5}},
|
||||
}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil))
|
||||
require.Len(t, got, 3)
|
||||
scores := make(map[string]float64, len(got))
|
||||
for _, r := range got {
|
||||
scores[r.Value] = r.Score
|
||||
}
|
||||
// "env" appears in both; max score wins.
|
||||
require.Equal(t, 0.8, scores["env"])
|
||||
require.Equal(t, 0.9, scores["job"])
|
||||
require.Equal(t, 0.5, scores["region"])
|
||||
})
|
||||
|
||||
t.Run("natural order is preserved for merged searchers", func(t *testing.T) {
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "region", Score: 0.5}, {Value: "zone", Score: 0.4}},
|
||||
}
|
||||
q2 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 0.8}, {Value: "job", Score: 0.9}},
|
||||
}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil))
|
||||
require.Equal(t, []SearchResult{
|
||||
{Value: "env", Score: 0.8},
|
||||
{Value: "job", Score: 0.9},
|
||||
{Value: "region", Score: 0.5},
|
||||
{Value: "zone", Score: 0.4},
|
||||
}, got)
|
||||
})
|
||||
|
||||
t.Run("secondary search errors become warnings", func(t *testing.T) {
|
||||
primary := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 1.0}},
|
||||
}
|
||||
secondary := &searchQuerier{err: errors.New("secondary search failed")}
|
||||
merged := NewMergeQuerier([]Querier{primary}, []Querier{secondary}, ChainedSeriesMerge)
|
||||
defer merged.Close()
|
||||
|
||||
rs := merged.(Searcher).SearchLabelNames(ctx, nil)
|
||||
got := collectSearchResults(t, rs)
|
||||
require.Equal(t, []SearchResult{{Value: "env", Score: 1.0}}, got)
|
||||
warnings := rs.Warnings().AsErrors()
|
||||
require.Len(t, warnings, 1)
|
||||
require.Contains(t, warnings[0].Error(), "secondary search failed")
|
||||
})
|
||||
|
||||
t.Run("secondary partial label names followed by error become warnings", func(t *testing.T) {
|
||||
primary := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 1.0}},
|
||||
}
|
||||
secondary := &partialErrSearchQuerier{
|
||||
names: []SearchResult{{Value: "zone", Score: 0.5}},
|
||||
err: errors.New("partial secondary failure"),
|
||||
}
|
||||
merged := NewMergeQuerier([]Querier{primary}, []Querier{secondary}, ChainedSeriesMerge)
|
||||
defer merged.Close()
|
||||
|
||||
rs := merged.(Searcher).SearchLabelNames(ctx, nil)
|
||||
for rs.Next() {
|
||||
}
|
||||
require.NoError(t, rs.Err())
|
||||
warnings := rs.Warnings().AsErrors()
|
||||
require.Len(t, warnings, 1)
|
||||
require.Contains(t, warnings[0].Error(), "partial secondary failure")
|
||||
require.NoError(t, rs.Close())
|
||||
})
|
||||
|
||||
t.Run("warnings accessible after early Close", func(t *testing.T) {
|
||||
// Exercises the internal warningsOnErrorSearchSet lifecycle when
|
||||
// the caller closes before exhaustion. This cannot be tested
|
||||
// through the public API because the error-to-warning conversion
|
||||
// only fires on exhaustion.
|
||||
inner := newErrAfterResultsSet(
|
||||
[]SearchResult{{Value: "zone", Score: 0.5}},
|
||||
errors.New("early close failure"),
|
||||
)
|
||||
rs := warningsOnErrorSearchResultSet(inner)
|
||||
require.True(t, rs.Next())
|
||||
// Do not call Next again; close early.
|
||||
require.NoError(t, rs.Close())
|
||||
// Inner set has no warnings yet (error only fires after exhaustion),
|
||||
// but Close must not panic and Warnings must be callable.
|
||||
require.NoError(t, rs.Err())
|
||||
_ = rs.Warnings()
|
||||
// Calling Close a second time must be a no-op.
|
||||
require.NoError(t, rs.Close())
|
||||
})
|
||||
|
||||
t.Run("secondary partial label values followed by error become warnings", func(t *testing.T) {
|
||||
primary := &searchQuerier{
|
||||
values: []SearchResult{{Value: "prod", Score: 1.0}},
|
||||
}
|
||||
secondary := &partialErrSearchQuerier{
|
||||
values: []SearchResult{{Value: "staging", Score: 0.5}},
|
||||
err: errors.New("partial secondary failure"),
|
||||
}
|
||||
merged := NewMergeQuerier([]Querier{primary}, []Querier{secondary}, ChainedSeriesMerge)
|
||||
defer merged.Close()
|
||||
|
||||
rs := merged.(Searcher).SearchLabelValues(ctx, "env", nil)
|
||||
for rs.Next() {
|
||||
}
|
||||
require.NoError(t, rs.Err())
|
||||
warnings := rs.Warnings().AsErrors()
|
||||
require.Len(t, warnings, 1)
|
||||
require.Contains(t, warnings[0].Error(), "partial secondary failure")
|
||||
require.NoError(t, rs.Close())
|
||||
})
|
||||
|
||||
t.Run("non-searcher querier is skipped", func(t *testing.T) {
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 1.0}},
|
||||
}
|
||||
q2 := &mockQuerier{resp: []string{"should_be_ignored"}}
|
||||
// Verify precondition: mockQuerier does not implement Searcher.
|
||||
_, isSearcher := (Querier)(q2).(Searcher)
|
||||
require.False(t, isSearcher)
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil))
|
||||
require.Len(t, got, 1)
|
||||
require.Equal(t, "env", got[0].Value)
|
||||
})
|
||||
|
||||
t.Run("no searcher queriers returns empty", func(t *testing.T) {
|
||||
// Two non-searcher queriers force the querierAdapter path.
|
||||
q1 := &mockQuerier{resp: []string{"a", "b"}}
|
||||
q2 := &mockQuerier{resp: []string{"c", "d"}}
|
||||
// Verify precondition.
|
||||
_, isSearcher := (Querier)(q1).(Searcher)
|
||||
require.False(t, isSearcher)
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil))
|
||||
require.Empty(t, got)
|
||||
})
|
||||
|
||||
t.Run("OrderByScoreDesc overrides natural ordering", func(t *testing.T) {
|
||||
// Each searcher must emit in the requested order; score-desc
|
||||
// rearranges the overall result stream. The inputs below are
|
||||
// already score-desc within each searcher.
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "job", Score: 0.9}, {Value: "env", Score: 0.3}},
|
||||
}
|
||||
q2 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "region", Score: 0.5}},
|
||||
}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
hints := &SearchHints{OrderBy: OrderByScoreDesc}
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, hints))
|
||||
require.Equal(t, []SearchResult{
|
||||
{Value: "job", Score: 0.9},
|
||||
{Value: "region", Score: 0.5},
|
||||
{Value: "env", Score: 0.3},
|
||||
}, got)
|
||||
})
|
||||
|
||||
t.Run("OrderByScoreDesc error preserves warnings", func(t *testing.T) {
|
||||
var ws annotations.Annotations
|
||||
ws.Add(errors.New("prior warning"))
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 1.0}},
|
||||
searchWarns: ws,
|
||||
}
|
||||
q2 := &searchQuerier{err: errors.New("score path failure")}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
hints := &SearchHints{OrderBy: OrderByScoreDesc}
|
||||
rs := merged.(Searcher).SearchLabelNames(ctx, hints)
|
||||
// Drain to completion so that the failing searcher's error
|
||||
// surfaces. Streaming may emit some results before the error.
|
||||
for rs.Next() {
|
||||
}
|
||||
require.Error(t, rs.Err())
|
||||
require.Contains(t, rs.Err().Error(), "score path failure")
|
||||
warnings := rs.Warnings().AsErrors()
|
||||
require.Len(t, warnings, 1)
|
||||
require.Contains(t, warnings[0].Error(), "prior warning")
|
||||
require.NoError(t, rs.Close())
|
||||
})
|
||||
|
||||
t.Run("OrderByValueDesc reverses natural ordering", func(t *testing.T) {
|
||||
// Each searcher must emit in descending-Value order.
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "job", Score: 1.0}, {Value: "env", Score: 1.0}},
|
||||
}
|
||||
q2 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "region", Score: 1.0}, {Value: "cluster", Score: 1.0}},
|
||||
}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
hints := &SearchHints{OrderBy: OrderByValueDesc}
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, hints))
|
||||
require.Equal(t, []SearchResult{
|
||||
{Value: "region", Score: 1.0},
|
||||
{Value: "job", Score: 1.0},
|
||||
{Value: "env", Score: 1.0},
|
||||
{Value: "cluster", Score: 1.0},
|
||||
}, got)
|
||||
})
|
||||
|
||||
t.Run("primary error preserves warnings from prior searchers", func(t *testing.T) {
|
||||
var ws annotations.Annotations
|
||||
ws.Add(errors.New("prior warning"))
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "env", Score: 1.0}},
|
||||
searchWarns: ws,
|
||||
}
|
||||
q2 := &searchQuerier{err: errors.New("primary failure")}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
rs := merged.(Searcher).SearchLabelNames(ctx, nil)
|
||||
// Iteration should see no results because the merge errored.
|
||||
require.False(t, rs.Next())
|
||||
require.Error(t, rs.Err())
|
||||
require.Contains(t, rs.Err().Error(), "primary failure")
|
||||
// Warnings from the successful first searcher must be preserved.
|
||||
warnings := rs.Warnings().AsErrors()
|
||||
require.Len(t, warnings, 1)
|
||||
require.Contains(t, warnings[0].Error(), "prior warning")
|
||||
require.NoError(t, rs.Close())
|
||||
})
|
||||
|
||||
t.Run("limit is applied at merge level", func(t *testing.T) {
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "a", Score: 1.0}, {Value: "b", Score: 0.9}},
|
||||
}
|
||||
q2 := &searchQuerier{
|
||||
names: []SearchResult{{Value: "c", Score: 0.8}},
|
||||
}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, &SearchHints{Limit: 2}))
|
||||
require.Len(t, got, 2)
|
||||
// Alphabetical order, limited to 2.
|
||||
require.Equal(t, "a", got[0].Value)
|
||||
require.Equal(t, "b", got[1].Value)
|
||||
})
|
||||
|
||||
t.Run("empty searchers", func(t *testing.T) {
|
||||
q1 := &searchQuerier{}
|
||||
q2 := &searchQuerier{}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil))
|
||||
require.Empty(t, got)
|
||||
})
|
||||
|
||||
t.Run("single searcher passthrough", func(t *testing.T) {
|
||||
// NewMergeQuerier with one querier returns it directly, so use two
|
||||
// where one is empty to force the merge path.
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{
|
||||
{Value: "a", Score: 1.0},
|
||||
{Value: "b", Score: 1.0},
|
||||
{Value: "c", Score: 1.0},
|
||||
},
|
||||
}
|
||||
q2 := &searchQuerier{}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil))
|
||||
require.Equal(t, []SearchResult{
|
||||
{Value: "a", Score: 1.0},
|
||||
{Value: "b", Score: 1.0},
|
||||
{Value: "c", Score: 1.0},
|
||||
}, got)
|
||||
})
|
||||
|
||||
t.Run("overlapping values are deduplicated", func(t *testing.T) {
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{
|
||||
{Value: "a", Score: 1.0},
|
||||
{Value: "b", Score: 1.0},
|
||||
},
|
||||
}
|
||||
q2 := &searchQuerier{
|
||||
names: []SearchResult{
|
||||
{Value: "b", Score: 1.0},
|
||||
{Value: "c", Score: 1.0},
|
||||
},
|
||||
}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil))
|
||||
require.Equal(t, []SearchResult{
|
||||
{Value: "a", Score: 1.0},
|
||||
{Value: "b", Score: 1.0},
|
||||
{Value: "c", Score: 1.0},
|
||||
}, got)
|
||||
})
|
||||
|
||||
t.Run("interleaved values with limit", func(t *testing.T) {
|
||||
q1 := &searchQuerier{
|
||||
names: []SearchResult{
|
||||
{Value: "a", Score: 1.0},
|
||||
{Value: "c", Score: 1.0},
|
||||
{Value: "e", Score: 1.0},
|
||||
},
|
||||
}
|
||||
q2 := &searchQuerier{
|
||||
names: []SearchResult{
|
||||
{Value: "b", Score: 1.0},
|
||||
{Value: "d", Score: 1.0},
|
||||
},
|
||||
}
|
||||
merged := newMerged(q1, q2)
|
||||
defer merged.Close()
|
||||
|
||||
got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, &SearchHints{Limit: 3}))
|
||||
require.Equal(t, []SearchResult{
|
||||
{Value: "a", Score: 1.0},
|
||||
{Value: "b", Score: 1.0},
|
||||
{Value: "c", Score: 1.0},
|
||||
}, got)
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkMergeSearchSets(b *testing.B) {
|
||||
// Build N searchers each returning M sorted results with no overlap.
|
||||
const nSearchers = 4
|
||||
const resultsPerSearcher = 10000
|
||||
queriers := make([]Querier, nSearchers)
|
||||
for i := range nSearchers {
|
||||
names := make([]SearchResult, resultsPerSearcher)
|
||||
for j := range resultsPerSearcher {
|
||||
names[j] = SearchResult{Value: fmt.Sprintf("label_%04d_%06d", i, j), Score: 1.0}
|
||||
}
|
||||
queriers[i] = &searchQuerier{names: names}
|
||||
}
|
||||
|
||||
b.Run("no_limit", func(b *testing.B) {
|
||||
for b.Loop() {
|
||||
merged := NewMergeQuerier(queriers, nil, ChainedSeriesMerge)
|
||||
rs := merged.(Searcher).SearchLabelNames(context.Background(), nil)
|
||||
for rs.Next() {
|
||||
}
|
||||
require.NoError(b, rs.Err())
|
||||
rs.Close()
|
||||
merged.Close()
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("limit_100", func(b *testing.B) {
|
||||
hints := &SearchHints{Limit: 100}
|
||||
for b.Loop() {
|
||||
merged := NewMergeQuerier(queriers, nil, ChainedSeriesMerge)
|
||||
rs := merged.(Searcher).SearchLabelNames(context.Background(), hints)
|
||||
for rs.Next() {
|
||||
}
|
||||
require.NoError(b, rs.Err())
|
||||
rs.Close()
|
||||
merged.Close()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -43,6 +43,16 @@ func (noopQuerier) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (noopQuerier) SearchLabelNames(_ context.Context, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (noopQuerier) SearchLabelValues(_ context.Context, _ string, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet {
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ Searcher = noopQuerier{}
|
||||
|
||||
type noopChunkQuerier struct{}
|
||||
|
||||
// NoopChunkedQuerier is a ChunkQuerier that does nothing.
|
||||
|
||||
@ -21,6 +21,9 @@ import (
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
||||
// secondaryQuerier must implement the Searcher interface.
|
||||
var _ Searcher = (*secondaryQuerier)(nil)
|
||||
|
||||
// secondaryQuerier is a wrapper that allows a querier to be treated in a best effort manner.
|
||||
// This means that an error on any method returned by Querier except Close will be returned as a warning,
|
||||
// and the result will be empty.
|
||||
@ -65,6 +68,24 @@ func (s *secondaryQuerier) LabelNames(ctx context.Context, hints *LabelHints, ma
|
||||
return names, w, nil
|
||||
}
|
||||
|
||||
// SearchLabelNames returns search results from the wrapped querier and converts errors into warnings.
|
||||
func (s *secondaryQuerier) SearchLabelNames(ctx context.Context, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet {
|
||||
searcher, ok := searcherFromGenericQuerier(s.genericQuerier)
|
||||
if !ok {
|
||||
return EmptySearchResultSet()
|
||||
}
|
||||
return warningsOnErrorSearchResultSet(searcher.SearchLabelNames(ctx, hints, matchers...))
|
||||
}
|
||||
|
||||
// SearchLabelValues returns search results from the wrapped querier and converts errors into warnings.
|
||||
func (s *secondaryQuerier) SearchLabelValues(ctx context.Context, name string, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet {
|
||||
searcher, ok := searcherFromGenericQuerier(s.genericQuerier)
|
||||
if !ok {
|
||||
return EmptySearchResultSet()
|
||||
}
|
||||
return warningsOnErrorSearchResultSet(searcher.SearchLabelValues(ctx, name, hints, matchers...))
|
||||
}
|
||||
|
||||
func (s *secondaryQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
||||
if s.done {
|
||||
panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done")
|
||||
@ -107,3 +128,56 @@ func (s *secondaryQuerier) Select(ctx context.Context, sortSeries bool, hints *S
|
||||
}
|
||||
}}
|
||||
}
|
||||
|
||||
type warningsOnErrorSearchSet struct {
|
||||
rs SearchResultSet
|
||||
warnings annotations.Annotations
|
||||
}
|
||||
|
||||
// warningsOnErrorSearchResultSet wraps rs so that an iteration error is surfaced as a warning.
|
||||
func warningsOnErrorSearchResultSet(rs SearchResultSet) SearchResultSet {
|
||||
return &warningsOnErrorSearchSet{rs: rs}
|
||||
}
|
||||
|
||||
func (s *warningsOnErrorSearchSet) Next() bool {
|
||||
if s.rs == nil {
|
||||
return false
|
||||
}
|
||||
if s.rs.Next() {
|
||||
return true
|
||||
}
|
||||
if err := s.rs.Err(); err != nil {
|
||||
var ws annotations.Annotations
|
||||
ws.Merge(s.rs.Warnings())
|
||||
s.warnings = ws.Add(err)
|
||||
_ = s.rs.Close()
|
||||
s.rs = nil
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *warningsOnErrorSearchSet) At() SearchResult {
|
||||
if s.rs == nil {
|
||||
return SearchResult{}
|
||||
}
|
||||
return s.rs.At()
|
||||
}
|
||||
|
||||
func (s *warningsOnErrorSearchSet) Warnings() annotations.Annotations {
|
||||
if s.rs != nil {
|
||||
return s.rs.Warnings()
|
||||
}
|
||||
return s.warnings
|
||||
}
|
||||
|
||||
func (*warningsOnErrorSearchSet) Err() error { return nil }
|
||||
|
||||
func (s *warningsOnErrorSearchSet) Close() error {
|
||||
if s.rs == nil {
|
||||
return nil
|
||||
}
|
||||
s.warnings = s.rs.Warnings()
|
||||
err := s.rs.Close()
|
||||
s.rs = nil
|
||||
return err
|
||||
}
|
||||
|
||||
@ -562,6 +562,8 @@ type HeadAndOOOQuerier struct {
|
||||
querier storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end.
|
||||
}
|
||||
|
||||
var _ storage.Searcher = &HeadAndOOOQuerier{}
|
||||
|
||||
func NewHeadAndOOOQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
|
||||
cr := &headChunkReader{
|
||||
head: head,
|
||||
@ -593,6 +595,28 @@ func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.Label
|
||||
return q.querier.LabelNames(ctx, hints, matchers...)
|
||||
}
|
||||
|
||||
// SearchLabelNames implements storage.Searcher by delegating to the inner querier.
|
||||
func (q *HeadAndOOOQuerier) SearchLabelNames(ctx context.Context, hints *storage.SearchHints, matchers ...*labels.Matcher) storage.SearchResultSet {
|
||||
if q.querier == nil {
|
||||
return storage.EmptySearchResultSet()
|
||||
}
|
||||
if s, ok := q.querier.(storage.Searcher); ok {
|
||||
return s.SearchLabelNames(ctx, hints, matchers...)
|
||||
}
|
||||
return storage.EmptySearchResultSet()
|
||||
}
|
||||
|
||||
// SearchLabelValues implements storage.Searcher by delegating to the inner querier.
|
||||
func (q *HeadAndOOOQuerier) SearchLabelValues(ctx context.Context, name string, hints *storage.SearchHints, matchers ...*labels.Matcher) storage.SearchResultSet {
|
||||
if q.querier == nil {
|
||||
return storage.EmptySearchResultSet()
|
||||
}
|
||||
if s, ok := q.querier.(storage.Searcher); ok {
|
||||
return s.SearchLabelValues(ctx, name, hints, matchers...)
|
||||
}
|
||||
return storage.EmptySearchResultSet()
|
||||
}
|
||||
|
||||
func (q *HeadAndOOOQuerier) Close() error {
|
||||
q.chunkr.Close()
|
||||
if q.querier == nil {
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
"github.com/prometheus/prometheus/util/compression"
|
||||
)
|
||||
|
||||
@ -1114,3 +1115,112 @@ func TestSortMetaByMinTimeAndMinRef(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// mockSearchQuerier is a minimal storage.Querier that also implements storage.Searcher.
|
||||
type mockSearchQuerier struct {
|
||||
labelNames []string
|
||||
labelValues []string
|
||||
}
|
||||
|
||||
func (*mockSearchQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
return storage.EmptySeriesSet()
|
||||
}
|
||||
|
||||
func (m *mockSearchQuerier) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return m.labelValues, nil, nil
|
||||
}
|
||||
|
||||
func (m *mockSearchQuerier) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return m.labelNames, nil, nil
|
||||
}
|
||||
|
||||
func (*mockSearchQuerier) Close() error { return nil }
|
||||
|
||||
func (m *mockSearchQuerier) SearchLabelNames(_ context.Context, _ *storage.SearchHints, _ ...*labels.Matcher) storage.SearchResultSet {
|
||||
results := make([]storage.SearchResult, len(m.labelNames))
|
||||
for i, n := range m.labelNames {
|
||||
results[i] = storage.SearchResult{Value: n, Score: 1.0}
|
||||
}
|
||||
return storage.NewSearchResultSetFromSlice(results, nil)
|
||||
}
|
||||
|
||||
func (m *mockSearchQuerier) SearchLabelValues(_ context.Context, _ string, _ *storage.SearchHints, _ ...*labels.Matcher) storage.SearchResultSet {
|
||||
results := make([]storage.SearchResult, len(m.labelValues))
|
||||
for i, v := range m.labelValues {
|
||||
results[i] = storage.SearchResult{Value: v, Score: 1.0}
|
||||
}
|
||||
return storage.NewSearchResultSetFromSlice(results, nil)
|
||||
}
|
||||
|
||||
// mockSearchQuerierNoSearch is a storage.Querier that does not implement storage.Searcher.
|
||||
type mockSearchQuerierNoSearch struct{}
|
||||
|
||||
func (mockSearchQuerierNoSearch) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
return storage.EmptySeriesSet()
|
||||
}
|
||||
|
||||
func (mockSearchQuerierNoSearch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (mockSearchQuerierNoSearch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (mockSearchQuerierNoSearch) Close() error { return nil }
|
||||
|
||||
func TestHeadAndOOOQuerierSearch(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("nil inner querier returns empty", func(t *testing.T) {
|
||||
q := &HeadAndOOOQuerier{}
|
||||
rs := q.SearchLabelNames(ctx, nil)
|
||||
require.False(t, rs.Next())
|
||||
require.NoError(t, rs.Err())
|
||||
require.NoError(t, rs.Close())
|
||||
|
||||
rs = q.SearchLabelValues(ctx, "env", nil)
|
||||
require.False(t, rs.Next())
|
||||
require.NoError(t, rs.Err())
|
||||
require.NoError(t, rs.Close())
|
||||
})
|
||||
|
||||
t.Run("delegates to inner searcher", func(t *testing.T) {
|
||||
inner := &mockSearchQuerier{
|
||||
labelNames: []string{"env", "job"},
|
||||
labelValues: []string{"prod", "dev"},
|
||||
}
|
||||
q := &HeadAndOOOQuerier{querier: inner}
|
||||
|
||||
rs := q.SearchLabelNames(ctx, nil)
|
||||
var names []string
|
||||
for rs.Next() {
|
||||
names = append(names, rs.At().Value)
|
||||
}
|
||||
require.NoError(t, rs.Err())
|
||||
require.NoError(t, rs.Close())
|
||||
require.Equal(t, []string{"env", "job"}, names)
|
||||
|
||||
rs = q.SearchLabelValues(ctx, "env", nil)
|
||||
var values []string
|
||||
for rs.Next() {
|
||||
values = append(values, rs.At().Value)
|
||||
}
|
||||
require.NoError(t, rs.Err())
|
||||
require.NoError(t, rs.Close())
|
||||
require.Equal(t, []string{"prod", "dev"}, values)
|
||||
})
|
||||
|
||||
t.Run("non-searcher inner querier returns empty", func(t *testing.T) {
|
||||
inner := mockSearchQuerierNoSearch{}
|
||||
q := &HeadAndOOOQuerier{querier: inner}
|
||||
|
||||
rs := q.SearchLabelNames(ctx, nil)
|
||||
require.False(t, rs.Next())
|
||||
require.NoError(t, rs.Close())
|
||||
|
||||
rs = q.SearchLabelValues(ctx, "env", nil)
|
||||
require.False(t, rs.Next())
|
||||
require.NoError(t, rs.Close())
|
||||
})
|
||||
}
|
||||
|
||||
@ -46,6 +46,8 @@ type blockBaseQuerier struct {
|
||||
mint, maxt int64
|
||||
}
|
||||
|
||||
var _ storage.Searcher = &blockBaseQuerier{}
|
||||
|
||||
func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
|
||||
indexr, err := b.Index()
|
||||
if err != nil {
|
||||
@ -94,6 +96,49 @@ func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelH
|
||||
return res, nil, nil
|
||||
}
|
||||
|
||||
// SearchLabelNames implements storage.Searcher.
|
||||
func (q *blockBaseQuerier) SearchLabelNames(ctx context.Context, hints *storage.SearchHints, matchers ...*labels.Matcher) storage.SearchResultSet {
|
||||
names, err := q.index.LabelNames(ctx, matchers...)
|
||||
if err != nil {
|
||||
return storage.ErrSearchResultSet(err)
|
||||
}
|
||||
|
||||
return storage.NewSearchResultSetFromSlice(storage.ApplySearchHints(names, hints), nil)
|
||||
}
|
||||
|
||||
// SearchLabelValues implements storage.Searcher.
|
||||
func (q *blockBaseQuerier) SearchLabelValues(ctx context.Context, name string, hints *storage.SearchHints, matchers ...*labels.Matcher) storage.SearchResultSet {
|
||||
if hints == nil {
|
||||
hints = &storage.SearchHints{}
|
||||
}
|
||||
|
||||
// Limit pushdown is only correct when natural (ascending) index order
|
||||
// is preserved all the way to the output and no filtering discards
|
||||
// values ahead of the limit.
|
||||
labelHints := &storage.LabelHints{}
|
||||
if hints.OrderBy == storage.OrderByValueAsc && hints.Filter == nil {
|
||||
labelHints.Limit = hints.Limit
|
||||
}
|
||||
|
||||
var (
|
||||
values []string
|
||||
err error
|
||||
)
|
||||
switch hints.OrderBy {
|
||||
case storage.OrderByScoreDesc:
|
||||
// Score-based sorting happens in ApplySearchHints; avoid the
|
||||
// index-level sort.
|
||||
values, err = q.index.LabelValues(ctx, name, labelHints, matchers...)
|
||||
default:
|
||||
values, err = q.index.SortedLabelValues(ctx, name, labelHints, matchers...)
|
||||
}
|
||||
if err != nil {
|
||||
return storage.ErrSearchResultSet(err)
|
||||
}
|
||||
|
||||
return storage.NewSearchResultSetFromSlice(storage.ApplySearchHints(values, hints), nil)
|
||||
}
|
||||
|
||||
func (q *blockBaseQuerier) Close() error {
|
||||
if q.closed {
|
||||
return errors.New("block querier already closed")
|
||||
|
||||
@ -4046,3 +4046,138 @@ func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) {
|
||||
|
||||
require.Equal(t, originalMatchers, matchers)
|
||||
}
|
||||
|
||||
// prefixFilter accepts values that start with the given prefix and scores them 1.0.
|
||||
type prefixFilter struct{ prefix string }
|
||||
|
||||
func (f prefixFilter) Accept(v string) (bool, float64) {
|
||||
if strings.HasPrefix(v, f.prefix) {
|
||||
return true, 1.0
|
||||
}
|
||||
return false, 0
|
||||
}
|
||||
|
||||
func newBlockBaseQuerierForSearch(t *testing.T, labelSets ...labels.Labels) *blockBaseQuerier {
|
||||
t.Helper()
|
||||
ix := newMockIndex()
|
||||
// Group series refs by label for writing postings.
|
||||
postings := make(map[labels.Label][]storage.SeriesRef)
|
||||
for i, ls := range labelSets {
|
||||
ref := storage.SeriesRef(i)
|
||||
require.NoError(t, ix.AddSeries(ref, ls))
|
||||
ls.Range(func(lbl labels.Label) {
|
||||
postings[lbl] = append(postings[lbl], ref)
|
||||
})
|
||||
}
|
||||
for lbl, refs := range postings {
|
||||
require.NoError(t, ix.WritePostings(lbl.Name, lbl.Value, index.NewListPostings(refs)))
|
||||
}
|
||||
return &blockBaseQuerier{index: ix, chunks: nil, tombstones: tombstones.NewMemTombstones()}
|
||||
}
|
||||
|
||||
func collectSearchResultSet(t *testing.T, rs storage.SearchResultSet) []storage.SearchResult {
|
||||
t.Helper()
|
||||
var got []storage.SearchResult
|
||||
for rs.Next() {
|
||||
got = append(got, rs.At())
|
||||
}
|
||||
require.NoError(t, rs.Err())
|
||||
require.NoError(t, rs.Close())
|
||||
return got
|
||||
}
|
||||
|
||||
func TestBlockBaseQuerierSearchLabelNames(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
q := newBlockBaseQuerierForSearch(t,
|
||||
labels.FromStrings("env", "prod", "job", "api"),
|
||||
labels.FromStrings("env", "dev", "job", "api"),
|
||||
labels.FromStrings("region", "eu"),
|
||||
)
|
||||
|
||||
t.Run("no filter returns all names", func(t *testing.T) {
|
||||
rs := q.SearchLabelNames(ctx, nil)
|
||||
got := collectSearchResultSet(t, rs)
|
||||
gotValues := make([]string, len(got))
|
||||
for i, r := range got {
|
||||
gotValues[i] = r.Value
|
||||
require.Equal(t, 1.0, r.Score)
|
||||
}
|
||||
slices.Sort(gotValues)
|
||||
require.Equal(t, []string{"env", "job", "region"}, gotValues)
|
||||
})
|
||||
|
||||
t.Run("filter selects matching names", func(t *testing.T) {
|
||||
rs := q.SearchLabelNames(ctx, &storage.SearchHints{Filter: prefixFilter{"e"}})
|
||||
got := collectSearchResultSet(t, rs)
|
||||
require.Len(t, got, 1)
|
||||
require.Equal(t, "env", got[0].Value)
|
||||
})
|
||||
|
||||
t.Run("limit is applied", func(t *testing.T) {
|
||||
rs := q.SearchLabelNames(ctx, &storage.SearchHints{Limit: 1})
|
||||
got := collectSearchResultSet(t, rs)
|
||||
require.Len(t, got, 1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBlockBaseQuerierSearchLabelValues(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
q := newBlockBaseQuerierForSearch(t,
|
||||
labels.FromStrings("env", "prod"),
|
||||
labels.FromStrings("env", "dev"),
|
||||
labels.FromStrings("env", "staging"),
|
||||
)
|
||||
|
||||
t.Run("no filter returns all values", func(t *testing.T) {
|
||||
rs := q.SearchLabelValues(ctx, "env", nil)
|
||||
got := collectSearchResultSet(t, rs)
|
||||
gotValues := make([]string, len(got))
|
||||
for i, r := range got {
|
||||
gotValues[i] = r.Value
|
||||
require.Equal(t, 1.0, r.Score)
|
||||
}
|
||||
slices.Sort(gotValues)
|
||||
require.Equal(t, []string{"dev", "prod", "staging"}, gotValues)
|
||||
})
|
||||
|
||||
t.Run("filter selects matching values", func(t *testing.T) {
|
||||
rs := q.SearchLabelValues(ctx, "env", &storage.SearchHints{Filter: prefixFilter{"p"}})
|
||||
got := collectSearchResultSet(t, rs)
|
||||
require.Len(t, got, 1)
|
||||
require.Equal(t, "prod", got[0].Value)
|
||||
})
|
||||
|
||||
t.Run("limit is applied after filtering", func(t *testing.T) {
|
||||
rs := q.SearchLabelValues(ctx, "env", &storage.SearchHints{
|
||||
Filter: prefixFilter{"p"},
|
||||
Limit: 1,
|
||||
})
|
||||
got := collectSearchResultSet(t, rs)
|
||||
require.Equal(t, []storage.SearchResult{{Value: "prod", Score: 1.0}}, got)
|
||||
})
|
||||
|
||||
t.Run("limit is applied", func(t *testing.T) {
|
||||
rs := q.SearchLabelValues(ctx, "env", &storage.SearchHints{Limit: 2})
|
||||
got := collectSearchResultSet(t, rs)
|
||||
require.Len(t, got, 2)
|
||||
})
|
||||
|
||||
t.Run("unknown label name returns empty", func(t *testing.T) {
|
||||
rs := q.SearchLabelValues(ctx, "unknown", nil)
|
||||
got := collectSearchResultSet(t, rs)
|
||||
require.Empty(t, got)
|
||||
})
|
||||
|
||||
t.Run("OrderByValueDesc returns values in reverse alphabetical order", func(t *testing.T) {
|
||||
rs := q.SearchLabelValues(ctx, "env", &storage.SearchHints{
|
||||
OrderBy: storage.OrderByValueDesc,
|
||||
})
|
||||
got := collectSearchResultSet(t, rs)
|
||||
gotValues := make([]string, len(got))
|
||||
for i, r := range got {
|
||||
gotValues[i] = r.Value
|
||||
require.Equal(t, 1.0, r.Score)
|
||||
}
|
||||
require.Equal(t, []string{"staging", "prod", "dev"}, gotValues)
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user