diff --git a/storage/generic.go b/storage/generic.go index e85ac77b9c..1fbd960959 100644 --- a/storage/generic.go +++ b/storage/generic.go @@ -17,12 +17,18 @@ package storage import ( + "cmp" "context" + "errors" + "slices" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/util/annotations" ) +// querierAdapter must implement the Searcher interface. +var _ Searcher = &querierAdapter{} + type genericQuerier interface { LabelQuerier Select(context.Context, bool, *SelectHints, ...*labels.Matcher) genericSeriesSet @@ -132,6 +138,382 @@ func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels { return a.VerticalChunkSeriesMergeFunc(buf...) } +// searcherFromGenericQuerier extracts a Searcher from a genericQuerierAdapter. +// Falls back to a direct Searcher assertion if the querier is not a +// genericQuerierAdapter. +func searcherFromGenericQuerier(gq genericQuerier) (Searcher, bool) { + if a, ok := gq.(*genericQuerierAdapter); ok { + s, ok := a.q.(Searcher) + return s, ok + } + s, ok := gq.(Searcher) + return s, ok +} + +// collectSearchers extracts Searcher implementations from a genericQuerier tree. +func collectSearchers(gq genericQuerier) []Searcher { + if m, ok := gq.(*mergeGenericQuerier); ok { + var searchers []Searcher + for _, q := range m.queriers { + searchers = append(searchers, collectSearchers(q)...) + } + return searchers + } + if s, ok := searcherFromGenericQuerier(gq); ok { + return []Searcher{s} + } + return nil +} + +// sliceSearchResultSet is a SearchResultSet backed by a pre-built slice. +type sliceSearchResultSet struct { + results []SearchResult + warnings annotations.Annotations + idx int +} + +func (s *sliceSearchResultSet) Next() bool { + s.idx++ + return s.idx < len(s.results) +} + +func (s *sliceSearchResultSet) At() SearchResult { return s.results[s.idx] } + +func (s *sliceSearchResultSet) Warnings() annotations.Annotations { return s.warnings } + +func (*sliceSearchResultSet) Err() error { return nil } + +func (*sliceSearchResultSet) Close() error { return nil } + +// NewSearchResultSetFromSlice returns a SearchResultSet that iterates over the given slice. +func NewSearchResultSetFromSlice(results []SearchResult, warns annotations.Annotations) SearchResultSet { + return &sliceSearchResultSet{results: results, warnings: warns, idx: -1} +} + +// ApplySearchHints filters, sorts, and limits a slice of string values according to hints, +// returning scored SearchResult entries. A nil hints value is treated as the zero value. +// The input values slice is assumed to be ordered ascending by value; the function only +// performs extra work for orderings that differ from this. +func ApplySearchHints(values []string, hints *SearchHints) []SearchResult { + if hints == nil { + hints = &SearchHints{} + } + results := make([]SearchResult, 0, len(values)) + for _, v := range values { + if hints.Filter != nil { + accepted, score := hints.Filter.Accept(v) + if accepted { + results = append(results, SearchResult{Value: v, Score: score}) + } + } else { + results = append(results, SearchResult{Value: v, Score: 1.0}) + } + } + switch hints.OrderBy { + case OrderByValueAsc: + // Input is already ascending; nothing to do. + case OrderByValueDesc: + slices.Reverse(results) + case OrderByScoreDesc: + slices.SortFunc(results, compareSearchResults(OrderByScoreDesc)) + } + if hints.Limit > 0 && len(results) > hints.Limit { + results = results[:hints.Limit] + } + return results +} + +// compareSearchResults returns the total-order comparison function for the +// given Ordering. For OrderByValueAsc and OrderByValueDesc the order is on +// Value alone. For OrderByScoreDesc the order is (Score desc, Value asc), +// which is a total order and defines the position at which a duplicate value +// is first emitted by the streaming merge. +func compareSearchResults(o Ordering) func(a, b SearchResult) int { + switch o { + case OrderByValueDesc: + return func(a, b SearchResult) int { return cmp.Compare(b.Value, a.Value) } + case OrderByScoreDesc: + return func(a, b SearchResult) int { + if c := cmp.Compare(b.Score, a.Score); c != 0 { + return c + } + return cmp.Compare(a.Value, b.Value) + } + default: + return func(a, b SearchResult) int { return cmp.Compare(a.Value, b.Value) } + } +} + +// mergeSearchSets merges results from multiple Searcher calls using a streaming +// pairwise k-way merge. Each searcher is required to emit results in the order +// requested by hints.OrderBy; the merge deduplicates by value so that a value +// appearing in several sources is emitted once, carrying its highest score. +func mergeSearchSets(hints *SearchHints, fn func(Searcher) SearchResultSet, searchers []Searcher) SearchResultSet { + if len(searchers) == 0 { + return EmptySearchResultSet() + } + + sets := make([]SearchResultSet, len(searchers)) + for i, s := range searchers { + sets[i] = &lazySearchResultSet{init: func() SearchResultSet { return fn(s) }} + } + var ( + order Ordering + limit int + ) + if hints != nil { + order = hints.OrderBy + limit = hints.Limit + } + + // Duplicate Values collapse in place inside mergingSearchResultSet. + // Under value-based orderings they are trivially adjacent. Under + // OrderByScoreDesc the Searcher contract requires identical scores + // for a given Value, so duplicates tie on (Score, Value) and are + // adjacent there too. + return pairwiseMergeSearchSets(sets, order, limit) +} + +// pairwiseMergeSearchSets recursively merges SearchResultSets in a balanced +// binary tree. Each merge node respects the requested ordering and stops after +// emitting limit results, enabling early termination that avoids consuming the +// full input from child nodes. +func pairwiseMergeSearchSets(sets []SearchResultSet, order Ordering, limit int) SearchResultSet { + switch len(sets) { + case 0: + return EmptySearchResultSet() + case 1: + if limit > 0 { + return &limitSearchResultSet{rs: sets[0], limit: limit} + } + return sets[0] + default: + mid := len(sets) / 2 + left := pairwiseMergeSearchSets(sets[:mid], order, limit) + right := pairwiseMergeSearchSets(sets[mid:], order, limit) + return newMergingSearchResultSet(left, right, order, limit) + } +} + +// lazySearchResultSet defers the creation of a SearchResultSet until the first +// call to Next. This avoids invoking searchers whose results are never consumed. +type lazySearchResultSet struct { + init func() SearchResultSet + rs SearchResultSet +} + +func (s *lazySearchResultSet) ensure() { + if s.rs == nil { + s.rs = s.init() + s.init = nil + } +} + +func (s *lazySearchResultSet) Next() bool { + s.ensure() + return s.rs.Next() +} + +func (s *lazySearchResultSet) At() SearchResult { + if s.rs == nil { + return SearchResult{} + } + return s.rs.At() +} + +func (s *lazySearchResultSet) Warnings() annotations.Annotations { + if s.rs == nil { + return nil + } + return s.rs.Warnings() +} + +func (s *lazySearchResultSet) Err() error { + if s.rs == nil { + return nil + } + return s.rs.Err() +} + +func (s *lazySearchResultSet) Close() error { + if s.rs == nil { + return nil + } + return s.rs.Close() +} + +// limitSearchResultSet wraps a SearchResultSet and stops after limit results. +type limitSearchResultSet struct { + rs SearchResultSet + limit int + emitted int +} + +func (s *limitSearchResultSet) Next() bool { + if s.limit > 0 && s.emitted >= s.limit { + return false + } + if s.rs.Next() { + s.emitted++ + return true + } + return false +} + +func (s *limitSearchResultSet) At() SearchResult { return s.rs.At() } +func (s *limitSearchResultSet) Warnings() annotations.Annotations { return s.rs.Warnings() } +func (s *limitSearchResultSet) Err() error { return s.rs.Err() } +func (s *limitSearchResultSet) Close() error { return s.rs.Close() } + +// mergingSearchResultSet lazily merges two pre-sorted SearchResultSets using +// the comparison function defined by order. Both inputs must yield results in +// that order. Equal entries (same Value under value orderings, same +// (Score, Value) under OrderByScoreDesc) collapse in place; under value +// orderings the higher score wins. +type mergingSearchResultSet struct { + a, b SearchResultSet + cmpFn func(a, b SearchResult) int + valueOrder bool // true when order collapses adjacent duplicates by Value. + limit int + emitted int + curr SearchResult + aVal, bVal SearchResult + aOk, bOk bool // Whether aVal/bVal hold a buffered value. + aInit, bInit bool // Whether a/b have been advanced at least once. + done bool +} + +func newMergingSearchResultSet(a, b SearchResultSet, order Ordering, limit int) *mergingSearchResultSet { + return &mergingSearchResultSet{ + a: a, + b: b, + cmpFn: compareSearchResults(order), + valueOrder: order == OrderByValueAsc || order == OrderByValueDesc, + limit: limit, + } +} + +func (s *mergingSearchResultSet) Next() bool { + if s.done { + return false + } + if s.limit > 0 && s.emitted >= s.limit { + s.done = true + return false + } + + // Prime both sides on first call. + if !s.aInit { + s.aOk = s.a.Next() + if s.aOk { + s.aVal = s.a.At() + } + s.aInit = true + } + if !s.bInit { + s.bOk = s.b.Next() + if s.bOk { + s.bVal = s.b.At() + } + s.bInit = true + } + + // Check for errors from either side after priming or after the + // previous advance. An error means we should stop iteration. + if s.a.Err() != nil || s.b.Err() != nil { + s.done = true + return false + } + + switch { + case !s.aOk && !s.bOk: + s.done = true + return false + case !s.aOk: + s.curr = s.bVal + s.bOk = s.b.Next() + if s.bOk { + s.bVal = s.b.At() + } + case !s.bOk: + s.curr = s.aVal + s.aOk = s.a.Next() + if s.aOk { + s.aVal = s.a.At() + } + default: + // Under value-based orderings, equal-Value entries collapse in + // place and keep the higher score. Under OrderByScoreDesc the + // comparator tie-breaks on Value, so equal cmp means equal + // (Score, Value) — collapsing is safe there too. + c := s.cmpFn(s.aVal, s.bVal) + switch { + case c < 0: + s.curr = s.aVal + s.aOk = s.a.Next() + if s.aOk { + s.aVal = s.a.At() + } + case c > 0: + s.curr = s.bVal + s.bOk = s.b.Next() + if s.bOk { + s.bVal = s.b.At() + } + default: + if s.valueOrder && s.bVal.Score > s.aVal.Score { + s.curr = s.bVal + } else { + s.curr = s.aVal + } + s.aOk = s.a.Next() + if s.aOk { + s.aVal = s.a.At() + } + s.bOk = s.b.Next() + if s.bOk { + s.bVal = s.b.At() + } + } + } + + s.emitted++ + return true +} + +func (s *mergingSearchResultSet) At() SearchResult { return s.curr } + +func (s *mergingSearchResultSet) Warnings() annotations.Annotations { + var ws annotations.Annotations + ws.Merge(s.a.Warnings()) + ws.Merge(s.b.Warnings()) + return ws +} + +func (s *mergingSearchResultSet) Err() error { + return errors.Join(s.a.Err(), s.b.Err()) +} + +func (s *mergingSearchResultSet) Close() error { + return errors.Join(s.a.Close(), s.b.Close()) +} + +// SearchLabelNames implements Searcher by merging results from all underlying queriers +// that support the Searcher interface. +func (q *querierAdapter) SearchLabelNames(ctx context.Context, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet { + return mergeSearchSets(hints, func(s Searcher) SearchResultSet { + return s.SearchLabelNames(ctx, hints, matchers...) + }, collectSearchers(q.genericQuerier)) +} + +// SearchLabelValues implements Searcher by merging results from all underlying queriers +// that support the Searcher interface. +func (q *querierAdapter) SearchLabelValues(ctx context.Context, name string, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet { + return mergeSearchSets(hints, func(s Searcher) SearchResultSet { + return s.SearchLabelValues(ctx, name, hints, matchers...) + }, collectSearchers(q.genericQuerier)) +} + type noopGenericSeriesSet struct{} func (noopGenericSeriesSet) Next() bool { return false } diff --git a/storage/interface.go b/storage/interface.go index d15ba547c8..a30da613c8 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -249,13 +249,131 @@ type SelectHints struct { ProjectionInclude bool } +// Filter determines whether a value should be included in results. +// Returns (accepted, score) where score is used for relevance ranking. +// Score should be in range [0.0, 1.0] where 1.0 is perfect match. +type Filter interface { + Accept(value string) (accepted bool, score float64) +} + +// Ordering is a closed set of result orderings that searchers may natively +// produce and that the merge layer can exploit for streaming k-way merges. +// New orderings must be added explicitly here so that wire protocols, +// downstream implementations, and merge strategies stay in sync. +type Ordering uint8 + +const ( + // OrderByValueAsc orders results ascending by Value. This matches the + // natural index order and is the zero value of Ordering. + OrderByValueAsc Ordering = iota + // OrderByValueDesc orders results descending by Value. + OrderByValueDesc + // OrderByScoreDesc orders results descending by Score, breaking ties + // ascending by Value for determinism. + OrderByScoreDesc +) + +// SearchResultSet is an iterator over search results. +// Callers must call Close when done, regardless of whether all results were consumed. +type SearchResultSet interface { + // Next advances the iterator. Returns false when exhausted or on error. + Next() bool + // At returns the current search result. Must only be called after a successful Next. + At() SearchResult + // Warnings returns warnings accumulated so far. + Warnings() annotations.Annotations + // Err returns any error that caused iteration to stop. + Err() error + // Close releases resources associated with this result set. + Close() error +} + +type emptySearchResultSet struct{} + +func (emptySearchResultSet) Next() bool { return false } +func (emptySearchResultSet) At() SearchResult { return SearchResult{} } +func (emptySearchResultSet) Warnings() annotations.Annotations { return nil } +func (emptySearchResultSet) Err() error { return nil } +func (emptySearchResultSet) Close() error { return nil } + +// EmptySearchResultSet returns a SearchResultSet that contains no results. +func EmptySearchResultSet() SearchResultSet { return emptySearchResultSet{} } + +type errSearchResultSet struct { + err error + warnings annotations.Annotations +} + +func (errSearchResultSet) Next() bool { return false } +func (errSearchResultSet) At() SearchResult { return SearchResult{} } +func (s errSearchResultSet) Warnings() annotations.Annotations { return s.warnings } +func (s errSearchResultSet) Err() error { return s.err } +func (errSearchResultSet) Close() error { return nil } + +// ErrSearchResultSet returns a SearchResultSet that immediately returns the given error. +// Any supplied annotations.Annotations are merged and exposed via Warnings, allowing +// callers to surface warnings accumulated before the error occurred. +func ErrSearchResultSet(err error, warnings ...annotations.Annotations) SearchResultSet { + var warns annotations.Annotations + for _, w := range warnings { + warns.Merge(w) + } + return errSearchResultSet{err: err, warnings: warns} +} + // LabelHints specifies hints passed for label reads. // This is used only as an option for implementation to use. +// Results are returned in natural (alphabetical) order. type LabelHints struct { // Maximum number of results returned. Use a value of 0 to disable. Limit int } +// SearchHints configures search operations with filtering and scoring. +// Unlike LabelHints, SearchHints is specifically designed for search APIs +// that need relevance scoring and ranking. +type SearchHints struct { + // Filter determines which values to include and their relevance scores. + // A nil Filter accepts all values and each result has a score of 1.0. + Filter Filter + + // Limit is the maximum number of results to return. + // Use 0 to disable limiting. + Limit int + + // OrderBy selects the ordering of results. The zero value is + // OrderByValueAsc, which matches the natural index order. + OrderBy Ordering +} + +// SearchResult represents a single search result with its relevance score. +type SearchResult struct { + // Value is the label name or label value. + Value string + + // Score represents relevance, with 1.0 being a perfect match. + // Score range is [0.0, 1.0]. + Score float64 +} + +// Searcher provides search capabilities with relevance scoring. +// This interface is designed for autocomplete and search UIs that need +// to rank results by relevance rather than just filter them. +// +// For a given Value and SearchHints.Filter, Score must be deterministic +// and identical across Searcher implementations. +type Searcher interface { + // SearchLabelNames returns an iterator over label names matching the search criteria. + // Results include relevance scores based on the Filter. + // The caller must call Close on the returned SearchResultSet when done. + SearchLabelNames(ctx context.Context, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet + + // SearchLabelValues returns an iterator over label values for the given label name. + // Results include relevance scores based on the Filter. + // The caller must call Close on the returned SearchResultSet when done. + SearchLabelValues(ctx context.Context, name string, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet +} + // QueryableFunc is an adapter to allow the use of ordinary functions as // Queryables. It follows the idea of http.HandlerFunc. // TODO(bwplotka): Move to promql/engine_test.go? diff --git a/storage/merge_test.go b/storage/merge_test.go index e42a6a4ce1..2afc2caeb6 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1723,3 +1723,491 @@ func (errIterator) AtST() int64 { func (e errIterator) Err() error { return e.err } + +// searchQuerier is a Querier that also implements Searcher, for testing merge behaviour. +type searchQuerier struct { + mockQuerier + names []SearchResult + values []SearchResult + searchWarns annotations.Annotations + err error +} + +func (q *searchQuerier) SearchLabelNames(_ context.Context, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet { + if q.err != nil { + return ErrSearchResultSet(q.err) + } + return NewSearchResultSetFromSlice(q.names, q.searchWarns) +} + +func (q *searchQuerier) SearchLabelValues(_ context.Context, _ string, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet { + if q.err != nil { + return ErrSearchResultSet(q.err) + } + return NewSearchResultSetFromSlice(q.values, q.searchWarns) +} + +// partialErrSearchQuerier is a Querier+Searcher that yields partial results +// then returns an error, for testing secondary querier mid-stream failure. +type partialErrSearchQuerier struct { + mockQuerier + names []SearchResult + values []SearchResult + err error +} + +func (q *partialErrSearchQuerier) SearchLabelNames(_ context.Context, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet { + return newErrAfterResultsSet(q.names, q.err) +} + +func (q *partialErrSearchQuerier) SearchLabelValues(_ context.Context, _ string, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet { + return newErrAfterResultsSet(q.values, q.err) +} + +// errAfterResultsSet is a SearchResultSet that yields results then returns an error. +type errAfterResultsSet struct { + results []SearchResult + idx int // starts at -1; incremented by Next. + err error +} + +func newErrAfterResultsSet(results []SearchResult, err error) *errAfterResultsSet { + return &errAfterResultsSet{results: results, err: err, idx: -1} +} + +func (s *errAfterResultsSet) Next() bool { + s.idx++ + return s.idx < len(s.results) +} + +func (s *errAfterResultsSet) At() SearchResult { return s.results[s.idx] } + +func (*errAfterResultsSet) Warnings() annotations.Annotations { return nil } + +func (s *errAfterResultsSet) Err() error { + if s.idx >= len(s.results) { + return s.err + } + return nil +} + +func (*errAfterResultsSet) Close() error { return nil } + +func collectSearchResults(t *testing.T, rs SearchResultSet) []SearchResult { + t.Helper() + var got []SearchResult + for rs.Next() { + got = append(got, rs.At()) + } + require.NoError(t, rs.Err()) + require.NoError(t, rs.Close()) + return got +} + +func TestMergeQuerierSearch(t *testing.T) { + ctx := t.Context() + + // newMerged creates a merged querier from two queriers, ensuring the querierAdapter + // path is always taken (NewMergeQuerier returns the single querier directly if only + // one is provided). + newMerged := func(a, b Querier) Querier { + return NewMergeQuerier([]Querier{a, b}, nil, ChainedSeriesMerge) + } + + t.Run("results from both searchers are merged", func(t *testing.T) { + q1 := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 0.9}}, + values: []SearchResult{{Value: "prod", Score: 1.0}}, + } + q2 := &searchQuerier{ + names: []SearchResult{{Value: "job", Score: 0.5}}, + values: []SearchResult{{Value: "dev", Score: 0.8}}, + } + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil)) + require.Len(t, got, 2) + + got = collectSearchResults(t, merged.(Searcher).SearchLabelValues(ctx, "env", nil)) + require.Len(t, got, 2) + }) + + t.Run("duplicate values keep max score", func(t *testing.T) { + q1 := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 0.6}, {Value: "job", Score: 0.9}}, + } + q2 := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 0.8}, {Value: "region", Score: 0.5}}, + } + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil)) + require.Len(t, got, 3) + scores := make(map[string]float64, len(got)) + for _, r := range got { + scores[r.Value] = r.Score + } + // "env" appears in both; max score wins. + require.Equal(t, 0.8, scores["env"]) + require.Equal(t, 0.9, scores["job"]) + require.Equal(t, 0.5, scores["region"]) + }) + + t.Run("natural order is preserved for merged searchers", func(t *testing.T) { + q1 := &searchQuerier{ + names: []SearchResult{{Value: "region", Score: 0.5}, {Value: "zone", Score: 0.4}}, + } + q2 := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 0.8}, {Value: "job", Score: 0.9}}, + } + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil)) + require.Equal(t, []SearchResult{ + {Value: "env", Score: 0.8}, + {Value: "job", Score: 0.9}, + {Value: "region", Score: 0.5}, + {Value: "zone", Score: 0.4}, + }, got) + }) + + t.Run("secondary search errors become warnings", func(t *testing.T) { + primary := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 1.0}}, + } + secondary := &searchQuerier{err: errors.New("secondary search failed")} + merged := NewMergeQuerier([]Querier{primary}, []Querier{secondary}, ChainedSeriesMerge) + defer merged.Close() + + rs := merged.(Searcher).SearchLabelNames(ctx, nil) + got := collectSearchResults(t, rs) + require.Equal(t, []SearchResult{{Value: "env", Score: 1.0}}, got) + warnings := rs.Warnings().AsErrors() + require.Len(t, warnings, 1) + require.Contains(t, warnings[0].Error(), "secondary search failed") + }) + + t.Run("secondary partial label names followed by error become warnings", func(t *testing.T) { + primary := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 1.0}}, + } + secondary := &partialErrSearchQuerier{ + names: []SearchResult{{Value: "zone", Score: 0.5}}, + err: errors.New("partial secondary failure"), + } + merged := NewMergeQuerier([]Querier{primary}, []Querier{secondary}, ChainedSeriesMerge) + defer merged.Close() + + rs := merged.(Searcher).SearchLabelNames(ctx, nil) + for rs.Next() { + } + require.NoError(t, rs.Err()) + warnings := rs.Warnings().AsErrors() + require.Len(t, warnings, 1) + require.Contains(t, warnings[0].Error(), "partial secondary failure") + require.NoError(t, rs.Close()) + }) + + t.Run("warnings accessible after early Close", func(t *testing.T) { + // Exercises the internal warningsOnErrorSearchSet lifecycle when + // the caller closes before exhaustion. This cannot be tested + // through the public API because the error-to-warning conversion + // only fires on exhaustion. + inner := newErrAfterResultsSet( + []SearchResult{{Value: "zone", Score: 0.5}}, + errors.New("early close failure"), + ) + rs := warningsOnErrorSearchResultSet(inner) + require.True(t, rs.Next()) + // Do not call Next again; close early. + require.NoError(t, rs.Close()) + // Inner set has no warnings yet (error only fires after exhaustion), + // but Close must not panic and Warnings must be callable. + require.NoError(t, rs.Err()) + _ = rs.Warnings() + // Calling Close a second time must be a no-op. + require.NoError(t, rs.Close()) + }) + + t.Run("secondary partial label values followed by error become warnings", func(t *testing.T) { + primary := &searchQuerier{ + values: []SearchResult{{Value: "prod", Score: 1.0}}, + } + secondary := &partialErrSearchQuerier{ + values: []SearchResult{{Value: "staging", Score: 0.5}}, + err: errors.New("partial secondary failure"), + } + merged := NewMergeQuerier([]Querier{primary}, []Querier{secondary}, ChainedSeriesMerge) + defer merged.Close() + + rs := merged.(Searcher).SearchLabelValues(ctx, "env", nil) + for rs.Next() { + } + require.NoError(t, rs.Err()) + warnings := rs.Warnings().AsErrors() + require.Len(t, warnings, 1) + require.Contains(t, warnings[0].Error(), "partial secondary failure") + require.NoError(t, rs.Close()) + }) + + t.Run("non-searcher querier is skipped", func(t *testing.T) { + q1 := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 1.0}}, + } + q2 := &mockQuerier{resp: []string{"should_be_ignored"}} + // Verify precondition: mockQuerier does not implement Searcher. + _, isSearcher := (Querier)(q2).(Searcher) + require.False(t, isSearcher) + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil)) + require.Len(t, got, 1) + require.Equal(t, "env", got[0].Value) + }) + + t.Run("no searcher queriers returns empty", func(t *testing.T) { + // Two non-searcher queriers force the querierAdapter path. + q1 := &mockQuerier{resp: []string{"a", "b"}} + q2 := &mockQuerier{resp: []string{"c", "d"}} + // Verify precondition. + _, isSearcher := (Querier)(q1).(Searcher) + require.False(t, isSearcher) + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil)) + require.Empty(t, got) + }) + + t.Run("OrderByScoreDesc overrides natural ordering", func(t *testing.T) { + // Each searcher must emit in the requested order; score-desc + // rearranges the overall result stream. The inputs below are + // already score-desc within each searcher. + q1 := &searchQuerier{ + names: []SearchResult{{Value: "job", Score: 0.9}, {Value: "env", Score: 0.3}}, + } + q2 := &searchQuerier{ + names: []SearchResult{{Value: "region", Score: 0.5}}, + } + merged := newMerged(q1, q2) + defer merged.Close() + + hints := &SearchHints{OrderBy: OrderByScoreDesc} + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, hints)) + require.Equal(t, []SearchResult{ + {Value: "job", Score: 0.9}, + {Value: "region", Score: 0.5}, + {Value: "env", Score: 0.3}, + }, got) + }) + + t.Run("OrderByScoreDesc error preserves warnings", func(t *testing.T) { + var ws annotations.Annotations + ws.Add(errors.New("prior warning")) + q1 := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 1.0}}, + searchWarns: ws, + } + q2 := &searchQuerier{err: errors.New("score path failure")} + merged := newMerged(q1, q2) + defer merged.Close() + + hints := &SearchHints{OrderBy: OrderByScoreDesc} + rs := merged.(Searcher).SearchLabelNames(ctx, hints) + // Drain to completion so that the failing searcher's error + // surfaces. Streaming may emit some results before the error. + for rs.Next() { + } + require.Error(t, rs.Err()) + require.Contains(t, rs.Err().Error(), "score path failure") + warnings := rs.Warnings().AsErrors() + require.Len(t, warnings, 1) + require.Contains(t, warnings[0].Error(), "prior warning") + require.NoError(t, rs.Close()) + }) + + t.Run("OrderByValueDesc reverses natural ordering", func(t *testing.T) { + // Each searcher must emit in descending-Value order. + q1 := &searchQuerier{ + names: []SearchResult{{Value: "job", Score: 1.0}, {Value: "env", Score: 1.0}}, + } + q2 := &searchQuerier{ + names: []SearchResult{{Value: "region", Score: 1.0}, {Value: "cluster", Score: 1.0}}, + } + merged := newMerged(q1, q2) + defer merged.Close() + + hints := &SearchHints{OrderBy: OrderByValueDesc} + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, hints)) + require.Equal(t, []SearchResult{ + {Value: "region", Score: 1.0}, + {Value: "job", Score: 1.0}, + {Value: "env", Score: 1.0}, + {Value: "cluster", Score: 1.0}, + }, got) + }) + + t.Run("primary error preserves warnings from prior searchers", func(t *testing.T) { + var ws annotations.Annotations + ws.Add(errors.New("prior warning")) + q1 := &searchQuerier{ + names: []SearchResult{{Value: "env", Score: 1.0}}, + searchWarns: ws, + } + q2 := &searchQuerier{err: errors.New("primary failure")} + merged := newMerged(q1, q2) + defer merged.Close() + + rs := merged.(Searcher).SearchLabelNames(ctx, nil) + // Iteration should see no results because the merge errored. + require.False(t, rs.Next()) + require.Error(t, rs.Err()) + require.Contains(t, rs.Err().Error(), "primary failure") + // Warnings from the successful first searcher must be preserved. + warnings := rs.Warnings().AsErrors() + require.Len(t, warnings, 1) + require.Contains(t, warnings[0].Error(), "prior warning") + require.NoError(t, rs.Close()) + }) + + t.Run("limit is applied at merge level", func(t *testing.T) { + q1 := &searchQuerier{ + names: []SearchResult{{Value: "a", Score: 1.0}, {Value: "b", Score: 0.9}}, + } + q2 := &searchQuerier{ + names: []SearchResult{{Value: "c", Score: 0.8}}, + } + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, &SearchHints{Limit: 2})) + require.Len(t, got, 2) + // Alphabetical order, limited to 2. + require.Equal(t, "a", got[0].Value) + require.Equal(t, "b", got[1].Value) + }) + + t.Run("empty searchers", func(t *testing.T) { + q1 := &searchQuerier{} + q2 := &searchQuerier{} + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil)) + require.Empty(t, got) + }) + + t.Run("single searcher passthrough", func(t *testing.T) { + // NewMergeQuerier with one querier returns it directly, so use two + // where one is empty to force the merge path. + q1 := &searchQuerier{ + names: []SearchResult{ + {Value: "a", Score: 1.0}, + {Value: "b", Score: 1.0}, + {Value: "c", Score: 1.0}, + }, + } + q2 := &searchQuerier{} + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil)) + require.Equal(t, []SearchResult{ + {Value: "a", Score: 1.0}, + {Value: "b", Score: 1.0}, + {Value: "c", Score: 1.0}, + }, got) + }) + + t.Run("overlapping values are deduplicated", func(t *testing.T) { + q1 := &searchQuerier{ + names: []SearchResult{ + {Value: "a", Score: 1.0}, + {Value: "b", Score: 1.0}, + }, + } + q2 := &searchQuerier{ + names: []SearchResult{ + {Value: "b", Score: 1.0}, + {Value: "c", Score: 1.0}, + }, + } + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, nil)) + require.Equal(t, []SearchResult{ + {Value: "a", Score: 1.0}, + {Value: "b", Score: 1.0}, + {Value: "c", Score: 1.0}, + }, got) + }) + + t.Run("interleaved values with limit", func(t *testing.T) { + q1 := &searchQuerier{ + names: []SearchResult{ + {Value: "a", Score: 1.0}, + {Value: "c", Score: 1.0}, + {Value: "e", Score: 1.0}, + }, + } + q2 := &searchQuerier{ + names: []SearchResult{ + {Value: "b", Score: 1.0}, + {Value: "d", Score: 1.0}, + }, + } + merged := newMerged(q1, q2) + defer merged.Close() + + got := collectSearchResults(t, merged.(Searcher).SearchLabelNames(ctx, &SearchHints{Limit: 3})) + require.Equal(t, []SearchResult{ + {Value: "a", Score: 1.0}, + {Value: "b", Score: 1.0}, + {Value: "c", Score: 1.0}, + }, got) + }) +} + +func BenchmarkMergeSearchSets(b *testing.B) { + // Build N searchers each returning M sorted results with no overlap. + const nSearchers = 4 + const resultsPerSearcher = 10000 + queriers := make([]Querier, nSearchers) + for i := range nSearchers { + names := make([]SearchResult, resultsPerSearcher) + for j := range resultsPerSearcher { + names[j] = SearchResult{Value: fmt.Sprintf("label_%04d_%06d", i, j), Score: 1.0} + } + queriers[i] = &searchQuerier{names: names} + } + + b.Run("no_limit", func(b *testing.B) { + for b.Loop() { + merged := NewMergeQuerier(queriers, nil, ChainedSeriesMerge) + rs := merged.(Searcher).SearchLabelNames(context.Background(), nil) + for rs.Next() { + } + require.NoError(b, rs.Err()) + rs.Close() + merged.Close() + } + }) + + b.Run("limit_100", func(b *testing.B) { + hints := &SearchHints{Limit: 100} + for b.Loop() { + merged := NewMergeQuerier(queriers, nil, ChainedSeriesMerge) + rs := merged.(Searcher).SearchLabelNames(context.Background(), hints) + for rs.Next() { + } + require.NoError(b, rs.Err()) + rs.Close() + merged.Close() + } + }) +} diff --git a/storage/noop.go b/storage/noop.go index 751e6304db..3c49cdb60c 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -43,6 +43,16 @@ func (noopQuerier) Close() error { return nil } +func (noopQuerier) SearchLabelNames(_ context.Context, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet { + return nil +} + +func (noopQuerier) SearchLabelValues(_ context.Context, _ string, _ *SearchHints, _ ...*labels.Matcher) SearchResultSet { + return nil +} + +var _ Searcher = noopQuerier{} + type noopChunkQuerier struct{} // NoopChunkedQuerier is a ChunkQuerier that does nothing. diff --git a/storage/secondary.go b/storage/secondary.go index a071ddcfa3..dc25c7d70d 100644 --- a/storage/secondary.go +++ b/storage/secondary.go @@ -21,6 +21,9 @@ import ( "github.com/prometheus/prometheus/util/annotations" ) +// secondaryQuerier must implement the Searcher interface. +var _ Searcher = (*secondaryQuerier)(nil) + // secondaryQuerier is a wrapper that allows a querier to be treated in a best effort manner. // This means that an error on any method returned by Querier except Close will be returned as a warning, // and the result will be empty. @@ -65,6 +68,24 @@ func (s *secondaryQuerier) LabelNames(ctx context.Context, hints *LabelHints, ma return names, w, nil } +// SearchLabelNames returns search results from the wrapped querier and converts errors into warnings. +func (s *secondaryQuerier) SearchLabelNames(ctx context.Context, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet { + searcher, ok := searcherFromGenericQuerier(s.genericQuerier) + if !ok { + return EmptySearchResultSet() + } + return warningsOnErrorSearchResultSet(searcher.SearchLabelNames(ctx, hints, matchers...)) +} + +// SearchLabelValues returns search results from the wrapped querier and converts errors into warnings. +func (s *secondaryQuerier) SearchLabelValues(ctx context.Context, name string, hints *SearchHints, matchers ...*labels.Matcher) SearchResultSet { + searcher, ok := searcherFromGenericQuerier(s.genericQuerier) + if !ok { + return EmptySearchResultSet() + } + return warningsOnErrorSearchResultSet(searcher.SearchLabelValues(ctx, name, hints, matchers...)) +} + func (s *secondaryQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { if s.done { panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done") @@ -107,3 +128,56 @@ func (s *secondaryQuerier) Select(ctx context.Context, sortSeries bool, hints *S } }} } + +type warningsOnErrorSearchSet struct { + rs SearchResultSet + warnings annotations.Annotations +} + +// warningsOnErrorSearchResultSet wraps rs so that an iteration error is surfaced as a warning. +func warningsOnErrorSearchResultSet(rs SearchResultSet) SearchResultSet { + return &warningsOnErrorSearchSet{rs: rs} +} + +func (s *warningsOnErrorSearchSet) Next() bool { + if s.rs == nil { + return false + } + if s.rs.Next() { + return true + } + if err := s.rs.Err(); err != nil { + var ws annotations.Annotations + ws.Merge(s.rs.Warnings()) + s.warnings = ws.Add(err) + _ = s.rs.Close() + s.rs = nil + } + return false +} + +func (s *warningsOnErrorSearchSet) At() SearchResult { + if s.rs == nil { + return SearchResult{} + } + return s.rs.At() +} + +func (s *warningsOnErrorSearchSet) Warnings() annotations.Annotations { + if s.rs != nil { + return s.rs.Warnings() + } + return s.warnings +} + +func (*warningsOnErrorSearchSet) Err() error { return nil } + +func (s *warningsOnErrorSearchSet) Close() error { + if s.rs == nil { + return nil + } + s.warnings = s.rs.Warnings() + err := s.rs.Close() + s.rs = nil + return err +} diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 0aa381a673..53440591ca 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -562,6 +562,8 @@ type HeadAndOOOQuerier struct { querier storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end. } +var _ storage.Searcher = &HeadAndOOOQuerier{} + func NewHeadAndOOOQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { cr := &headChunkReader{ head: head, @@ -593,6 +595,28 @@ func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.Label return q.querier.LabelNames(ctx, hints, matchers...) } +// SearchLabelNames implements storage.Searcher by delegating to the inner querier. +func (q *HeadAndOOOQuerier) SearchLabelNames(ctx context.Context, hints *storage.SearchHints, matchers ...*labels.Matcher) storage.SearchResultSet { + if q.querier == nil { + return storage.EmptySearchResultSet() + } + if s, ok := q.querier.(storage.Searcher); ok { + return s.SearchLabelNames(ctx, hints, matchers...) + } + return storage.EmptySearchResultSet() +} + +// SearchLabelValues implements storage.Searcher by delegating to the inner querier. +func (q *HeadAndOOOQuerier) SearchLabelValues(ctx context.Context, name string, hints *storage.SearchHints, matchers ...*labels.Matcher) storage.SearchResultSet { + if q.querier == nil { + return storage.EmptySearchResultSet() + } + if s, ok := q.querier.(storage.Searcher); ok { + return s.SearchLabelValues(ctx, name, hints, matchers...) + } + return storage.EmptySearchResultSet() +} + func (q *HeadAndOOOQuerier) Close() error { q.chunkr.Close() if q.querier == nil { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index f58ee3aada..ddb3c9e48f 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/compression" ) @@ -1114,3 +1115,112 @@ func TestSortMetaByMinTimeAndMinRef(t *testing.T) { }) } } + +// mockSearchQuerier is a minimal storage.Querier that also implements storage.Searcher. +type mockSearchQuerier struct { + labelNames []string + labelValues []string +} + +func (*mockSearchQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { + return storage.EmptySeriesSet() +} + +func (m *mockSearchQuerier) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return m.labelValues, nil, nil +} + +func (m *mockSearchQuerier) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return m.labelNames, nil, nil +} + +func (*mockSearchQuerier) Close() error { return nil } + +func (m *mockSearchQuerier) SearchLabelNames(_ context.Context, _ *storage.SearchHints, _ ...*labels.Matcher) storage.SearchResultSet { + results := make([]storage.SearchResult, len(m.labelNames)) + for i, n := range m.labelNames { + results[i] = storage.SearchResult{Value: n, Score: 1.0} + } + return storage.NewSearchResultSetFromSlice(results, nil) +} + +func (m *mockSearchQuerier) SearchLabelValues(_ context.Context, _ string, _ *storage.SearchHints, _ ...*labels.Matcher) storage.SearchResultSet { + results := make([]storage.SearchResult, len(m.labelValues)) + for i, v := range m.labelValues { + results[i] = storage.SearchResult{Value: v, Score: 1.0} + } + return storage.NewSearchResultSetFromSlice(results, nil) +} + +// mockSearchQuerierNoSearch is a storage.Querier that does not implement storage.Searcher. +type mockSearchQuerierNoSearch struct{} + +func (mockSearchQuerierNoSearch) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { + return storage.EmptySeriesSet() +} + +func (mockSearchQuerierNoSearch) LabelValues(context.Context, string, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (mockSearchQuerierNoSearch) LabelNames(context.Context, *storage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (mockSearchQuerierNoSearch) Close() error { return nil } + +func TestHeadAndOOOQuerierSearch(t *testing.T) { + ctx := context.Background() + + t.Run("nil inner querier returns empty", func(t *testing.T) { + q := &HeadAndOOOQuerier{} + rs := q.SearchLabelNames(ctx, nil) + require.False(t, rs.Next()) + require.NoError(t, rs.Err()) + require.NoError(t, rs.Close()) + + rs = q.SearchLabelValues(ctx, "env", nil) + require.False(t, rs.Next()) + require.NoError(t, rs.Err()) + require.NoError(t, rs.Close()) + }) + + t.Run("delegates to inner searcher", func(t *testing.T) { + inner := &mockSearchQuerier{ + labelNames: []string{"env", "job"}, + labelValues: []string{"prod", "dev"}, + } + q := &HeadAndOOOQuerier{querier: inner} + + rs := q.SearchLabelNames(ctx, nil) + var names []string + for rs.Next() { + names = append(names, rs.At().Value) + } + require.NoError(t, rs.Err()) + require.NoError(t, rs.Close()) + require.Equal(t, []string{"env", "job"}, names) + + rs = q.SearchLabelValues(ctx, "env", nil) + var values []string + for rs.Next() { + values = append(values, rs.At().Value) + } + require.NoError(t, rs.Err()) + require.NoError(t, rs.Close()) + require.Equal(t, []string{"prod", "dev"}, values) + }) + + t.Run("non-searcher inner querier returns empty", func(t *testing.T) { + inner := mockSearchQuerierNoSearch{} + q := &HeadAndOOOQuerier{querier: inner} + + rs := q.SearchLabelNames(ctx, nil) + require.False(t, rs.Next()) + require.NoError(t, rs.Close()) + + rs = q.SearchLabelValues(ctx, "env", nil) + require.False(t, rs.Next()) + require.NoError(t, rs.Close()) + }) +} diff --git a/tsdb/querier.go b/tsdb/querier.go index 29472c0725..0dc8a9259f 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -46,6 +46,8 @@ type blockBaseQuerier struct { mint, maxt int64 } +var _ storage.Searcher = &blockBaseQuerier{} + func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, error) { indexr, err := b.Index() if err != nil { @@ -94,6 +96,49 @@ func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelH return res, nil, nil } +// SearchLabelNames implements storage.Searcher. +func (q *blockBaseQuerier) SearchLabelNames(ctx context.Context, hints *storage.SearchHints, matchers ...*labels.Matcher) storage.SearchResultSet { + names, err := q.index.LabelNames(ctx, matchers...) + if err != nil { + return storage.ErrSearchResultSet(err) + } + + return storage.NewSearchResultSetFromSlice(storage.ApplySearchHints(names, hints), nil) +} + +// SearchLabelValues implements storage.Searcher. +func (q *blockBaseQuerier) SearchLabelValues(ctx context.Context, name string, hints *storage.SearchHints, matchers ...*labels.Matcher) storage.SearchResultSet { + if hints == nil { + hints = &storage.SearchHints{} + } + + // Limit pushdown is only correct when natural (ascending) index order + // is preserved all the way to the output and no filtering discards + // values ahead of the limit. + labelHints := &storage.LabelHints{} + if hints.OrderBy == storage.OrderByValueAsc && hints.Filter == nil { + labelHints.Limit = hints.Limit + } + + var ( + values []string + err error + ) + switch hints.OrderBy { + case storage.OrderByScoreDesc: + // Score-based sorting happens in ApplySearchHints; avoid the + // index-level sort. + values, err = q.index.LabelValues(ctx, name, labelHints, matchers...) + default: + values, err = q.index.SortedLabelValues(ctx, name, labelHints, matchers...) + } + if err != nil { + return storage.ErrSearchResultSet(err) + } + + return storage.NewSearchResultSetFromSlice(storage.ApplySearchHints(values, hints), nil) +} + func (q *blockBaseQuerier) Close() error { if q.closed { return errors.New("block querier already closed") diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 73799e71d4..68320aee6e 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -4046,3 +4046,138 @@ func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) { require.Equal(t, originalMatchers, matchers) } + +// prefixFilter accepts values that start with the given prefix and scores them 1.0. +type prefixFilter struct{ prefix string } + +func (f prefixFilter) Accept(v string) (bool, float64) { + if strings.HasPrefix(v, f.prefix) { + return true, 1.0 + } + return false, 0 +} + +func newBlockBaseQuerierForSearch(t *testing.T, labelSets ...labels.Labels) *blockBaseQuerier { + t.Helper() + ix := newMockIndex() + // Group series refs by label for writing postings. + postings := make(map[labels.Label][]storage.SeriesRef) + for i, ls := range labelSets { + ref := storage.SeriesRef(i) + require.NoError(t, ix.AddSeries(ref, ls)) + ls.Range(func(lbl labels.Label) { + postings[lbl] = append(postings[lbl], ref) + }) + } + for lbl, refs := range postings { + require.NoError(t, ix.WritePostings(lbl.Name, lbl.Value, index.NewListPostings(refs))) + } + return &blockBaseQuerier{index: ix, chunks: nil, tombstones: tombstones.NewMemTombstones()} +} + +func collectSearchResultSet(t *testing.T, rs storage.SearchResultSet) []storage.SearchResult { + t.Helper() + var got []storage.SearchResult + for rs.Next() { + got = append(got, rs.At()) + } + require.NoError(t, rs.Err()) + require.NoError(t, rs.Close()) + return got +} + +func TestBlockBaseQuerierSearchLabelNames(t *testing.T) { + ctx := t.Context() + q := newBlockBaseQuerierForSearch(t, + labels.FromStrings("env", "prod", "job", "api"), + labels.FromStrings("env", "dev", "job", "api"), + labels.FromStrings("region", "eu"), + ) + + t.Run("no filter returns all names", func(t *testing.T) { + rs := q.SearchLabelNames(ctx, nil) + got := collectSearchResultSet(t, rs) + gotValues := make([]string, len(got)) + for i, r := range got { + gotValues[i] = r.Value + require.Equal(t, 1.0, r.Score) + } + slices.Sort(gotValues) + require.Equal(t, []string{"env", "job", "region"}, gotValues) + }) + + t.Run("filter selects matching names", func(t *testing.T) { + rs := q.SearchLabelNames(ctx, &storage.SearchHints{Filter: prefixFilter{"e"}}) + got := collectSearchResultSet(t, rs) + require.Len(t, got, 1) + require.Equal(t, "env", got[0].Value) + }) + + t.Run("limit is applied", func(t *testing.T) { + rs := q.SearchLabelNames(ctx, &storage.SearchHints{Limit: 1}) + got := collectSearchResultSet(t, rs) + require.Len(t, got, 1) + }) +} + +func TestBlockBaseQuerierSearchLabelValues(t *testing.T) { + ctx := t.Context() + q := newBlockBaseQuerierForSearch(t, + labels.FromStrings("env", "prod"), + labels.FromStrings("env", "dev"), + labels.FromStrings("env", "staging"), + ) + + t.Run("no filter returns all values", func(t *testing.T) { + rs := q.SearchLabelValues(ctx, "env", nil) + got := collectSearchResultSet(t, rs) + gotValues := make([]string, len(got)) + for i, r := range got { + gotValues[i] = r.Value + require.Equal(t, 1.0, r.Score) + } + slices.Sort(gotValues) + require.Equal(t, []string{"dev", "prod", "staging"}, gotValues) + }) + + t.Run("filter selects matching values", func(t *testing.T) { + rs := q.SearchLabelValues(ctx, "env", &storage.SearchHints{Filter: prefixFilter{"p"}}) + got := collectSearchResultSet(t, rs) + require.Len(t, got, 1) + require.Equal(t, "prod", got[0].Value) + }) + + t.Run("limit is applied after filtering", func(t *testing.T) { + rs := q.SearchLabelValues(ctx, "env", &storage.SearchHints{ + Filter: prefixFilter{"p"}, + Limit: 1, + }) + got := collectSearchResultSet(t, rs) + require.Equal(t, []storage.SearchResult{{Value: "prod", Score: 1.0}}, got) + }) + + t.Run("limit is applied", func(t *testing.T) { + rs := q.SearchLabelValues(ctx, "env", &storage.SearchHints{Limit: 2}) + got := collectSearchResultSet(t, rs) + require.Len(t, got, 2) + }) + + t.Run("unknown label name returns empty", func(t *testing.T) { + rs := q.SearchLabelValues(ctx, "unknown", nil) + got := collectSearchResultSet(t, rs) + require.Empty(t, got) + }) + + t.Run("OrderByValueDesc returns values in reverse alphabetical order", func(t *testing.T) { + rs := q.SearchLabelValues(ctx, "env", &storage.SearchHints{ + OrderBy: storage.OrderByValueDesc, + }) + got := collectSearchResultSet(t, rs) + gotValues := make([]string, len(got)) + for i, r := range got { + gotValues[i] = r.Value + require.Equal(t, 1.0, r.Score) + } + require.Equal(t, []string{"staging", "prod", "dev"}, gotValues) + }) +}