From 7cf585527fbceffab97f74007d8f9a61486878f1 Mon Sep 17 00:00:00 2001 From: Darkknight Date: Fri, 22 Aug 2025 23:03:52 +0530 Subject: [PATCH] remote_write: add metric for unexpected metadata in populateV2TimeSeries (#17034) Add a metric that counts unexpected metadata entries seen in populateV2TimeSeries; a non-zero value indicates that metadata was incorrectly routed through the queue_manager code paths. --------- Signed-off-by: leegin Signed-off-by: Darkknight --- storage/remote/queue_manager.go | 82 ++++++++++++++++------------ storage/remote/queue_manager_test.go | 22 ++++++++ 2 files changed, 69 insertions(+), 35 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index db602b8dc3..dd5b9c2a31 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -64,35 +64,36 @@ const ( type queueManagerMetrics struct { reg prometheus.Registerer - samplesTotal prometheus.Counter - exemplarsTotal prometheus.Counter - histogramsTotal prometheus.Counter - metadataTotal prometheus.Counter - failedSamplesTotal prometheus.Counter - failedExemplarsTotal prometheus.Counter - failedHistogramsTotal prometheus.Counter - failedMetadataTotal prometheus.Counter - retriedSamplesTotal prometheus.Counter - retriedExemplarsTotal prometheus.Counter - retriedHistogramsTotal prometheus.Counter - retriedMetadataTotal prometheus.Counter - droppedSamplesTotal *prometheus.CounterVec - droppedExemplarsTotal *prometheus.CounterVec - droppedHistogramsTotal *prometheus.CounterVec - enqueueRetriesTotal prometheus.Counter - sentBatchDuration prometheus.Histogram - highestSentTimestamp *maxTimestamp - pendingSamples prometheus.Gauge - pendingExemplars prometheus.Gauge - pendingHistograms prometheus.Gauge - shardCapacity prometheus.Gauge - numShards prometheus.Gauge - maxNumShards prometheus.Gauge - minNumShards prometheus.Gauge - desiredNumShards prometheus.Gauge - sentBytesTotal prometheus.Counter - metadataBytesTotal prometheus.Counter - maxSamplesPerSend prometheus.Gauge 
+ samplesTotal prometheus.Counter + exemplarsTotal prometheus.Counter + histogramsTotal prometheus.Counter + metadataTotal prometheus.Counter + failedSamplesTotal prometheus.Counter + failedExemplarsTotal prometheus.Counter + failedHistogramsTotal prometheus.Counter + failedMetadataTotal prometheus.Counter + retriedSamplesTotal prometheus.Counter + retriedExemplarsTotal prometheus.Counter + retriedHistogramsTotal prometheus.Counter + retriedMetadataTotal prometheus.Counter + droppedSamplesTotal *prometheus.CounterVec + droppedExemplarsTotal *prometheus.CounterVec + droppedHistogramsTotal *prometheus.CounterVec + enqueueRetriesTotal prometheus.Counter + sentBatchDuration prometheus.Histogram + highestSentTimestamp *maxTimestamp + pendingSamples prometheus.Gauge + pendingExemplars prometheus.Gauge + pendingHistograms prometheus.Gauge + shardCapacity prometheus.Gauge + numShards prometheus.Gauge + maxNumShards prometheus.Gauge + minNumShards prometheus.Gauge + desiredNumShards prometheus.Gauge + sentBytesTotal prometheus.Counter + metadataBytesTotal prometheus.Counter + maxSamplesPerSend prometheus.Gauge + unexpectedMetadataTotal prometheus.Counter } func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics { @@ -313,6 +314,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "The maximum number of samples to be sent, in a single request, to the remote storage. 
Note that, when sending of exemplars over remote write is enabled, exemplars count towards this limt.", ConstLabels: constLabels, }) + m.unexpectedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "unexpected_metadata_total", + Help: "Total number of unexpected metadata entries in populateV2TimeSeries indicating routing bugs.", + ConstLabels: constLabels, + }) return m } @@ -349,6 +357,7 @@ func (m *queueManagerMetrics) register() { m.sentBytesTotal, m.metadataBytesTotal, m.maxSamplesPerSend, + m.unexpectedMetadataTotal, ) } } @@ -384,6 +393,7 @@ func (m *queueManagerMetrics) unregister() { m.reg.Unregister(m.sentBytesTotal) m.reg.Unregister(m.metadataBytesTotal) m.reg.Unregister(m.maxSamplesPerSend) + m.reg.Unregister(m.unexpectedMetadataTotal) } } @@ -1543,8 +1553,11 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } _ = s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, encBuf, compr) case config.RemoteWriteProtoMsgV2: - nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms) + nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata := populateV2TimeSeries(&symbolTable, batch, pendingDataV2, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms + if nUnexpectedMetadata > 0 { + s.qm.metrics.unexpectedMetadataTotal.Add(float64(nUnexpectedMetadata)) + } _ = s.sendV2Samples(ctx, pendingDataV2[:n], symbolTable.Symbols(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, encBuf, compr) symbolTable.Reset() } @@ -1911,8 +1924,8 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 return accumulatedStats, err } -func populateV2TimeSeries(symbolTable 
*writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) { - var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int +func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int, int) { + var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata int for nPending, d := range batch { pendingData[nPending].Samples = pendingData[nPending].Samples[:0] if d.metadata != nil { @@ -1960,11 +1973,10 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromFloatHistogram(d.timestamp, d.floatHistogram)) nPendingHistograms++ case tMetadata: - // TODO: log or return an error? - // we shouldn't receive metadata type data here, it should already be inserted into the timeSeries + nUnexpectedMetadata++ } } - return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata + return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, nUnexpectedMetadata } func (t *QueueManager) sendWriteRequestWithBackoff(ctx context.Context, attempt func(int) error, onRetry func()) error { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index a02d3dd3b0..5ac2d14c94 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -2331,3 +2331,25 @@ func BenchmarkBuildTimeSeries(b *testing.B) { require.NotNil(b, result) } } + +func TestPopulateV2TimeSeries_UnexpectedMetadata(t *testing.T) { + symbolTable := writev2.NewSymbolTable() + pendingData := make([]writev2.TimeSeries, 4) + + batch := []timeSeries{ + {sType: tSample, seriesLabels: labels.FromStrings("__name__", "metric1")}, + {sType: tMetadata, seriesLabels: 
labels.FromStrings("__name__", "metric2")}, + {sType: tSample, seriesLabels: labels.FromStrings("__name__", "metric3")}, + {sType: tMetadata, seriesLabels: labels.FromStrings("__name__", "metric4")}, + } + + nSamples, nExemplars, nHistograms, nMetadata, nUnexpected := populateV2TimeSeries( + &symbolTable, batch, pendingData, false, false, + ) + + require.Equal(t, 2, nSamples, "Should count 2 samples") + require.Equal(t, 0, nExemplars, "Should count 0 exemplars") + require.Equal(t, 0, nHistograms, "Should count 0 histograms") + require.Equal(t, 0, nMetadata, "Should count 0 processed metadata") + require.Equal(t, 2, nUnexpected, "Should count 2 unexpected metadata") +}