diff --git a/storage/merge.go b/storage/merge.go index 194494b6a9..2424b26ab7 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -45,25 +45,24 @@ type mergeGenericQuerier struct { // // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier { + primaries = filterQueriers(primaries) + secondaries = filterQueriers(secondaries) + switch { - case len(primaries)+len(secondaries) == 0: + case len(primaries) == 0 && len(secondaries) == 0: return noopQuerier{} case len(primaries) == 1 && len(secondaries) == 0: return primaries[0] case len(primaries) == 0 && len(secondaries) == 1: - return secondaries[0] + return &querierAdapter{newSecondaryQuerierFrom(secondaries[0])} } queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) for _, q := range primaries { - if _, ok := q.(noopQuerier); !ok && q != nil { - queriers = append(queriers, newGenericQuerierFrom(q)) - } + queriers = append(queriers, newGenericQuerierFrom(q)) } for _, q := range secondaries { - if _, ok := q.(noopQuerier); !ok && q != nil { - queriers = append(queriers, newSecondaryQuerierFrom(q)) - } + queriers = append(queriers, newSecondaryQuerierFrom(q)) } concurrentSelect := false @@ -77,31 +76,40 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer }} } +func filterQueriers(qs []Querier) []Querier { + ret := make([]Querier, 0, len(qs)) + for _, q := range qs { + if _, ok := q.(noopQuerier); !ok && q != nil { + ret = append(ret, q) + } + } + return ret +} + // NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers. // See NewFanout commentary to learn more about primary vs secondary differences. // // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670 func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier { + primaries = filterChunkQueriers(primaries) + secondaries = filterChunkQueriers(secondaries) + switch { case len(primaries) == 0 && len(secondaries) == 0: return noopChunkQuerier{} case len(primaries) == 1 && len(secondaries) == 0: return primaries[0] case len(primaries) == 0 && len(secondaries) == 1: - return secondaries[0] + return &chunkQuerierAdapter{newSecondaryQuerierFromChunk(secondaries[0])} } queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) for _, q := range primaries { - if _, ok := q.(noopChunkQuerier); !ok && q != nil { - queriers = append(queriers, newGenericQuerierFromChunk(q)) - } + queriers = append(queriers, newGenericQuerierFromChunk(q)) } - for _, querier := range secondaries { - if _, ok := querier.(noopChunkQuerier); !ok && querier != nil { - queriers = append(queriers, newSecondaryQuerierFromChunk(querier)) - } + for _, q := range secondaries { + queriers = append(queriers, newSecondaryQuerierFromChunk(q)) } concurrentSelect := false @@ -115,6 +123,16 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica }} } +func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier { + ret := make([]ChunkQuerier, 0, len(qs)) + for _, q := range qs { + if _, ok := q.(noopChunkQuerier); !ok && q != nil { + ret = append(ret, q) + } + } + return ret +} + // Select returns a set of series that matches the given label matchers. func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { seriesSets := make([]genericSeriesSet, 0, len(q.queriers)) diff --git a/storage/merge_test.go b/storage/merge_test.go index 7619af3c1f..b145743c86 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -912,9 +912,23 @@ func TestConcatenatingChunkIterator(t *testing.T) { } type mockQuerier struct { - LabelQuerier + mtx sync.Mutex - toReturn []Series + toReturn []Series // Response for Select. + + closed bool + labelNamesCalls int + labelNamesRequested []labelNameRequest + sortedSeriesRequested []bool + + resp []string // Response for LabelNames and LabelValues; turned into Select response if toReturn is not supplied. + warnings annotations.Annotations + err error +} + +type labelNameRequest struct { + name string + matchers []*labels.Matcher } type seriesByLabel []Series @@ -924,13 +938,47 @@ func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } func (m *mockQuerier) Select(_ context.Context, sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet { - cpy := make([]Series, len(m.toReturn)) - copy(cpy, m.toReturn) + m.mtx.Lock() + defer m.mtx.Unlock() + m.sortedSeriesRequested = append(m.sortedSeriesRequested, sortSeries) + + var ret []Series + if len(m.toReturn) > 0 { + ret = make([]Series, len(m.toReturn)) + copy(ret, m.toReturn) + } else if len(m.resp) > 0 { + ret = make([]Series, 0, len(m.resp)) + for _, l := range m.resp { + ret = append(ret, NewListSeries(labels.FromStrings("test", l), nil)) + } + } if sortSeries { - sort.Sort(seriesByLabel(cpy)) + sort.Sort(seriesByLabel(ret)) } - return NewMockSeriesSet(cpy...) + return &mockSeriesSet{idx: -1, series: ret, warnings: m.warnings, err: m.err} +} + +func (m *mockQuerier) LabelValues(_ context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + m.mtx.Lock() + m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{ + name: name, + matchers: matchers, + }) + m.mtx.Unlock() + return m.resp, m.warnings, m.err +} + +func (m *mockQuerier) LabelNames(context.Context, *LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { + m.mtx.Lock() + m.labelNamesCalls++ + m.mtx.Unlock() + return m.resp, m.warnings, m.err +} + +func (m *mockQuerier) Close() error { + m.closed = true + return nil } type mockChunkQuerier struct { @@ -960,6 +1008,9 @@ func (m *mockChunkQuerier) Select(_ context.Context, sortSeries bool, _ *SelectH type mockSeriesSet struct { idx int series []Series + + warnings annotations.Annotations + err error } func NewMockSeriesSet(series ...Series) SeriesSet { @@ -970,15 +1021,18 @@ func NewMockSeriesSet(series ...Series) SeriesSet { } func (m *mockSeriesSet) Next() bool { + if m.err != nil { + return false + } m.idx++ return m.idx < len(m.series) } func (m *mockSeriesSet) At() Series { return m.series[m.idx] } -func (m *mockSeriesSet) Err() error { return nil } +func (m *mockSeriesSet) Err() error { return m.err } -func (m *mockSeriesSet) Warnings() annotations.Annotations { return nil } +func (m *mockSeriesSet) Warnings() annotations.Annotations { return m.warnings } type mockChunkSeriesSet struct { idx int @@ -1336,105 +1390,44 @@ func BenchmarkMergeSeriesSet(b *testing.B) { } } -type mockGenericQuerier struct { - mtx sync.Mutex - - closed bool - labelNamesCalls int - labelNamesRequested []labelNameRequest - sortedSeriesRequested []bool - - resp []string - warnings annotations.Annotations - err error -} - -type labelNameRequest struct { - name string - matchers []*labels.Matcher -} - -func (m *mockGenericQuerier) Select(_ context.Context, b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet { - m.mtx.Lock() - m.sortedSeriesRequested = append(m.sortedSeriesRequested, b) - m.mtx.Unlock() - return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err} -} - -func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { - m.mtx.Lock() - m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{ - name: name, - matchers: matchers, - }) - m.mtx.Unlock() - return m.resp, m.warnings, m.err -} - -func (m *mockGenericQuerier) LabelNames(context.Context, *LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) { - m.mtx.Lock() - m.labelNamesCalls++ - m.mtx.Unlock() - return m.resp, m.warnings, m.err -} - -func (m *mockGenericQuerier) Close() error { - m.closed = true - return nil -} - -type mockGenericSeriesSet struct { - resp []string - warnings annotations.Annotations - err error - - curr int -} - -func (m *mockGenericSeriesSet) Next() bool { - if m.err != nil { - return false +func visitMockQueriers(t *testing.T, qr Querier, f func(t *testing.T, q *mockQuerier)) int { + count := 0 + switch x := qr.(type) { + case *mockQuerier: + count++ + f(t, x) + case *querierAdapter: + count += visitMockQueriersInGenericQuerier(t, x.genericQuerier, f) } - if m.curr >= len(m.resp) { - return false + return count +} + +func visitMockQueriersInGenericQuerier(t *testing.T, g genericQuerier, f func(t *testing.T, q *mockQuerier)) int { + count := 0 + switch x := g.(type) { + case *mergeGenericQuerier: + for _, q := range x.queriers { + count += visitMockQueriersInGenericQuerier(t, q, f) + } + case *genericQuerierAdapter: + // Visitor for chunkQuerier not implemented. + count += visitMockQueriers(t, x.q, f) + case *secondaryQuerier: + count += visitMockQueriersInGenericQuerier(t, x.genericQuerier, f) } - m.curr++ - return true + return count } -func (m *mockGenericSeriesSet) Err() error { return m.err } -func (m *mockGenericSeriesSet) Warnings() annotations.Annotations { return m.warnings } - -func (m *mockGenericSeriesSet) At() Labels { - return mockLabels(m.resp[m.curr-1]) -} - -type mockLabels string - -func (l mockLabels) Labels() labels.Labels { - return labels.FromStrings("test", string(l)) -} - -func unwrapMockGenericQuerier(t *testing.T, qr genericQuerier) *mockGenericQuerier { - m, ok := qr.(*mockGenericQuerier) - if !ok { - s, ok := qr.(*secondaryQuerier) - require.True(t, ok, "expected secondaryQuerier got something else") - m, ok = s.genericQuerier.(*mockGenericQuerier) - require.True(t, ok, "expected mockGenericQuerier got something else") - } - return m -} - -func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { +func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) { var ( errStorage = errors.New("storage error") warnStorage = errors.New("storage warning") ctx = context.Background() ) for _, tcase := range []struct { - name string - queriers []genericQuerier + name string + primaries []Querier + secondaries []Querier expectedSelectsSeries []labels.Labels expectedLabels []string @@ -1443,10 +1436,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { expectedErrs [4]error }{ { - // NewMergeQuerier will not create a mergeGenericQuerier - // with just one querier inside, but we can test it anyway. - name: "one successful primary querier", - queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}}, + name: "one successful primary querier", + primaries: []Querier{&mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}}, expectedSelectsSeries: []labels.Labels{ labels.FromStrings("test", "a"), labels.FromStrings("test", "b"), @@ -1455,9 +1446,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }, { name: "multiple successful primary queriers", - queriers: []genericQuerier{ - &mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}, - &mockGenericQuerier{resp: []string{"b", "c"}, warnings: nil, err: nil}, + primaries: []Querier{ + &mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}, + &mockQuerier{resp: []string{"b", "c"}, warnings: nil, err: nil}, }, expectedSelectsSeries: []labels.Labels{ labels.FromStrings("test", "a"), @@ -1468,15 +1459,17 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }, { name: "one failed primary querier", - queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}}, + primaries: []Querier{&mockQuerier{warnings: nil, err: errStorage}}, expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with successful secondaries", - queriers: []genericQuerier{ - &mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}}, + primaries: []Querier{ + &mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}, + }, + secondaries: []Querier{ + &mockQuerier{resp: []string{"b"}, warnings: nil, err: nil}, + &mockQuerier{resp: []string{"c"}, warnings: nil, err: nil}, }, expectedSelectsSeries: []labels.Labels{ labels.FromStrings("test", "a"), @@ -1487,10 +1480,12 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }, { name: "one successful primary querier with empty response and successful secondaries", - queriers: []genericQuerier{ - &mockGenericQuerier{resp: []string{}, warnings: nil, err: nil}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}}, + primaries: []Querier{ + &mockQuerier{resp: []string{}, warnings: nil, err: nil}, + }, + secondaries: []Querier{ + &mockQuerier{resp: []string{"b"}, warnings: nil, err: nil}, + &mockQuerier{resp: []string{"c"}, warnings: nil, err: nil}, }, expectedSelectsSeries: []labels.Labels{ labels.FromStrings("test", "b"), @@ -1500,19 +1495,42 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }, { name: "one failed primary querier with successful secondaries", - queriers: []genericQuerier{ - &mockGenericQuerier{warnings: nil, err: errStorage}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}}, + primaries: []Querier{ + &mockQuerier{warnings: nil, err: errStorage}, + }, + secondaries: []Querier{ + &mockQuerier{resp: []string{"b"}, warnings: nil, err: nil}, + &mockQuerier{resp: []string{"c"}, warnings: nil, err: nil}, }, expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, }, + { + name: "nil primary querier with failed secondary", + primaries: nil, + secondaries: []Querier{ + &mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage}, + }, + expectedLabels: []string{}, + expectedWarnings: annotations.New().Add(errStorage), + }, + { + name: "nil primary querier with two failed secondaries", + primaries: nil, + secondaries: []Querier{ + &mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage}, + &mockQuerier{resp: []string{"c"}, warnings: nil, err: errStorage}, + }, + expectedLabels: []string{}, + expectedWarnings: annotations.New().Add(errStorage), + }, { name: "one successful primary querier with failed secondaries", - queriers: []genericQuerier{ - &mockGenericQuerier{resp: []string{"a"}, warnings: nil, err: nil}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: errStorage}}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: errStorage}}, + primaries: []Querier{ + &mockQuerier{resp: []string{"a"}, warnings: nil, err: nil}, + }, + secondaries: []Querier{ + &mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage}, + &mockQuerier{resp: []string{"c"}, warnings: nil, err: errStorage}, }, expectedSelectsSeries: []labels.Labels{ labels.FromStrings("test", "a"), @@ -1522,9 +1540,11 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }, { name: "successful queriers with warnings", - queriers: []genericQuerier{ - &mockGenericQuerier{resp: []string{"a"}, warnings: annotations.New().Add(warnStorage), err: nil}, - &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: annotations.New().Add(warnStorage), err: nil}}, + primaries: []Querier{ + &mockQuerier{resp: []string{"a"}, warnings: annotations.New().Add(warnStorage), err: nil}, + }, + secondaries: []Querier{ + &mockQuerier{resp: []string{"b"}, warnings: annotations.New().Add(warnStorage), err: nil}, }, expectedSelectsSeries: []labels.Labels{ labels.FromStrings("test", "a"), @@ -1535,10 +1555,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }, } { t.Run(tcase.name, func(t *testing.T) { - q := &mergeGenericQuerier{ - queriers: tcase.queriers, - mergeFn: func(l ...Labels) Labels { return l[0] }, - } + q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] }) t.Run("Select", func(t *testing.T) { res := q.Select(context.Background(), false, nil) @@ -1551,65 +1568,70 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { require.ErrorIs(t, res.Err(), tcase.expectedErrs[0], "expected error doesn't match") require.Equal(t, tcase.expectedSelectsSeries, lbls) - for _, qr := range q.queriers { - m := unwrapMockGenericQuerier(t, qr) - // mergeGenericQuerier forces all Selects to be sorted. - require.Equal(t, []bool{true}, m.sortedSeriesRequested) - } + n := visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) { + // Single queries should be unsorted; merged queries sorted. + exp := len(tcase.primaries)+len(tcase.secondaries) > 1 + require.Equal(t, []bool{exp}, m.sortedSeriesRequested) + }) + // Check we visited all queriers. + require.Equal(t, len(tcase.primaries)+len(tcase.secondaries), n) }) t.Run("LabelNames", func(t *testing.T) { res, w, err := q.LabelNames(ctx, nil) require.Subset(t, tcase.expectedWarnings, w) require.ErrorIs(t, err, tcase.expectedErrs[1], "expected error doesn't match") - require.Equal(t, tcase.expectedLabels, res) + requireEqualSlice(t, tcase.expectedLabels, res) if err != nil { return } - for _, qr := range q.queriers { - m := unwrapMockGenericQuerier(t, qr) - + visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) { require.Equal(t, 1, m.labelNamesCalls) - } + }) }) t.Run("LabelValues", func(t *testing.T) { res, w, err := q.LabelValues(ctx, "test", nil) require.Subset(t, tcase.expectedWarnings, w) require.ErrorIs(t, err, tcase.expectedErrs[2], "expected error doesn't match") - require.Equal(t, tcase.expectedLabels, res) + requireEqualSlice(t, tcase.expectedLabels, res) if err != nil { return } - for _, qr := range q.queriers { - m := unwrapMockGenericQuerier(t, qr) - + visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) { require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested) - } + }) }) t.Run("LabelValuesWithMatchers", func(t *testing.T) { matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue") res, w, err := q.LabelValues(ctx, "test2", nil, matcher) require.Subset(t, tcase.expectedWarnings, w) require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match") - require.Equal(t, tcase.expectedLabels, res) + requireEqualSlice(t, tcase.expectedLabels, res) if err != nil { return } - for _, qr := range q.queriers { - m := unwrapMockGenericQuerier(t, qr) - + visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) { require.Equal(t, []labelNameRequest{ {name: "test"}, {name: "test2", matchers: []*labels.Matcher{matcher}}, }, m.labelNamesRequested) - } + }) }) }) } } +// Check slice but ignore difference between nil and empty. +func requireEqualSlice[T any](t require.TestingT, a, b []T, msgAndArgs ...interface{}) { + if len(a) == 0 { + require.Empty(t, b, msgAndArgs...) + } else { + require.Equal(t, a, b, msgAndArgs...) + } +} + type errIterator struct { err error }