remote.ReadClient: allow multiple queries (#16742)

* remote read: simplify ReadMultiple to return single SeriesSet

Changed ReadMultiple to return a single SeriesSet with interleaved
series from all queries instead of a slice of SeriesSets. This
simplifies the interface and removes the complex multiplexing
infrastructure while maintaining the ability to send multiple
queries in a single HTTP request.

Changes:
- Updated ReadClient interface: ReadMultiple now returns storage.SeriesSet
- Removed multiplexing infrastructure (MessageQueue, QueueConsumer, etc.)
- Simplified response handling to interleave series from all queries
- Updated tests to match new interface
- All existing tests pass

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
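
For illustration, a minimal sketch of the resulting call pattern against the updated ReadClient interface; the client value, matchers, and time ranges here are assumptions, not part of the commit:

package example

import (
	"context"
	"fmt"

	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

// printSeries sends two queries in one remote-read request and drains the
// single interleaved SeriesSet that ReadMultiple returns.
func printSeries(ctx context.Context, client remote.ReadClient) error {
	queries := []*prompb.Query{
		{
			StartTimestampMs: 1000,
			EndTimestampMs:   2000,
			Matchers: []*prompb.LabelMatcher{
				{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
			},
		},
		{
			StartTimestampMs: 1500,
			EndTimestampMs:   2500,
			Matchers: []*prompb.LabelMatcher{
				{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "node_exporter"},
			},
		},
	}
	ss, err := client.ReadMultiple(ctx, queries, true) // sortSeries=true
	if err != nil {
		return err
	}
	for ss.Next() {
		fmt.Println(ss.At().Labels())
	}
	return ss.Err()
}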

* Fix sorting behavior in ReadMultiple for samples responses

When sortSeries=false, the previous implementation incorrectly used
storage.NewMergeSeriesSet which requires sorted inputs, violating the
function's contract and potentially producing incorrect results.

Changes:
- When sortSeries=true: Use NewMergeSeriesSet for efficient merging and
  deduplication of sorted series
- When sortSeries=false: Use simple concatenation to avoid the sorted
  input requirement, preserving duplicates from overlapping queries
- Add comprehensive tests to verify both sorting behaviors
- Update existing test expectations to match correct sorted order

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
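
For illustration, a minimal in-package sketch of the two paths (combineQueryResults is unexported, so this assumes package remote); it uses the same overlapping data as TestReadMultipleSorting below, and the expected orders are noted in the comments:

package remote

import (
	"fmt"

	"github.com/prometheus/prometheus/prompb"
)

// combineExample contrasts the two combination strategies on overlapping
// query results.
func combineExample() {
	results := []*prompb.QueryResult{
		{Timeseries: []*prompb.TimeSeries{
			{Labels: []prompb.Label{{Name: "series", Value: "ccc"}}},
			{Labels: []prompb.Label{{Name: "series", Value: "bbb"}}},
		}},
		{Timeseries: []*prompb.TimeSeries{
			{Labels: []prompb.Label{{Name: "series", Value: "aaa"}}},
			{Labels: []prompb.Label{{Name: "series", Value: "bbb"}}},
		}},
	}

	// sortSeries=true: each result is sorted, then merged and deduplicated.
	// Prints: {series="aaa"}, {series="bbb"}, {series="ccc"}.
	sorted, _ := combineQueryResults(results, true)
	for sorted.Next() {
		fmt.Println(sorted.At().Labels())
	}

	// sortSeries=false: plain concatenation in query order, duplicates kept.
	// Prints: {series="ccc"}, {series="bbb"}, {series="aaa"}, {series="bbb"}.
	unsorted, _ := combineQueryResults(results, false)
	for unsorted.Next() {
		fmt.Println(unsorted.At().Labels())
	}
}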

* Refactor to reduce code duplication in ReadMultiple implementation

Extract common query result combination logic into a shared
combineQueryResults function that handles both sorted and unsorted
cases. This eliminates duplication between the real client
implementation and the mock client used in tests.

Changes:
- Add combineQueryResults helper function in client.go
- Refactor handleSamplesResponseImpl to use the helper
- Simplify mockedRemoteClient.ReadMultiple to use the same helper
- Reduce code duplication by ~30 lines while maintaining the same functionality

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Dimitar Dimitrov 2025-08-14 15:00:07 +02:00 committed by GitHub
parent 17d5d80c80
commit d94dab92a8
3 changed files with 799 additions and 39 deletions

View File

@@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"net/http/httptrace"
"strconv"
@@ -112,8 +113,9 @@ type Client struct {
Client *http.Client
timeout time.Duration
retryOnRateLimit bool
chunkedReadLimit uint64
acceptedResponseTypes []prompb.ReadRequest_ResponseType
readQueries prometheus.Gauge
readQueriesTotal *prometheus.CounterVec
@@ -125,23 +127,25 @@ type Client struct {
// ClientConfig configures a client.
type ClientConfig struct {
URL *config_util.URL
Timeout model.Duration
HTTPClientConfig config_util.HTTPClientConfig
SigV4Config *sigv4.SigV4Config
AzureADConfig *azuread.AzureADConfig
GoogleIAMConfig *googleiam.Config
Headers map[string]string
RetryOnRateLimit bool
WriteProtoMsg config.RemoteWriteProtoMsg
ChunkedReadLimit uint64
RoundRobinDNS bool
AcceptedResponseTypes []prompb.ReadRequest_ResponseType
}
// ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can
// also fall back to the SAMPLES method if necessary.
type ReadClient interface {
Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error)
ReadMultiple(ctx context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error)
}
// NewReadClient creates a new client for remote read.
@@ -157,15 +161,22 @@ func NewReadClient(name string, conf *ClientConfig, optFuncs ...config_util.HTTP
}
httpClient.Transport = otelhttp.NewTransport(t)
// Set accepted response types, default to existing behavior if not specified.
acceptedResponseTypes := conf.AcceptedResponseTypes
if len(acceptedResponseTypes) == 0 {
acceptedResponseTypes = AcceptedResponseTypes
}
return &Client{
remoteName: name,
urlString: conf.URL.String(),
Client: httpClient,
timeout: time.Duration(conf.Timeout),
chunkedReadLimit: conf.ChunkedReadLimit,
acceptedResponseTypes: acceptedResponseTypes,
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
readQueriesDuration: remoteReadQueryDuration.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
}, nil
}
@@ -337,27 +348,44 @@ func (c *Client) Endpoint() string {
return c.urlString
}
// Read reads from a remote endpoint. The sortSeries parameter is only respected in the case of a samples response;
// chunked responses arrive already sorted by the server.
func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
return c.ReadMultiple(ctx, []*prompb.Query{query}, sortSeries)
}
// ReadMultiple reads from a remote endpoint using multiple queries in a single request.
// The sortSeries parameter is only respected in the case of a samples response;
// chunked responses arrive already sorted by the server.
// Returns a single SeriesSet with interleaved series from all queries.
func (c *Client) ReadMultiple(ctx context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
c.readQueries.Inc()
defer c.readQueries.Dec()
req := &prompb.ReadRequest{
Queries: queries,
AcceptedResponseTypes: c.acceptedResponseTypes,
}
httpResp, cancel, start, err := c.executeReadRequest(ctx, req)
if err != nil {
return nil, err
}
return c.handleReadResponse(httpResp, req, queries, sortSeries, start, cancel)
}
// executeReadRequest creates and executes an HTTP request for reading data.
func (c *Client) executeReadRequest(ctx context.Context, req *prompb.ReadRequest) (*http.Response, context.CancelFunc, time.Time, error) {
data, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to marshal read request: %w", err)
return nil, nil, time.Time{}, fmt.Errorf("unable to marshal read request: %w", err)
}
compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequest(http.MethodPost, c.urlString, bytes.NewReader(compressed))
if err != nil {
return nil, fmt.Errorf("unable to create request: %w", err)
return nil, nil, time.Time{}, fmt.Errorf("unable to create request: %w", err)
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Add("Accept-Encoding", "snappy")
@@ -375,9 +403,14 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("error sending request: %w", err)
return nil, nil, time.Time{}, fmt.Errorf("error sending request: %w", err)
}
return httpResp, cancel, start, nil
}
// handleReadResponse processes the HTTP response and returns a SeriesSet.
func (c *Client) handleReadResponse(httpResp *http.Response, req *prompb.ReadRequest, queries []*prompb.Query, sortSeries bool, start time.Time, cancel context.CancelFunc) (storage.SeriesSet, error) {
if httpResp.StatusCode/100 != 2 {
// Make an attempt at getting an error message.
body, _ := io.ReadAll(httpResp.Body)
@@ -402,7 +435,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds())
s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil)
return c.handleChunkedResponseImpl(s, httpResp, queries, func(err error) {
code := strconv.Itoa(httpResp.StatusCode)
if !errors.Is(err, io.EOF) {
code = "aborted_stream"
@@ -443,8 +476,60 @@ func (*Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.Res
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
}
return combineQueryResults(resp.Results, sortSeries)
}
// combineQueryResults combines multiple query results into a single SeriesSet,
// handling both sorted and unsorted cases appropriately.
func combineQueryResults(results []*prompb.QueryResult, sortSeries bool) (storage.SeriesSet, error) {
if len(results) == 0 {
return &concreteSeriesSet{series: nil, cur: 0}, nil
}
if len(results) == 1 {
return FromQueryResult(sortSeries, results[0]), nil
}
// Multiple queries case - combine all results
if sortSeries {
// When sorting is requested, use MergeSeriesSet which can efficiently merge sorted inputs
var allSeriesSets []storage.SeriesSet
for _, result := range results {
seriesSet := FromQueryResult(sortSeries, result)
if err := seriesSet.Err(); err != nil {
return nil, fmt.Errorf("error reading series from query result: %w", err)
}
allSeriesSets = append(allSeriesSets, seriesSet)
}
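// ChainedSeriesMerge combines series with identical labels into a single
// series, so overlapping series returned by different queries are deduplicated.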
return storage.NewMergeSeriesSet(allSeriesSets, 0, storage.ChainedSeriesMerge), nil
}
// When sorting is not requested, just concatenate all series without using MergeSeriesSet
// since MergeSeriesSet requires sorted inputs
var allSeries []storage.Series
for _, result := range results {
seriesSet := FromQueryResult(sortSeries, result)
for seriesSet.Next() {
allSeries = append(allSeries, seriesSet.At())
}
if err := seriesSet.Err(); err != nil {
return nil, fmt.Errorf("error reading series from query result: %w", err)
}
}
return &concreteSeriesSet{series: allSeries, cur: 0}, nil
}
// handleChunkedResponseImpl handles chunked responses for both single and multiple queries.
func (*Client) handleChunkedResponseImpl(s *ChunkedReader, httpResp *http.Response, queries []*prompb.Query, onClose func(error)) storage.SeriesSet {
// For multiple queries in chunked response, we'll still use the existing infrastructure
// but we need to provide the timestamp range that covers all queries
var minStartTs, maxEndTs int64 = math.MaxInt64, math.MinInt64
for _, query := range queries {
minStartTs = min(minStartTs, query.StartTimestampMs)
maxEndTs = max(maxEndTs, query.EndTimestampMs)
}
return NewChunkedSeriesSet(s, httpResp.Body, minStartTs, maxEndTs, onClose)
}
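
For illustration, a minimal sketch of how a caller might use the new ClientConfig.AcceptedResponseTypes field to force SAMPLES-only responses instead of the default chunked-first negotiation; the endpoint, client name, and timeout are placeholders:

package example

import (
	"net/url"
	"time"

	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

// newSamplesOnlyClient builds a read client that only accepts SAMPLES
// responses from the remote endpoint.
func newSamplesOnlyClient(endpoint string) (remote.ReadClient, error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return nil, err
	}
	cfg := &remote.ClientConfig{
		URL:              &config_util.URL{URL: u},
		Timeout:          model.Duration(10 * time.Second),
		ChunkedReadLimit: config.DefaultChunkedReadLimit,
		AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{
			prompb.ReadRequest_SAMPLES,
		},
	}
	return remote.NewReadClient("example", cfg)
}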

View File

@@ -16,6 +16,7 @@ package remote
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
@@ -30,8 +31,11 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/testutil"
)
var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
@@ -474,3 +478,619 @@ func delayedResponseHTTPHandler(t *testing.T, delay time.Duration) http.HandlerF
require.NoError(t, err)
}
}
func TestReadMultipleErrorHandling(t *testing.T) {
m := &mockedRemoteClient{
store: []*prompb.TimeSeries{
{Labels: []prompb.Label{{Name: "job", Value: "prometheus"}}},
},
b: labels.NewScratchBuilder(0),
}
// Test with invalid matcher - should return error
queries := []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 2000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_Type(999), Name: "job", Value: "prometheus"}, // invalid matcher type
},
},
}
result, err := m.ReadMultiple(context.Background(), queries, true)
require.Error(t, err)
require.Nil(t, result)
}
func TestReadMultiple(t *testing.T) {
const sampleIntervalMs = 250
// Helper function to calculate series multiplier based on labels
getSeriesMultiplier := func(labels []prompb.Label) uint64 {
// Create a simple hash from labels to generate unique values per series
labelHash := uint64(0)
for _, label := range labels {
for _, b := range label.Name + label.Value {
labelHash = labelHash*31 + uint64(b)
}
}
return labelHash % sampleIntervalMs
}
// Helper function to generate a complete time series with samples at 250ms intervals
// Each series gets different sample values based on a hash of their labels
generateSeries := func(labels []prompb.Label, startMs, endMs int64) *prompb.TimeSeries {
seriesMultiplier := getSeriesMultiplier(labels)
var samples []prompb.Sample
for ts := startMs; ts <= endMs; ts += sampleIntervalMs {
samples = append(samples, prompb.Sample{
Timestamp: ts,
Value: float64(ts + int64(seriesMultiplier)), // Unique value per series
})
}
return &prompb.TimeSeries{
Labels: labels,
Samples: samples,
}
}
m := &mockedRemoteClient{
store: []*prompb.TimeSeries{
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 0, 10000),
generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 0, 10000),
generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 0, 10000),
generateSeries([]prompb.Label{{Name: "instance", Value: "localhost:9090"}}, 0, 10000),
},
b: labels.NewScratchBuilder(0),
}
testCases := []struct {
name string
queries []*prompb.Query
expectedResults []*prompb.TimeSeries
}{
{
name: "single query",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 2000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
},
},
},
expectedResults: []*prompb.TimeSeries{
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000),
},
},
{
name: "multiple queries - different matchers",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 2000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
},
},
{
StartTimestampMs: 1500,
EndTimestampMs: 2500,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "node_exporter"},
},
},
},
expectedResults: []*prompb.TimeSeries{
generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 1500, 2500),
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000),
},
},
{
name: "multiple queries - overlapping results",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 2000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_RE, Name: "job", Value: "prometheus|node_exporter"},
},
},
{
StartTimestampMs: 1500,
EndTimestampMs: 2500,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"},
},
},
},
expectedResults: []*prompb.TimeSeries{
generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 1500, 2500),
generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 1000, 2000),
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000),
},
},
{
name: "query with no results",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 2000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "nonexistent"},
},
},
},
expectedResults: nil, // empty result
},
{
name: "empty query list",
queries: []*prompb.Query{},
expectedResults: nil,
},
{
name: "three queries with mixed results",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 2000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
},
},
{
StartTimestampMs: 1500,
EndTimestampMs: 2500,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "nonexistent"},
},
},
{
StartTimestampMs: 2000,
EndTimestampMs: 3000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "instance", Value: "localhost:9090"},
},
},
},
expectedResults: []*prompb.TimeSeries{
generateSeries([]prompb.Label{{Name: "instance", Value: "localhost:9090"}}, 2000, 3000),
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000),
},
},
{
name: "same matchers with overlapping time ranges",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 5000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"},
},
},
{
StartTimestampMs: 3000,
EndTimestampMs: 8000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"},
},
},
},
expectedResults: []*prompb.TimeSeries{
generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 1000, 8000),
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
m.reset()
result, err := m.ReadMultiple(context.Background(), tc.queries, true)
require.NoError(t, err)
// Verify the queries were stored correctly
require.Equal(t, tc.queries, m.gotMultiple)
// Verify the combined result matches expected
var got []*prompb.TimeSeries
for result.Next() {
series := result.At()
var samples []prompb.Sample
iterator := series.Iterator(nil)
// Collect actual samples
for iterator.Next() != chunkenc.ValNone {
ts, value := iterator.At()
samples = append(samples, prompb.Sample{
Timestamp: ts,
Value: value,
})
}
require.NoError(t, iterator.Err())
got = append(got, &prompb.TimeSeries{
Labels: prompb.FromLabels(series.Labels(), nil),
Samples: samples,
})
}
require.NoError(t, result.Err())
require.ElementsMatch(t, tc.expectedResults, got)
})
}
}
func TestReadMultipleSorting(t *testing.T) {
// Test data with labels designed to test sorting behavior
// When sorted: aaa < bbb < ccc
// When unsorted: order depends on processing order
m := &mockedRemoteClient{
store: []*prompb.TimeSeries{
{Labels: []prompb.Label{{Name: "series", Value: "ccc"}}}, // Will be returned by query 1
{Labels: []prompb.Label{{Name: "series", Value: "aaa"}}}, // Will be returned by query 2
{Labels: []prompb.Label{{Name: "series", Value: "bbb"}}}, // Will be returned by both queries (overlapping)
},
b: labels.NewScratchBuilder(0),
}
testCases := []struct {
name string
queries []*prompb.Query
sortSeries bool
expectedOrder []string
}{
{
name: "multiple queries with sortSeries=true - should be sorted",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 2000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_RE, Name: "series", Value: "ccc|bbb"}, // Returns: ccc, bbb (unsorted in store)
},
},
{
StartTimestampMs: 1500,
EndTimestampMs: 2500,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_RE, Name: "series", Value: "aaa|bbb"}, // Returns: aaa, bbb (unsorted in store)
},
},
},
sortSeries: true,
expectedOrder: []string{"aaa", "bbb", "ccc"}, // Should be sorted after merge
},
{
name: "multiple queries with sortSeries=false - concatenates without deduplication",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 2000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_RE, Name: "series", Value: "ccc|bbb"}, // Returns: ccc, bbb (unsorted)
},
},
{
StartTimestampMs: 1500,
EndTimestampMs: 2500,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_RE, Name: "series", Value: "aaa|bbb"}, // Returns: aaa, bbb (unsorted in store)
},
},
},
sortSeries: false,
expectedOrder: []string{"ccc", "bbb", "aaa", "bbb"}, // Concatenated results - duplicates included
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
m.reset()
result, err := m.ReadMultiple(context.Background(), tc.queries, tc.sortSeries)
require.NoError(t, err)
// Collect the actual results
var actualOrder []string
for result.Next() {
series := result.At()
seriesValue := series.Labels().Get("series")
actualOrder = append(actualOrder, seriesValue)
}
require.NoError(t, result.Err())
// Verify the expected order matches actual order
// For sortSeries=true: results should be in sorted order
// For sortSeries=false: results should be in concatenated order (with duplicates)
testutil.RequireEqual(t, tc.expectedOrder, actualOrder)
})
}
}
func TestReadMultipleWithChunks(t *testing.T) {
tests := []struct {
name string
queries []*prompb.Query
responseType string
mockHandler func(*testing.T, []*prompb.Query) http.HandlerFunc
expectedSeriesCount int
validateSampleCounts []int // expected samples per series
}{
{
name: "multiple queries with chunked responses",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 5000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
},
},
{
StartTimestampMs: 6000,
EndTimestampMs: 10000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "node_exporter"},
},
},
},
responseType: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse",
mockHandler: createChunkedResponseHandler,
expectedSeriesCount: 6, // 2 queries * 3 chunked series per query
validateSampleCounts: []int{4, 5, 1, 4, 5, 1},
},
{
name: "sampled response multiple queries",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 3000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
},
},
{
StartTimestampMs: 4000,
EndTimestampMs: 6000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "node_exporter"},
},
},
},
responseType: "application/x-protobuf",
mockHandler: createSampledResponseHandler,
expectedSeriesCount: 4, // 2 series per query * 2 queries
validateSampleCounts: []int{2, 2, 2, 2},
},
{
name: "single query with multiple chunks",
queries: []*prompb.Query{
{
StartTimestampMs: 0,
EndTimestampMs: 15000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "__name__", Value: "cpu_usage"},
},
},
},
responseType: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse",
mockHandler: createChunkedResponseHandler,
expectedSeriesCount: 3,
validateSampleCounts: []int{5, 5, 5},
},
{
name: "overlapping series from multiple queries",
queries: []*prompb.Query{
{
StartTimestampMs: 1000,
EndTimestampMs: 5000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "__name__", Value: "up"},
},
},
{
StartTimestampMs: 3000,
EndTimestampMs: 7000,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "__name__", Value: "up"},
},
},
},
responseType: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse",
mockHandler: createOverlappingSeriesHandler,
expectedSeriesCount: 2, // Each query creates a separate series entry
validateSampleCounts: []int{4, 4}, // Actual samples returned by handler
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
server := httptest.NewServer(tc.mockHandler(t, tc.queries))
defer server.Close()
u, err := url.Parse(server.URL)
require.NoError(t, err)
cfg := &ClientConfig{
URL: &config_util.URL{URL: u},
Timeout: model.Duration(5 * time.Second),
ChunkedReadLimit: config.DefaultChunkedReadLimit,
}
client, err := NewReadClient("test", cfg)
require.NoError(t, err)
// Test ReadMultiple
result, err := client.ReadMultiple(context.Background(), tc.queries, false)
require.NoError(t, err)
// Collect all series and validate
var allSeries []storage.Series
var totalSamples int
for result.Next() {
series := result.At()
allSeries = append(allSeries, series)
// Verify we have some labels
require.Positive(t, series.Labels().Len())
// Count samples in this series
it := series.Iterator(nil)
var sampleCount int
for it.Next() != chunkenc.ValNone {
sampleCount++
}
require.NoError(t, it.Err())
totalSamples += sampleCount
require.Equalf(t, tc.validateSampleCounts[len(allSeries)-1], sampleCount, "Series %d sample count mismatch", len(allSeries))
}
require.NoError(t, result.Err())
// Validate total counts
require.Len(t, allSeries, tc.expectedSeriesCount, "Series count mismatch")
})
}
}
// createChunkedResponseHandler creates a mock handler for chunked responses.
func createChunkedResponseHandler(t *testing.T, queries []*prompb.Query) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
flusher, ok := w.(http.Flusher)
require.True(t, ok)
cw := NewChunkedWriter(w, flusher)
// For each query, simulate multiple chunks
for queryIndex := range queries {
chunks := buildTestChunks(t) // Creates 3 chunks with 5 samples each
for chunkIndex, chunk := range chunks {
// Create unique labels for each series in each query
var labels []prompb.Label
if queryIndex == 0 {
labels = []prompb.Label{
{Name: "job", Value: "prometheus"},
{Name: "instance", Value: fmt.Sprintf("localhost:%d", 9090+chunkIndex)},
}
} else {
labels = []prompb.Label{
{Name: "job", Value: "node_exporter"},
{Name: "instance", Value: fmt.Sprintf("localhost:%d", 9100+chunkIndex)},
}
}
cSeries := prompb.ChunkedSeries{
Labels: labels,
Chunks: []prompb.Chunk{chunk},
}
readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(queryIndex),
}
b, err := proto.Marshal(&readResp)
require.NoError(t, err)
_, err = cw.Write(b)
require.NoError(t, err)
}
}
})
}
// createSampledResponseHandler creates a mock handler for sampled responses.
func createSampledResponseHandler(t *testing.T, queries []*prompb.Query) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/x-protobuf")
var results []*prompb.QueryResult
for queryIndex, query := range queries {
var timeseries []*prompb.TimeSeries
// Create 2 series per query
for seriesIndex := 0; seriesIndex < 2; seriesIndex++ {
var labels []prompb.Label
if queryIndex == 0 {
labels = []prompb.Label{
{Name: "job", Value: "prometheus"},
{Name: "instance", Value: fmt.Sprintf("localhost:%d", 9090+seriesIndex)},
}
} else {
labels = []prompb.Label{
{Name: "job", Value: "node_exporter"},
{Name: "instance", Value: fmt.Sprintf("localhost:%d", 9100+seriesIndex)},
}
}
// Create 2 samples per series within query time range
samples := []prompb.Sample{
{Timestamp: query.StartTimestampMs, Value: float64(queryIndex*10 + seriesIndex)},
{Timestamp: query.EndTimestampMs, Value: float64(queryIndex*10 + seriesIndex + 1)},
}
timeseries = append(timeseries, &prompb.TimeSeries{
Labels: labels,
Samples: samples,
})
}
results = append(results, &prompb.QueryResult{Timeseries: timeseries})
}
resp := &prompb.ReadResponse{Results: results}
data, err := proto.Marshal(resp)
require.NoError(t, err)
compressed := snappy.Encode(nil, data)
_, err = w.Write(compressed)
require.NoError(t, err)
})
}
// createOverlappingSeriesHandler creates responses with same series from multiple queries.
func createOverlappingSeriesHandler(t *testing.T, queries []*prompb.Query) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
flusher, ok := w.(http.Flusher)
require.True(t, ok)
cw := NewChunkedWriter(w, flusher)
// Same series labels for both queries (will be merged)
commonLabels := []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "job", Value: "prometheus"},
}
// Send response for each query with the same series
for queryIndex := range queries {
chunk := buildTestChunks(t)[0] // Use first chunk with 5 samples
cSeries := prompb.ChunkedSeries{
Labels: commonLabels,
Chunks: []prompb.Chunk{chunk},
}
readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(queryIndex),
}
b, err := proto.Marshal(&readResp)
require.NoError(t, err)
_, err = cw.Write(b)
require.NoError(t, err)
}
})
}

View File

@@ -194,9 +194,10 @@ func TestSeriesSetFilter(t *testing.T) {
}
type mockedRemoteClient struct {
got *prompb.Query
gotMultiple []*prompb.Query
store []*prompb.TimeSeries
b labels.ScratchBuilder
}
func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
@@ -224,15 +225,69 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSe
}
}
if notMatch {
continue
}
// Filter samples by query time range
var filteredSamples []prompb.Sample
for _, sample := range s.Samples {
if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp <= query.EndTimestampMs {
filteredSamples = append(filteredSamples, sample)
}
}
q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels, Samples: filteredSamples})
}
return FromQueryResult(sortSeries, q), nil
}
func (c *mockedRemoteClient) ReadMultiple(_ context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
// Store the queries for verification
c.gotMultiple = make([]*prompb.Query, len(queries))
copy(c.gotMultiple, queries)
// Simulate the same behavior as the real client
var results []*prompb.QueryResult
for _, query := range queries {
matchers, err := FromLabelMatchers(query.Matchers)
if err != nil {
return nil, err
}
q := &prompb.QueryResult{}
for _, s := range c.store {
l := s.ToLabels(&c.b, nil)
var notMatch bool
for _, m := range matchers {
v := l.Get(m.Name)
if !m.Matches(v) {
notMatch = true
break
}
}
if notMatch {
continue
}
// Filter samples by query time range
var filteredSamples []prompb.Sample
for _, sample := range s.Samples {
if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp <= query.EndTimestampMs {
filteredSamples = append(filteredSamples, sample)
}
}
q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels, Samples: filteredSamples})
}
results = append(results, q)
}
// Use the same logic as the real client
return combineQueryResults(results, sortSeries)
}
func (c *mockedRemoteClient) reset() {
c.got = nil
c.gotMultiple = nil
}
// NOTE: We don't need to test ChunkQuerier as it uses querier for all operations anyway.