Merge pull request #16721 from AxcelXander/fix-issue-14414-metadata-test

remote: Add metadata validation to TestSampleDelivery for v2 protocol
This commit is contained in:
AxcelXander 2025-07-29 06:18:21 -07:00 committed by GitHub
parent 1ada3ced5a
commit a85618854a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -291,6 +291,9 @@ func TestSampleDelivery(t *testing.T) {
c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series)
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
if tc.protoMsg == config.RemoteWriteProtoMsgV2 && len(metadata) > 0 {
c.expectMetadataForBatch(metadata, series, samples[:len(samples)/2], exemplars[:len(exemplars)/2], histograms[:len(histograms)/2], floatHistograms[:len(floatHistograms)/2])
}
qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2])
@ -961,6 +964,7 @@ type TestWriteClient struct {
expectedHistograms map[string][]prompb.Histogram
expectedFloatHistograms map[string][]prompb.Histogram
receivedMetadata map[string][]prompb.MetricMetadata
expectedMetadata map[string][]prompb.MetricMetadata
writesReceived int
mtx sync.Mutex
protoMsg config.RemoteWriteProtoMsg
@ -979,6 +983,7 @@ func NewTestWriteClient(protoMsg config.RemoteWriteProtoMsg) *TestWriteClient {
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
expectedMetadata: map[string][]prompb.MetricMetadata{},
protoMsg: protoMsg,
storeWait: 0,
returnError: nil,
@ -1051,6 +1056,43 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
}
}
func (c *TestWriteClient) expectMetadataForBatch(metadata []record.RefMetadata, series []record.RefSeries, samples []record.RefSample, exemplars []record.RefExemplar, histograms []record.RefHistogramSample, floatHistograms []record.RefFloatHistogramSample) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedMetadata = map[string][]prompb.MetricMetadata{}
c.receivedMetadata = map[string][]prompb.MetricMetadata{}
// Collect refs that have data in this batch.
refsWithData := make(map[chunks.HeadSeriesRef]struct{})
for _, s := range samples {
refsWithData[s.Ref] = struct{}{}
}
for _, e := range exemplars {
refsWithData[e.Ref] = struct{}{}
}
for _, h := range histograms {
refsWithData[h.Ref] = struct{}{}
}
for _, fh := range floatHistograms {
refsWithData[fh.Ref] = struct{}{}
}
// Only expect metadata for series that have data in this batch.
for _, m := range metadata {
if _, ok := refsWithData[m.Ref]; !ok {
continue
}
tsID := getSeriesIDFromRef(series[m.Ref])
c.expectedMetadata[tsID] = append(c.expectedMetadata[tsID], prompb.MetricMetadata{
MetricFamilyName: tsID,
Type: prompb.FromMetadataType(record.ToMetricType(m.Type)),
Help: m.Help,
Unit: m.Unit,
})
}
}
func deepLen[M any](ms ...map[string][]M) int {
l := 0
for _, m := range ms {
@ -1068,12 +1110,17 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Durati
defer cancel()
if err := runutil.Retry(500*time.Millisecond, ctx.Done(), func() error {
c.mtx.Lock()
exp := deepLen(c.expectedSamples) + deepLen(c.expectedExemplars) + deepLen(c.expectedHistograms, c.expectedFloatHistograms)
got := deepLen(c.receivedSamples) + deepLen(c.receivedExemplars) + deepLen(c.receivedHistograms, c.receivedFloatHistograms)
exp := deepLen(c.expectedSamples) + deepLen(c.expectedExemplars) + deepLen(c.expectedHistograms, c.expectedFloatHistograms) + len(c.expectedMetadata)
got := deepLen(c.receivedSamples) + deepLen(c.receivedExemplars) + deepLen(c.receivedHistograms, c.receivedFloatHistograms) + func() int {
if len(c.receivedMetadata) == 0 {
return 0
}
return len(c.expectedMetadata) // Count unique series that have metadata.
}()
c.mtx.Unlock()
if got < exp {
return fmt.Errorf("expected %v samples/exemplars/histograms/floathistograms, got %v", exp, got)
return fmt.Errorf("expected %v samples/exemplars/histograms/floathistograms/metadata, got %v", exp, got)
}
return nil
}); err != nil {
@ -1095,6 +1142,12 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Durati
for ts, expectedFloatHistogram := range c.expectedFloatHistograms {
require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts)
}
for ts, expectedMetadata := range c.expectedMetadata {
require.NotEmpty(tb, c.receivedMetadata[ts], "No metadata received for series %s", ts)
// For metadata, we only check that we got at least one entry with the right content
// since v2 protocol sends metadata with each data point
require.Equal(tb, expectedMetadata[0], c.receivedMetadata[ts][0], ts)
}
}
func (c *TestWriteClient) SetStoreWait(w time.Duration) {
@ -1193,7 +1246,7 @@ func (c *TestWriteClient) Endpoint() string {
func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(v2Req.Timeseries)),
// TODO handle metadata?
Metadata: []prompb.MetricMetadata{},
}
b := labels.NewScratchBuilder(0)
for i, rts := range v2Req.Timeseries {
@ -1231,6 +1284,21 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro
}
req.Timeseries[i].Histograms[j] = prompb.FromIntHistogram(h.Timestamp, h.ToIntHistogram())
}
// Convert v2 metadata to v1 format.
if rts.Metadata.Type != writev2.Metadata_METRIC_TYPE_UNSPECIFIED {
labels := rts.ToLabels(&b, v2Req.Symbols)
metadata := rts.ToMetadata(v2Req.Symbols)
metricFamilyName := labels.String()
req.Metadata = append(req.Metadata, prompb.MetricMetadata{
MetricFamilyName: metricFamilyName,
Type: prompb.FromMetadataType(metadata.Type),
Help: metadata.Help,
Unit: metadata.Unit,
})
}
}
return req, nil
}