mirror of
https://github.com/prometheus/prometheus.git
synced 2025-08-06 22:27:17 +02:00
Merge 288c8ce273
into 25aee26a57
This commit is contained in:
commit
57ffa3e25a
@ -19,6 +19,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptrace"
|
"net/http/httptrace"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -112,8 +113,9 @@ type Client struct {
|
|||||||
Client *http.Client
|
Client *http.Client
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
|
||||||
retryOnRateLimit bool
|
retryOnRateLimit bool
|
||||||
chunkedReadLimit uint64
|
chunkedReadLimit uint64
|
||||||
|
acceptedResponseTypes []prompb.ReadRequest_ResponseType
|
||||||
|
|
||||||
readQueries prometheus.Gauge
|
readQueries prometheus.Gauge
|
||||||
readQueriesTotal *prometheus.CounterVec
|
readQueriesTotal *prometheus.CounterVec
|
||||||
@ -125,23 +127,25 @@ type Client struct {
|
|||||||
|
|
||||||
// ClientConfig configures a client.
|
// ClientConfig configures a client.
|
||||||
type ClientConfig struct {
|
type ClientConfig struct {
|
||||||
URL *config_util.URL
|
URL *config_util.URL
|
||||||
Timeout model.Duration
|
Timeout model.Duration
|
||||||
HTTPClientConfig config_util.HTTPClientConfig
|
HTTPClientConfig config_util.HTTPClientConfig
|
||||||
SigV4Config *sigv4.SigV4Config
|
SigV4Config *sigv4.SigV4Config
|
||||||
AzureADConfig *azuread.AzureADConfig
|
AzureADConfig *azuread.AzureADConfig
|
||||||
GoogleIAMConfig *googleiam.Config
|
GoogleIAMConfig *googleiam.Config
|
||||||
Headers map[string]string
|
Headers map[string]string
|
||||||
RetryOnRateLimit bool
|
RetryOnRateLimit bool
|
||||||
WriteProtoMsg config.RemoteWriteProtoMsg
|
WriteProtoMsg config.RemoteWriteProtoMsg
|
||||||
ChunkedReadLimit uint64
|
ChunkedReadLimit uint64
|
||||||
RoundRobinDNS bool
|
RoundRobinDNS bool
|
||||||
|
AcceptedResponseTypes []prompb.ReadRequest_ResponseType
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can
|
// ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can
|
||||||
// also fall back to the SAMPLES method if necessary.
|
// also fall back to the SAMPLES method if necessary.
|
||||||
type ReadClient interface {
|
type ReadClient interface {
|
||||||
Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error)
|
Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error)
|
||||||
|
ReadMultiple(ctx context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReadClient creates a new client for remote read.
|
// NewReadClient creates a new client for remote read.
|
||||||
@ -157,15 +161,22 @@ func NewReadClient(name string, conf *ClientConfig, optFuncs ...config_util.HTTP
|
|||||||
}
|
}
|
||||||
httpClient.Transport = otelhttp.NewTransport(t)
|
httpClient.Transport = otelhttp.NewTransport(t)
|
||||||
|
|
||||||
|
// Set accepted response types, default to existing behavior if not specified
|
||||||
|
acceptedResponseTypes := conf.AcceptedResponseTypes
|
||||||
|
if len(acceptedResponseTypes) == 0 {
|
||||||
|
acceptedResponseTypes = AcceptedResponseTypes
|
||||||
|
}
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
remoteName: name,
|
remoteName: name,
|
||||||
urlString: conf.URL.String(),
|
urlString: conf.URL.String(),
|
||||||
Client: httpClient,
|
Client: httpClient,
|
||||||
timeout: time.Duration(conf.Timeout),
|
timeout: time.Duration(conf.Timeout),
|
||||||
chunkedReadLimit: conf.ChunkedReadLimit,
|
chunkedReadLimit: conf.ChunkedReadLimit,
|
||||||
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
|
acceptedResponseTypes: acceptedResponseTypes,
|
||||||
readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
|
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
|
||||||
readQueriesDuration: remoteReadQueryDuration.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
|
readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
|
||||||
|
readQueriesDuration: remoteReadQueryDuration.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -337,27 +348,44 @@ func (c *Client) Endpoint() string {
|
|||||||
return c.urlString
|
return c.urlString
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read reads from a remote endpoint. The sortSeries parameter is only respected in the case of a sampled response;
|
// Read reads from a remote endpoint. The sortSeries parameter is only respected in the case of a samples response;
|
||||||
// chunked responses arrive already sorted by the server.
|
// chunked responses arrive already sorted by the server.
|
||||||
func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
|
func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
|
||||||
|
return c.ReadMultiple(ctx, []*prompb.Query{query}, sortSeries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadMultiple reads from a remote endpoint using multiple queries in a single request.
|
||||||
|
// The sortSeries parameter is only respected in the case of a samples response;
|
||||||
|
// chunked responses arrive already sorted by the server.
|
||||||
|
// Returns a single SeriesSet with interleaved series from all queries.
|
||||||
|
func (c *Client) ReadMultiple(ctx context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
|
||||||
c.readQueries.Inc()
|
c.readQueries.Inc()
|
||||||
defer c.readQueries.Dec()
|
defer c.readQueries.Dec()
|
||||||
|
|
||||||
req := &prompb.ReadRequest{
|
req := &prompb.ReadRequest{
|
||||||
// TODO: Support batching multiple queries into one read request,
|
Queries: queries,
|
||||||
// as the protobuf interface allows for it.
|
AcceptedResponseTypes: c.acceptedResponseTypes,
|
||||||
Queries: []*prompb.Query{query},
|
|
||||||
AcceptedResponseTypes: AcceptedResponseTypes,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
httpResp, cancel, start, err := c.executeReadRequest(ctx, req, "Remote Read Multiple")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.handleReadResponse(httpResp, req, queries, sortSeries, start, cancel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// executeReadRequest creates and executes an HTTP request for reading data.
|
||||||
|
func (c *Client) executeReadRequest(ctx context.Context, req *prompb.ReadRequest, spanName string) (*http.Response, context.CancelFunc, time.Time, error) {
|
||||||
data, err := proto.Marshal(req)
|
data, err := proto.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to marshal read request: %w", err)
|
return nil, nil, time.Time{}, fmt.Errorf("unable to marshal read request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
compressed := snappy.Encode(nil, data)
|
compressed := snappy.Encode(nil, data)
|
||||||
httpReq, err := http.NewRequest(http.MethodPost, c.urlString, bytes.NewReader(compressed))
|
httpReq, err := http.NewRequest(http.MethodPost, c.urlString, bytes.NewReader(compressed))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to create request: %w", err)
|
return nil, nil, time.Time{}, fmt.Errorf("unable to create request: %w", err)
|
||||||
}
|
}
|
||||||
httpReq.Header.Add("Content-Encoding", "snappy")
|
httpReq.Header.Add("Content-Encoding", "snappy")
|
||||||
httpReq.Header.Add("Accept-Encoding", "snappy")
|
httpReq.Header.Add("Accept-Encoding", "snappy")
|
||||||
@ -368,16 +396,21 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
|
|||||||
errTimeout := fmt.Errorf("%w: request timed out after %s", context.DeadlineExceeded, c.timeout)
|
errTimeout := fmt.Errorf("%w: request timed out after %s", context.DeadlineExceeded, c.timeout)
|
||||||
ctx, cancel := context.WithTimeoutCause(ctx, c.timeout, errTimeout)
|
ctx, cancel := context.WithTimeoutCause(ctx, c.timeout, errTimeout)
|
||||||
|
|
||||||
ctx, span := otel.Tracer("").Start(ctx, "Remote Read", trace.WithSpanKind(trace.SpanKindClient))
|
ctx, span := otel.Tracer("").Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindClient))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
|
httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
return nil, fmt.Errorf("error sending request: %w", err)
|
return nil, nil, time.Time{}, fmt.Errorf("error sending request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return httpResp, cancel, start, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleReadResponse processes the HTTP response and returns a SeriesSet.
|
||||||
|
func (c *Client) handleReadResponse(httpResp *http.Response, req *prompb.ReadRequest, queries []*prompb.Query, sortSeries bool, start time.Time, cancel context.CancelFunc) (storage.SeriesSet, error) {
|
||||||
if httpResp.StatusCode/100 != 2 {
|
if httpResp.StatusCode/100 != 2 {
|
||||||
// Make an attempt at getting an error message.
|
// Make an attempt at getting an error message.
|
||||||
body, _ := io.ReadAll(httpResp.Body)
|
body, _ := io.ReadAll(httpResp.Body)
|
||||||
@ -393,16 +426,16 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
|
|||||||
|
|
||||||
switch {
|
switch {
|
||||||
case strings.HasPrefix(contentType, "application/x-protobuf"):
|
case strings.HasPrefix(contentType, "application/x-protobuf"):
|
||||||
c.readQueriesDuration.WithLabelValues("sampled").Observe(time.Since(start).Seconds())
|
c.readQueriesDuration.WithLabelValues("samples").Observe(time.Since(start).Seconds())
|
||||||
c.readQueriesTotal.WithLabelValues("sampled", strconv.Itoa(httpResp.StatusCode)).Inc()
|
c.readQueriesTotal.WithLabelValues("samples", strconv.Itoa(httpResp.StatusCode)).Inc()
|
||||||
ss, err := c.handleSampledResponse(req, httpResp, sortSeries)
|
ss, err := c.handleSamplesResponseImpl(req, httpResp, sortSeries)
|
||||||
cancel()
|
cancel()
|
||||||
return ss, err
|
return ss, err
|
||||||
case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"):
|
case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"):
|
||||||
c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds())
|
c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds())
|
||||||
|
|
||||||
s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil)
|
s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil)
|
||||||
return NewChunkedSeriesSet(s, httpResp.Body, query.StartTimestampMs, query.EndTimestampMs, func(err error) {
|
return c.handleChunkedResponseImpl(s, httpResp, queries, func(err error) {
|
||||||
code := strconv.Itoa(httpResp.StatusCode)
|
code := strconv.Itoa(httpResp.StatusCode)
|
||||||
if !errors.Is(err, io.EOF) {
|
if !errors.Is(err, io.EOF) {
|
||||||
code = "aborted_stream"
|
code = "aborted_stream"
|
||||||
@ -418,7 +451,8 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.Response, sortSeries bool) (storage.SeriesSet, error) {
|
// handleSamplesResponseImpl handles samples responses for both single and multiple queries.
|
||||||
|
func (c *Client) handleSamplesResponseImpl(req *prompb.ReadRequest, httpResp *http.Response, sortSeries bool) (storage.SeriesSet, error) {
|
||||||
compressed, err := io.ReadAll(httpResp.Body)
|
compressed, err := io.ReadAll(httpResp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err)
|
return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err)
|
||||||
@ -443,8 +477,60 @@ func (c *Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.R
|
|||||||
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
|
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
|
||||||
}
|
}
|
||||||
|
|
||||||
// This client does not batch queries so there's always only 1 result.
|
return combineQueryResults(resp.Results, sortSeries)
|
||||||
res := resp.Results[0]
|
}
|
||||||
|
|
||||||
return FromQueryResult(sortSeries, res), nil
|
// combineQueryResults combines multiple query results into a single SeriesSet,
|
||||||
|
// handling both sorted and unsorted cases appropriately.
|
||||||
|
func combineQueryResults(results []*prompb.QueryResult, sortSeries bool) (storage.SeriesSet, error) {
|
||||||
|
if len(results) == 0 {
|
||||||
|
return &concreteSeriesSet{series: nil, cur: 0}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(results) == 1 {
|
||||||
|
return FromQueryResult(sortSeries, results[0]), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multiple queries case - combine all results
|
||||||
|
if sortSeries {
|
||||||
|
// When sorting is requested, use MergeSeriesSet which can efficiently merge sorted inputs
|
||||||
|
var allSeriesSets []storage.SeriesSet
|
||||||
|
for _, result := range results {
|
||||||
|
seriesSet := FromQueryResult(sortSeries, result)
|
||||||
|
if err := seriesSet.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("error reading series from query result: %w", err)
|
||||||
|
}
|
||||||
|
allSeriesSets = append(allSeriesSets, seriesSet)
|
||||||
|
}
|
||||||
|
return storage.NewMergeSeriesSet(allSeriesSets, 0, storage.ChainedSeriesMerge), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// When sorting is not requested, just concatenate all series without using MergeSeriesSet
|
||||||
|
// since MergeSeriesSet requires sorted inputs
|
||||||
|
var allSeries []storage.Series
|
||||||
|
for _, result := range results {
|
||||||
|
seriesSet := FromQueryResult(sortSeries, result)
|
||||||
|
for seriesSet.Next() {
|
||||||
|
allSeries = append(allSeries, seriesSet.At())
|
||||||
|
}
|
||||||
|
if err := seriesSet.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("error reading series from query result: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &concreteSeriesSet{series: allSeries, cur: 0}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleChunkedResponseImpl handles chunked responses for both single and multiple queries.
|
||||||
|
func (c *Client) handleChunkedResponseImpl(s *ChunkedReader, httpResp *http.Response, queries []*prompb.Query, onClose func(error)) storage.SeriesSet {
|
||||||
|
// For multiple queries in chunked response, we'll still use the existing infrastructure
|
||||||
|
// but we need to provide the timestamp range that covers all queries
|
||||||
|
var minStartTs, maxEndTs int64 = math.MaxInt64, math.MinInt64
|
||||||
|
|
||||||
|
for _, query := range queries {
|
||||||
|
minStartTs = min(minStartTs, query.StartTimestampMs)
|
||||||
|
maxEndTs = max(maxEndTs, query.EndTimestampMs)
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewChunkedSeriesSet(s, httpResp.Body, minStartTs, maxEndTs, onClose)
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
"github.com/prometheus/prometheus/util/annotations"
|
"github.com/prometheus/prometheus/util/annotations"
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
@ -194,9 +195,10 @@ func TestSeriesSetFilter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type mockedRemoteClient struct {
|
type mockedRemoteClient struct {
|
||||||
got *prompb.Query
|
got *prompb.Query
|
||||||
store []*prompb.TimeSeries
|
gotMultiple []*prompb.Query
|
||||||
b labels.ScratchBuilder
|
store []*prompb.TimeSeries
|
||||||
|
b labels.ScratchBuilder
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
|
func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
|
||||||
@ -224,15 +226,69 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !notMatch {
|
if notMatch {
|
||||||
q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels})
|
continue
|
||||||
}
|
}
|
||||||
|
// Filter samples by query time range
|
||||||
|
var filteredSamples []prompb.Sample
|
||||||
|
for _, sample := range s.Samples {
|
||||||
|
if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp <= query.EndTimestampMs {
|
||||||
|
filteredSamples = append(filteredSamples, sample)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels, Samples: filteredSamples})
|
||||||
}
|
}
|
||||||
return FromQueryResult(sortSeries, q), nil
|
return FromQueryResult(sortSeries, q), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *mockedRemoteClient) ReadMultiple(_ context.Context, queries []*prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
|
||||||
|
// Store the queries for verification
|
||||||
|
c.gotMultiple = make([]*prompb.Query, len(queries))
|
||||||
|
copy(c.gotMultiple, queries)
|
||||||
|
|
||||||
|
// Simulate the same behavior as the real client
|
||||||
|
var results []*prompb.QueryResult
|
||||||
|
for _, query := range queries {
|
||||||
|
matchers, err := FromLabelMatchers(query.Matchers)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
q := &prompb.QueryResult{}
|
||||||
|
for _, s := range c.store {
|
||||||
|
l := s.ToLabels(&c.b, nil)
|
||||||
|
var notMatch bool
|
||||||
|
|
||||||
|
for _, m := range matchers {
|
||||||
|
v := l.Get(m.Name)
|
||||||
|
if !m.Matches(v) {
|
||||||
|
notMatch = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if notMatch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Filter samples by query time range
|
||||||
|
var filteredSamples []prompb.Sample
|
||||||
|
for _, sample := range s.Samples {
|
||||||
|
if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp <= query.EndTimestampMs {
|
||||||
|
filteredSamples = append(filteredSamples, sample)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels, Samples: filteredSamples})
|
||||||
|
}
|
||||||
|
results = append(results, q)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the same logic as the real client
|
||||||
|
return combineQueryResults(results, sortSeries)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *mockedRemoteClient) reset() {
|
func (c *mockedRemoteClient) reset() {
|
||||||
c.got = nil
|
c.got = nil
|
||||||
|
c.gotMultiple = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: We don't need to test ChunkQuerier as it's uses querier for all operations anyway.
|
// NOTE: We don't need to test ChunkQuerier as it's uses querier for all operations anyway.
|
||||||
@ -494,3 +550,336 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadMultiple(t *testing.T) {
|
||||||
|
const sampleIntervalMs = 250
|
||||||
|
|
||||||
|
// Helper function to calculate series multiplier based on labels
|
||||||
|
getSeriesMultiplier := func(labels []prompb.Label) uint64 {
|
||||||
|
// Create a simple hash from labels to generate unique values per series
|
||||||
|
labelHash := uint64(0)
|
||||||
|
for _, label := range labels {
|
||||||
|
for _, b := range label.Name + label.Value {
|
||||||
|
labelHash = labelHash*31 + uint64(b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return labelHash % sampleIntervalMs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to generate a complete time series with samples at 250ms intervals
|
||||||
|
// Each series gets different sample values based on a hash of their labels
|
||||||
|
generateSeries := func(labels []prompb.Label, startMs, endMs int64) *prompb.TimeSeries {
|
||||||
|
seriesMultiplier := getSeriesMultiplier(labels)
|
||||||
|
|
||||||
|
var samples []prompb.Sample
|
||||||
|
for ts := startMs; ts <= endMs; ts += sampleIntervalMs {
|
||||||
|
samples = append(samples, prompb.Sample{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: float64(ts + int64(seriesMultiplier)), // Unique value per series
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return &prompb.TimeSeries{
|
||||||
|
Labels: labels,
|
||||||
|
Samples: samples,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m := &mockedRemoteClient{
|
||||||
|
store: []*prompb.TimeSeries{
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 0, 10000),
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 0, 10000),
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 0, 10000),
|
||||||
|
generateSeries([]prompb.Label{{Name: "instance", Value: "localhost:9090"}}, 0, 10000),
|
||||||
|
},
|
||||||
|
b: labels.NewScratchBuilder(0),
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
queries []*prompb.Query
|
||||||
|
expectedResults []*prompb.TimeSeries
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "single query",
|
||||||
|
queries: []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 2000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedResults: []*prompb.TimeSeries{
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple queries - different matchers",
|
||||||
|
queries: []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 2000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1500,
|
||||||
|
EndTimestampMs: 2500,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "node_exporter"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedResults: []*prompb.TimeSeries{
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 1500, 2500),
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple queries - overlapping results",
|
||||||
|
queries: []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 2000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_RE, Name: "job", Value: "prometheus|node_exporter"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1500,
|
||||||
|
EndTimestampMs: 2500,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedResults: []*prompb.TimeSeries{
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 1500, 2500),
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "node_exporter"}}, 1000, 2000),
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "query with no results",
|
||||||
|
queries: []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 2000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "nonexistent"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedResults: nil, // empty result
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty query list",
|
||||||
|
queries: []*prompb.Query{},
|
||||||
|
expectedResults: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "three queries with mixed results",
|
||||||
|
queries: []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 2000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "prometheus"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1500,
|
||||||
|
EndTimestampMs: 2500,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "job", Value: "nonexistent"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StartTimestampMs: 2000,
|
||||||
|
EndTimestampMs: 3000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "instance", Value: "localhost:9090"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedResults: []*prompb.TimeSeries{
|
||||||
|
generateSeries([]prompb.Label{{Name: "instance", Value: "localhost:9090"}}, 2000, 3000),
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "prometheus"}}, 1000, 2000),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "same matchers with overlapping time ranges",
|
||||||
|
queries: []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 5000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StartTimestampMs: 3000,
|
||||||
|
EndTimestampMs: 8000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedResults: []*prompb.TimeSeries{
|
||||||
|
generateSeries([]prompb.Label{{Name: "job", Value: "cadvisor"}, {Name: "region", Value: "us"}}, 1000, 8000),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
m.reset()
|
||||||
|
|
||||||
|
result, err := m.ReadMultiple(context.Background(), tc.queries, true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify the queries were stored correctly
|
||||||
|
require.Equal(t, tc.queries, m.gotMultiple)
|
||||||
|
|
||||||
|
// Verify the combined result matches expected
|
||||||
|
var got []*prompb.TimeSeries
|
||||||
|
for result.Next() {
|
||||||
|
series := result.At()
|
||||||
|
var samples []prompb.Sample
|
||||||
|
iterator := series.Iterator(nil)
|
||||||
|
|
||||||
|
// Collect actual samples
|
||||||
|
for iterator.Next() != chunkenc.ValNone {
|
||||||
|
ts, value := iterator.At()
|
||||||
|
samples = append(samples, prompb.Sample{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, iterator.Err())
|
||||||
|
|
||||||
|
got = append(got, &prompb.TimeSeries{
|
||||||
|
Labels: prompb.FromLabels(series.Labels(), nil),
|
||||||
|
Samples: samples,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, result.Err())
|
||||||
|
|
||||||
|
require.ElementsMatch(t, tc.expectedResults, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadMultipleErrorHandling(t *testing.T) {
|
||||||
|
m := &mockedRemoteClient{
|
||||||
|
store: []*prompb.TimeSeries{
|
||||||
|
{Labels: []prompb.Label{{Name: "job", Value: "prometheus"}}},
|
||||||
|
},
|
||||||
|
b: labels.NewScratchBuilder(0),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with invalid matcher - should return error
|
||||||
|
queries := []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 2000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_Type(999), Name: "job", Value: "prometheus"}, // invalid matcher type
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := m.ReadMultiple(context.Background(), queries, true)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Nil(t, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadMultipleSorting(t *testing.T) {
|
||||||
|
// Test data with labels designed to test sorting behavior
|
||||||
|
// When sorted: aaa < bbb < ccc
|
||||||
|
// When unsorted: order depends on processing order
|
||||||
|
m := &mockedRemoteClient{
|
||||||
|
store: []*prompb.TimeSeries{
|
||||||
|
{Labels: []prompb.Label{{Name: "series", Value: "ccc"}}}, // Will be returned by query 1
|
||||||
|
{Labels: []prompb.Label{{Name: "series", Value: "aaa"}}}, // Will be returned by query 2
|
||||||
|
{Labels: []prompb.Label{{Name: "series", Value: "bbb"}}}, // Will be returned by both queries (overlapping)
|
||||||
|
},
|
||||||
|
b: labels.NewScratchBuilder(0),
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
queries []*prompb.Query
|
||||||
|
sortSeries bool
|
||||||
|
expectedOrder []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "multiple queries with sortSeries=true - should be sorted",
|
||||||
|
queries: []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 2000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_RE, Name: "series", Value: "ccc|bbb"}, // Returns: ccc, bbb (unsorted in store)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1500,
|
||||||
|
EndTimestampMs: 2500,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_RE, Name: "series", Value: "aaa|bbb"}, // Returns: aaa, bbb (unsorted in store)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
sortSeries: true,
|
||||||
|
expectedOrder: []string{"aaa", "bbb", "ccc"}, // Should be sorted after merge
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple queries with sortSeries=false - concatenates without deduplication",
|
||||||
|
queries: []*prompb.Query{
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1000,
|
||||||
|
EndTimestampMs: 2000,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_RE, Name: "series", Value: "ccc|bbb"}, // Returns: ccc, bbb (unsorted)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StartTimestampMs: 1500,
|
||||||
|
EndTimestampMs: 2500,
|
||||||
|
Matchers: []*prompb.LabelMatcher{
|
||||||
|
{Type: prompb.LabelMatcher_RE, Name: "series", Value: "aaa|bbb"}, // Returns: aaa, bbb (unsorted)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
sortSeries: false,
|
||||||
|
expectedOrder: []string{"ccc", "bbb", "aaa", "bbb"}, // Concatenated results - duplicates included
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
m.reset()
|
||||||
|
|
||||||
|
result, err := m.ReadMultiple(context.Background(), tc.queries, tc.sortSeries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Collect the actual results
|
||||||
|
var actualOrder []string
|
||||||
|
for result.Next() {
|
||||||
|
series := result.At()
|
||||||
|
seriesValue := series.Labels().Get("series")
|
||||||
|
actualOrder = append(actualOrder, seriesValue)
|
||||||
|
}
|
||||||
|
require.NoError(t, result.Err())
|
||||||
|
|
||||||
|
// Verify the expected order matches actual order
|
||||||
|
// For sortSeries=true: results should be in sorted order
|
||||||
|
// For sortSeries=false: results should be in concatenated order (with duplicates)
|
||||||
|
testutil.RequireEqual(t, tc.expectedOrder, actualOrder)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user