diff --git a/storage/remote/client.go b/storage/remote/client.go index 68891f659e..0c32002f66 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "net/http/httptrace" "strconv" @@ -112,8 +113,9 @@ type Client struct { Client *http.Client timeout time.Duration - retryOnRateLimit bool - chunkedReadLimit uint64 + retryOnRateLimit bool + chunkedReadLimit uint64 + acceptedResponseTypes []prompb.ReadRequest_ResponseType readQueries prometheus.Gauge readQueriesTotal *prometheus.CounterVec @@ -125,23 +127,25 @@ type Client struct { // ClientConfig configures a client. type ClientConfig struct { - URL *config_util.URL - Timeout model.Duration - HTTPClientConfig config_util.HTTPClientConfig - SigV4Config *sigv4.SigV4Config - AzureADConfig *azuread.AzureADConfig - GoogleIAMConfig *googleiam.Config - Headers map[string]string - RetryOnRateLimit bool - WriteProtoMsg config.RemoteWriteProtoMsg - ChunkedReadLimit uint64 - RoundRobinDNS bool + URL *config_util.URL + Timeout model.Duration + HTTPClientConfig config_util.HTTPClientConfig + SigV4Config *sigv4.SigV4Config + AzureADConfig *azuread.AzureADConfig + GoogleIAMConfig *googleiam.Config + Headers map[string]string + RetryOnRateLimit bool + WriteProtoMsg config.RemoteWriteProtoMsg + ChunkedReadLimit uint64 + RoundRobinDNS bool + AcceptedResponseTypes []prompb.ReadRequest_ResponseType } // ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can // also fall back to the SAMPLES method if necessary. type ReadClient interface { Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) + ReadMultiple(ctx context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error) } // NewReadClient creates a new client for remote read. @@ -157,15 +161,22 @@ func NewReadClient(name string, conf *ClientConfig, optFuncs ...config_util.HTTP } httpClient.Transport = otelhttp.NewTransport(t) + // Set accepted response types, default to existing behavior if not specified + acceptedResponseTypes := conf.AcceptedResponseTypes + if len(acceptedResponseTypes) == 0 { + acceptedResponseTypes = AcceptedResponseTypes + } + return &Client{ - remoteName: name, - urlString: conf.URL.String(), - Client: httpClient, - timeout: time.Duration(conf.Timeout), - chunkedReadLimit: conf.ChunkedReadLimit, - readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), - readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), - readQueriesDuration: remoteReadQueryDuration.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), + remoteName: name, + urlString: conf.URL.String(), + Client: httpClient, + timeout: time.Duration(conf.Timeout), + chunkedReadLimit: conf.ChunkedReadLimit, + acceptedResponseTypes: acceptedResponseTypes, + readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), + readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), + readQueriesDuration: remoteReadQueryDuration.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), }, nil } @@ -337,27 +348,44 @@ func (c *Client) Endpoint() string { return c.urlString } -// Read reads from a remote endpoint. The sortSeries parameter is only respected in the case of a sampled response; +// Read reads from a remote endpoint. The sortSeries parameter is only respected in the case of a samples response; // chunked responses arrive already sorted by the server. func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) { + return c.ReadMultiple(ctx, []*prompb.Query{query}, sortSeries) +} + +// ReadMultiple reads from a remote endpoint using multiple queries in a single request. +// The sortSeries parameter is only respected in the case of a samples response; +// chunked responses arrive already sorted by the server. +// Returns a single SeriesSet with interleaved series from all queries. +func (c *Client) ReadMultiple(ctx context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error) { c.readQueries.Inc() defer c.readQueries.Dec() req := &prompb.ReadRequest{ - // TODO: Support batching multiple queries into one read request, - // as the protobuf interface allows for it. - Queries: []*prompb.Query{query}, - AcceptedResponseTypes: AcceptedResponseTypes, + Queries: queries, + AcceptedResponseTypes: c.acceptedResponseTypes, } + + httpResp, cancel, start, err := c.executeReadRequest(ctx, req, "Remote Read Multiple") + if err != nil { + return nil, err + } + + return c.handleReadResponse(httpResp, req, queries, sortSeries, start, cancel) +} + +// executeReadRequest creates and executes an HTTP request for reading data. +func (c *Client) executeReadRequest(ctx context.Context, req *prompb.ReadRequest, spanName string) (*http.Response, context.CancelFunc, time.Time, error) { data, err := proto.Marshal(req) if err != nil { - return nil, fmt.Errorf("unable to marshal read request: %w", err) + return nil, nil, time.Time{}, fmt.Errorf("unable to marshal read request: %w", err) } compressed := snappy.Encode(nil, data) httpReq, err := http.NewRequest(http.MethodPost, c.urlString, bytes.NewReader(compressed)) if err != nil { - return nil, fmt.Errorf("unable to create request: %w", err) + return nil, nil, time.Time{}, fmt.Errorf("unable to create request: %w", err) } httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Add("Accept-Encoding", "snappy") @@ -368,16 +396,21 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) errTimeout := fmt.Errorf("%w: request timed out after %s", context.DeadlineExceeded, c.timeout) ctx, cancel := context.WithTimeoutCause(ctx, c.timeout, errTimeout) - ctx, span := otel.Tracer("").Start(ctx, "Remote Read", trace.WithSpanKind(trace.SpanKindClient)) + ctx, span := otel.Tracer("").Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindClient)) defer span.End() start := time.Now() httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) if err != nil { cancel() - return nil, fmt.Errorf("error sending request: %w", err) + return nil, nil, time.Time{}, fmt.Errorf("error sending request: %w", err) } + return httpResp, cancel, start, nil +} + +// handleReadResponse processes the HTTP response and returns a SeriesSet. +func (c *Client) handleReadResponse(httpResp *http.Response, req *prompb.ReadRequest, queries []*prompb.Query, sortSeries bool, start time.Time, cancel context.CancelFunc) (storage.SeriesSet, error) { if httpResp.StatusCode/100 != 2 { // Make an attempt at getting an error message. body, _ := io.ReadAll(httpResp.Body) @@ -393,16 +426,16 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) switch { case strings.HasPrefix(contentType, "application/x-protobuf"): - c.readQueriesDuration.WithLabelValues("sampled").Observe(time.Since(start).Seconds()) - c.readQueriesTotal.WithLabelValues("sampled", strconv.Itoa(httpResp.StatusCode)).Inc() - ss, err := c.handleSampledResponse(req, httpResp, sortSeries) + c.readQueriesDuration.WithLabelValues("samples").Observe(time.Since(start).Seconds()) + c.readQueriesTotal.WithLabelValues("samples", strconv.Itoa(httpResp.StatusCode)).Inc() + ss, err := c.handleSamplesResponseImpl(req, httpResp, sortSeries) cancel() return ss, err case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"): c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds()) s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil) - return NewChunkedSeriesSet(s, httpResp.Body, query.StartTimestampMs, query.EndTimestampMs, func(err error) { + return c.handleChunkedResponseImpl(s, httpResp, queries, func(err error) { code := strconv.Itoa(httpResp.StatusCode) if !errors.Is(err, io.EOF) { code = "aborted_stream" @@ -418,7 +451,8 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) } } -func (c *Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.Response, sortSeries bool) (storage.SeriesSet, error) { +// handleSamplesResponseImpl handles samples responses for both single and multiple queries. +func (c *Client) handleSamplesResponseImpl(req *prompb.ReadRequest, httpResp *http.Response, sortSeries bool) (storage.SeriesSet, error) { compressed, err := io.ReadAll(httpResp.Body) if err != nil { return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err) @@ -443,8 +477,60 @@ func (c *Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.R return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) } - // This client does not batch queries so there's always only 1 result. - res := resp.Results[0] - - return FromQueryResult(sortSeries, res), nil + return combineQueryResults(resp.Results, sortSeries) +} + +// combineQueryResults combines multiple query results into a single SeriesSet, +// handling both sorted and unsorted cases appropriately. +func combineQueryResults(results []*prompb.QueryResult, sortSeries bool) (storage.SeriesSet, error) { + if len(results) == 0 { + return &concreteSeriesSet{series: nil, cur: 0}, nil + } + + if len(results) == 1 { + return FromQueryResult(sortSeries, results[0]), nil + } + + // Multiple queries case - combine all results + if sortSeries { + // When sorting is requested, use MergeSeriesSet which can efficiently merge sorted inputs + var allSeriesSets []storage.SeriesSet + for _, result := range results { + seriesSet := FromQueryResult(sortSeries, result) + if err := seriesSet.Err(); err != nil { + return nil, fmt.Errorf("error reading series from query result: %w", err) + } + allSeriesSets = append(allSeriesSets, seriesSet) + } + return storage.NewMergeSeriesSet(allSeriesSets, 0, storage.ChainedSeriesMerge), nil + } + + // When sorting is not requested, just concatenate all series without using MergeSeriesSet + // since MergeSeriesSet requires sorted inputs + var allSeries []storage.Series + for _, result := range results { + seriesSet := FromQueryResult(sortSeries, result) + for seriesSet.Next() { + allSeries = append(allSeries, seriesSet.At()) + } + if err := seriesSet.Err(); err != nil { + return nil, fmt.Errorf("error reading series from query result: %w", err) + } + } + + return &concreteSeriesSet{series: allSeries, cur: 0}, nil +} + +// handleChunkedResponseImpl handles chunked responses for both single and multiple queries. +func (c *Client) handleChunkedResponseImpl(s *ChunkedReader, httpResp *http.Response, queries []*prompb.Query, onClose func(error)) storage.SeriesSet { + // For multiple queries in chunked response, we'll still use the existing infrastructure + // but we need to provide the timestamp range that covers all queries + var minStartTs, maxEndTs int64 = math.MaxInt64, math.MinInt64 + + for _, query := range queries { + minStartTs = min(minStartTs, query.StartTimestampMs) + maxEndTs = max(maxEndTs, query.EndTimestampMs) + } + + return NewChunkedSeriesSet(s, httpResp.Body, minStartTs, maxEndTs, onClose) } diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 8073f23b3b..78ed98f1a8 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/testutil" ) @@ -194,9 +195,10 @@ func TestSeriesSetFilter(t *testing.T) { } type mockedRemoteClient struct { - got *prompb.Query - store []*prompb.TimeSeries - b labels.ScratchBuilder + got *prompb.Query + gotMultiple []*prompb.Query + store []*prompb.TimeSeries + b labels.ScratchBuilder } func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) { @@ -224,15 +226,69 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSe } } - if !notMatch { - q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels}) + if notMatch { + continue } + // Filter samples by query time range + var filteredSamples []prompb.Sample + for _, sample := range s.Samples { + if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp <= query.EndTimestampMs { + filteredSamples = append(filteredSamples, sample) + } + } + q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels, Samples: filteredSamples}) } return FromQueryResult(sortSeries, q), nil } +func (c *mockedRemoteClient) ReadMultiple(_ context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error) { + // Store the queries for verification + c.gotMultiple = make([]*prompb.Query, len(queries)) + copy(c.gotMultiple, queries) + + // Simulate the same behavior as the real client + var results []*prompb.QueryResult + for _, query := range queries { + matchers, err := FromLabelMatchers(query.Matchers) + if err != nil { + return nil, err + } + + q := &prompb.QueryResult{} + for _, s := range c.store { + l := s.ToLabels(&c.b, nil) + var notMatch bool + + for _, m := range matchers { + v := l.Get(m.Name) + if !m.Matches(v) { + notMatch = true + break + } + } + + if notMatch { + continue + } + // Filter samples by query time range + var filteredSamples []prompb.Sample + for _, sample := range s.Samples { + if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp <= query.EndTimestampMs { + filteredSamples = append(filteredSamples, sample) + } + } + q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels, Samples: filteredSamples}) + } + results = append(results, q) + } + + // Use the same logic as the real client + return combineQueryResults(results, sortSeries) +} + func (c *mockedRemoteClient) reset() { c.got = nil + c.gotMultiple = nil } // NOTE: We don't need to test ChunkQuerier as it's uses querier for all operations anyway. @@ -494,3 +550,336 @@ func TestSampleAndChunkQueryableClient(t *testing.T) { }) } } + +func TestReadMultiple(t *testing.T) { + const sampleIntervalMs = 250 + + // Helper function to calculate series multiplier based on labels + getSeriesMultiplier := func(labels []prompb.Label) uint64 { + // Create a simple hash from labels to generate unique values per series + labelHash := uint64(0) + for _, label := range labels { + for _, b := range label.Name + label.Value { + labelHash = labelHash*31 + uint64(b) + } + } + return labelHash % sampleIntervalMs + } + + // Helper function to generate a complete time series with samples at 250ms intervals + // Each series gets different sample values based on a hash of their labels + generateSeries := func(labels []prompb.Label, startMs, endMs int64) *prompb.TimeSeries { + seriesMultiplier := getSeriesMultiplier(labels) + + var samples []prompb.Sample + for ts := startMs; ts <= endMs; ts += sampleIntervalMs { + samples = append(samples, prompb.Sample{ + Timestamp: ts, + Value: float64(ts + int64(seriesMultiplier)), // Unique value per series + }) + } + + return &prompb.TimeSeries{ + Labels: labels, + Samples: samples, + } + } + + m := &mockedRemoteClient{ + store: []*prompb.TimeSeries{ + generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 0, 10000), + generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 0, 10000), + generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 0, 10000), + generateSeries([]prompb.Label{{Name: "instance", Value: "localhost:9090"}}, 0, 10000), + }, + b: labels.NewScratchBuilder(0), + } + + testCases := []struct { + name string + queries []*prompb.Query + expectedResults []*prompb.TimeSeries + }{ + { + name: "single query", + queries: []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 2000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"}, + }, + }, + }, + expectedResults: []*prompb.TimeSeries{ + generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000), + }, + }, + { + name: "multiple queries - different matchers", + queries: []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 2000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"}, + }, + }, + { + StartTimestampMs: 1500, + EndTimestampMs: 2500, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "job", Value: "node_exporter"}, + }, + }, + }, + expectedResults: []*prompb.TimeSeries{ + generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 1500, 2500), + generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000), + }, + }, + { + name: "multiple queries - overlapping results", + queries: []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 2000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_RE, Name: "job", Value: "prometheus|node_exporter"}, + }, + }, + { + StartTimestampMs: 1500, + EndTimestampMs: 2500, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"}, + }, + }, + }, + expectedResults: []*prompb.TimeSeries{ + generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 1500, 2500), + generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 1000, 2000), + generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000), + }, + }, + { + name: "query with no results", + queries: []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 2000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "job", Value: "nonexistent"}, + }, + }, + }, + expectedResults: nil, // empty result + }, + { + name: "empty query list", + queries: []*prompb.Query{}, + expectedResults: nil, + }, + { + name: "three queries with mixed results", + queries: []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 2000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"}, + }, + }, + { + StartTimestampMs: 1500, + EndTimestampMs: 2500, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "job", Value: "nonexistent"}, + }, + }, + { + StartTimestampMs: 2000, + EndTimestampMs: 3000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "instance", Value: "localhost:9090"}, + }, + }, + }, + expectedResults: []*prompb.TimeSeries{ + generateSeries([]prompb.Label{{Name: "instance", Value: "localhost:9090"}}, 2000, 3000), + generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000), + }, + }, + { + name: "same matchers with overlapping time ranges", + queries: []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 5000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"}, + }, + }, + { + StartTimestampMs: 3000, + EndTimestampMs: 8000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"}, + }, + }, + }, + expectedResults: []*prompb.TimeSeries{ + generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 1000, 8000), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + m.reset() + + result, err := m.ReadMultiple(context.Background(), tc.queries, true) + require.NoError(t, err) + + // Verify the queries were stored correctly + require.Equal(t, tc.queries, m.gotMultiple) + + // Verify the combined result matches expected + var got []*prompb.TimeSeries + for result.Next() { + series := result.At() + var samples []prompb.Sample + iterator := series.Iterator(nil) + + // Collect actual samples + for iterator.Next() != chunkenc.ValNone { + ts, value := iterator.At() + samples = append(samples, prompb.Sample{ + Timestamp: ts, + Value: value, + }) + } + require.NoError(t, iterator.Err()) + + got = append(got, &prompb.TimeSeries{ + Labels: prompb.FromLabels(series.Labels(), nil), + Samples: samples, + }) + } + require.NoError(t, result.Err()) + + require.ElementsMatch(t, tc.expectedResults, got) + }) + } +} + +func TestReadMultipleErrorHandling(t *testing.T) { + m := &mockedRemoteClient{ + store: []*prompb.TimeSeries{ + {Labels: []prompb.Label{{Name: "job", Value: "prometheus"}}}, + }, + b: labels.NewScratchBuilder(0), + } + + // Test with invalid matcher - should return error + queries := []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 2000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_Type(999), Name: "job", Value: "prometheus"}, // invalid matcher type + }, + }, + } + + result, err := m.ReadMultiple(context.Background(), queries, true) + require.Error(t, err) + require.Nil(t, result) +} + +func TestReadMultipleSorting(t *testing.T) { + // Test data with labels designed to test sorting behavior + // When sorted: aaa < bbb < ccc + // When unsorted: order depends on processing order + m := &mockedRemoteClient{ + store: []*prompb.TimeSeries{ + {Labels: []prompb.Label{{Name: "series", Value: "ccc"}}}, // Will be returned by query 1 + {Labels: []prompb.Label{{Name: "series", Value: "aaa"}}}, // Will be returned by query 2 + {Labels: []prompb.Label{{Name: "series", Value: "bbb"}}}, // Will be returned by both queries (overlapping) + }, + b: labels.NewScratchBuilder(0), + } + + testCases := []struct { + name string + queries []*prompb.Query + sortSeries bool + expectedOrder []string + }{ + { + name: "multiple queries with sortSeries=true - should be sorted", + queries: []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 2000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_RE, Name: "series", Value: "ccc|bbb"}, // Returns: ccc, bbb (unsorted in store) + }, + }, + { + StartTimestampMs: 1500, + EndTimestampMs: 2500, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_RE, Name: "series", Value: "aaa|bbb"}, // Returns: aaa, bbb (unsorted in store) + }, + }, + }, + sortSeries: true, + expectedOrder: []string{"aaa", "bbb", "ccc"}, // Should be sorted after merge + }, + { + name: "multiple queries with sortSeries=false - concatenates without deduplication", + queries: []*prompb.Query{ + { + StartTimestampMs: 1000, + EndTimestampMs: 2000, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_RE, Name: "series", Value: "ccc|bbb"}, // Returns: ccc, bbb (unsorted) + }, + }, + { + StartTimestampMs: 1500, + EndTimestampMs: 2500, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_RE, Name: "series", Value: "aaa|bbb"}, // Returns: aaa, bbb (unsorted) + }, + }, + }, + sortSeries: false, + expectedOrder: []string{"ccc", "bbb", "aaa", "bbb"}, // Concatenated results - duplicates included + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + m.reset() + + result, err := m.ReadMultiple(context.Background(), tc.queries, tc.sortSeries) + require.NoError(t, err) + + // Collect the actual results + var actualOrder []string + for result.Next() { + series := result.At() + seriesValue := series.Labels().Get("series") + actualOrder = append(actualOrder, seriesValue) + } + require.NoError(t, result.Err()) + + // Verify the expected order matches actual order + // For sortSeries=true: results should be in sorted order + // For sortSeries=false: results should be in concatenated order (with duplicates) + testutil.RequireEqual(t, tc.expectedOrder, actualOrder) + }) + } +}