From 184c7eb9186aa8fea09920f2f8e8aa8a603da300 Mon Sep 17 00:00:00 2001 From: machine424 Date: Wed, 20 Aug 2025 23:08:47 +0200 Subject: [PATCH] storage/remote: compute highestTimestamp and dataIn at QueueManager level Because of relabelling, an endpoint can only select a subset of series that go through WriteStorage Having a highestTimestamp at WriteStorage level yields wrong values if the corresponding sample won't even make it to a remote queue. Currently PrometheusRemoteWriteBehind is based on that, and would fire if an endpoint is only interested in a subset of series that take time to appear. A "prometheus_remote_storage_queue_highest_timestamp_seconds" that only takes into account samples in the queue is introduced, and used in PrometheusRemoteWriteBehind and dashboards in documentation/prometheus-mixin Same applies to samplesIn/dataIn, QueueManager should know more about when to update those; when data is enqueued. That makes dataDropped unnecessary, thus help simplify the logic in QueueManager.calculateDesiredShards() Signed-off-by: machine424 --- cmd/prometheus/main_test.go | 84 +++++++++++++++++++ .../prometheus-mixin/alerts.libsonnet | 4 +- .../prometheus-mixin/dashboards.libsonnet | 10 +-- storage/remote/queue_manager.go | 52 ++++++------ storage/remote/queue_manager_test.go | 50 ++++++----- storage/remote/write.go | 23 ----- 6 files changed, 149 insertions(+), 74 deletions(-) diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index e37e012e0c..c0900439e7 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -21,6 +21,7 @@ import ( "io" "math" "net/http" + "net/http/httptest" "os" "os/exec" "path/filepath" @@ -881,3 +882,86 @@ scrape_configs: }) } } + +// This test verifies that metrics for the highest timestamps per queue account for relabelling. +// See: https://github.com/prometheus/prometheus/pull/17065. +func TestRemoteWrite_PerQueueMetricsAfterRelabeling(t *testing.T) { + t.Parallel() + + tmpDir := t.TempDir() + configFile := filepath.Join(tmpDir, "prometheus.yml") + + port := testutil.RandomUnprivilegedPort(t) + targetPort := testutil.RandomUnprivilegedPort(t) + + server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + panic("should never be reached") + })) + t.Cleanup(server.Close) + + // Simulate a remote write relabeling that doesn't yield any series. + config := fmt.Sprintf(` +global: + scrape_interval: 1s +scrape_configs: + - job_name: 'self' + static_configs: + - targets: ['localhost:%d'] + - job_name: 'target' + static_configs: + - targets: ['localhost:%d'] + +remote_write: + - url: %s + write_relabel_configs: + - source_labels: [job,__name__] + regex: 'target,special_metric' + action: keep +`, port, targetPort, server.URL) + require.NoError(t, os.WriteFile(configFile, []byte(config), 0o777)) + + prom := prometheusCommandWithLogging( + t, + configFile, + port, + fmt.Sprintf("--storage.tsdb.path=%s", tmpDir), + ) + require.NoError(t, prom.Start()) + + require.Eventually(t, func() bool { + r, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port)) + if err != nil { + return false + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + return false + } + + metrics, err := io.ReadAll(r.Body) + if err != nil { + return false + } + + gHighestTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_highest_timestamp_in_seconds") + // The highest timestamp at storage level sees all samples, it should also consider the ones that are filtered out by relabeling. + if err != nil || gHighestTimestamp == 0 { + return false + } + + // The queue shouldn't see and send any sample, all samples are dropped due to relabeling, the metrics should reflect that. + droppedSamples, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeCounter, "prometheus_remote_storage_samples_dropped_total") + if err != nil || droppedSamples == 0 { + return false + } + + highestTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_queue_highest_timestamp_seconds") + require.NoError(t, err) + require.Zero(t, highestTimestamp) + + highestSentTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_queue_highest_sent_timestamp_seconds") + require.NoError(t, err) + require.Zero(t, highestSentTimestamp) + return true + }, 10*time.Second, 100*time.Millisecond) +} diff --git a/documentation/prometheus-mixin/alerts.libsonnet b/documentation/prometheus-mixin/alerts.libsonnet index 9a6de90d82..9f72b40243 100644 --- a/documentation/prometheus-mixin/alerts.libsonnet +++ b/documentation/prometheus-mixin/alerts.libsonnet @@ -212,8 +212,8 @@ # Without max_over_time, failed scrapes could create false negatives, see # https://www.robustperception.io/alerting-on-gauges-in-prometheus-2-0 for details. ( - max_over_time(prometheus_remote_storage_highest_timestamp_in_seconds{%(prometheusSelector)s}[5m]) - - ignoring(remote_name, url) group_right + max_over_time(prometheus_remote_storage_queue_highest_timestamp_seconds{%(prometheusSelector)s}[5m]) + - max_over_time(prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(prometheusSelector)s}[5m]) ) > 120 diff --git a/documentation/prometheus-mixin/dashboards.libsonnet b/documentation/prometheus-mixin/dashboards.libsonnet index adf5da5c12..c947aca3cd 100644 --- a/documentation/prometheus-mixin/dashboards.libsonnet +++ b/documentation/prometheus-mixin/dashboards.libsonnet @@ -527,7 +527,7 @@ local row = panel.row; ; local timestampComparison = - panel.timeSeries.new('Highest Timestamp In vs. Highest Timestamp Sent') + panel.timeSeries.new('Highest Enqueued Timestamp vs. Highest Timestamp Sent') + panelTimeSeriesStdOptions + panel.timeSeries.standardOptions.withUnit('short') + panel.timeSeries.queryOptions.withTargets([ @@ -535,9 +535,9 @@ local row = panel.row; '$datasource', ||| ( - prometheus_remote_storage_highest_timestamp_in_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance"} + prometheus_remote_storage_queue_highest_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"} - - ignoring(remote_name, url) group_right(instance) (prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"} != 0) + prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"} ) ||| % $._config ) @@ -555,9 +555,9 @@ local row = panel.row; '$datasource', ||| clamp_min( - rate(prometheus_remote_storage_highest_timestamp_in_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance"}[5m]) + rate(prometheus_remote_storage_queue_highest_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"}[5m]) - - ignoring (remote_name, url) group_right(instance) rate(prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"}[5m]) + rate(prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"}[5m]) , 0) ||| % $._config ) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index dccbbd4ab7..c3321107cc 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -82,6 +82,7 @@ type queueManagerMetrics struct { droppedHistogramsTotal *prometheus.CounterVec enqueueRetriesTotal prometheus.Counter sentBatchDuration prometheus.Histogram + highestTimestamp *maxTimestamp highestSentTimestamp *maxTimestamp pendingSamples prometheus.Gauge pendingExemplars prometheus.Gauge @@ -228,12 +229,21 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager NativeHistogramMaxBucketNumber: 100, NativeHistogramMinResetDuration: 1 * time.Hour, }) + m.highestTimestamp = &maxTimestamp{ + Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_highest_timestamp_seconds", + Help: "Highest timestamp that was enqueued, in seconds since epoch. Initialized to 0 when no data has been received yet.", + ConstLabels: constLabels, + }), + } m.highestSentTimestamp = &maxTimestamp{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queue_highest_sent_timestamp_seconds", - Help: "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch. Initialized to 0 when no data has been sent yet.", + Help: "Highest timestamp successfully sent by this queue, in seconds since epoch. Initialized to 0 when no data has been sent yet.", ConstLabels: constLabels, }), } @@ -338,6 +348,7 @@ func (m *queueManagerMetrics) register() { m.droppedHistogramsTotal, m.enqueueRetriesTotal, m.sentBatchDuration, + m.highestTimestamp, m.highestSentTimestamp, m.pendingSamples, m.pendingExemplars, @@ -373,6 +384,7 @@ func (m *queueManagerMetrics) unregister() { m.reg.Unregister(m.droppedHistogramsTotal) m.reg.Unregister(m.enqueueRetriesTotal) m.reg.Unregister(m.sentBatchDuration) + m.reg.Unregister(m.highestTimestamp) m.reg.Unregister(m.highestSentTimestamp) m.reg.Unregister(m.pendingSamples) m.reg.Unregister(m.pendingExemplars) @@ -440,11 +452,10 @@ type QueueManager struct { quit chan struct{} wg sync.WaitGroup - dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate + dataIn, dataOut, dataOutDuration *ewmaRate - metrics *queueManagerMetrics - interner *pool - highestRecvTimestamp *maxTimestamp + metrics *queueManagerMetrics + interner *pool } // NewQueueManager builds a new QueueManager and starts a new @@ -458,7 +469,6 @@ func NewQueueManager( readerMetrics *wlog.LiveReaderMetrics, logger *slog.Logger, dir string, - samplesIn *ewmaRate, cfg config.QueueConfig, mCfg config.MetadataConfig, externalLabels labels.Labels, @@ -466,7 +476,6 @@ func NewQueueManager( client WriteClient, flushDeadline time.Duration, interner *pool, - highestRecvTimestamp *maxTimestamp, sm ReadyScrapeManager, enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, @@ -506,14 +515,12 @@ func NewQueueManager( reshardChan: make(chan int), quit: make(chan struct{}), - dataIn: samplesIn, - dataDropped: newEWMARate(ewmaWeight, shardUpdateDuration), + dataIn: newEWMARate(ewmaWeight, shardUpdateDuration), dataOut: newEWMARate(ewmaWeight, shardUpdateDuration), dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), - metrics: metrics, - interner: interner, - highestRecvTimestamp: highestRecvTimestamp, + metrics: metrics, + interner: interner, protoMsg: protoMsg, compr: compression.Snappy, // Hardcoded for now, but scaffolding exists for likely future use. @@ -703,7 +710,6 @@ outer: t.seriesMtx.Lock() lbls, ok := t.seriesLabels[s.Ref] if !ok { - t.dataDropped.incr(1) if _, ok := t.droppedSeries[s.Ref]; !ok { t.logger.Info("Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() @@ -765,8 +771,6 @@ outer: t.seriesMtx.Lock() lbls, ok := t.seriesLabels[e.Ref] if !ok { - // Track dropped exemplars in the same EWMA for sharding calc. - t.dataDropped.incr(1) if _, ok := t.droppedSeries[e.Ref]; !ok { t.logger.Info("Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref) t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() @@ -822,7 +826,6 @@ outer: t.seriesMtx.Lock() lbls, ok := t.seriesLabels[h.Ref] if !ok { - t.dataDropped.incr(1) if _, ok := t.droppedSeries[h.Ref]; !ok { t.logger.Info("Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() @@ -877,7 +880,6 @@ outer: t.seriesMtx.Lock() lbls, ok := t.seriesLabels[h.Ref] if !ok { - t.dataDropped.incr(1) if _, ok := t.droppedSeries[h.Ref]; !ok { t.logger.Info("Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() @@ -1107,8 +1109,8 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool { // outlined in this functions implementation. It is up to the caller to reshard, or not, // based on the return value. func (t *QueueManager) calculateDesiredShards() int { + t.dataIn.tick() t.dataOut.tick() - t.dataDropped.tick() t.dataOutDuration.tick() // We use the number of incoming samples as a prediction of how much work we @@ -1118,13 +1120,12 @@ func (t *QueueManager) calculateDesiredShards() int { var ( dataInRate = t.dataIn.rate() dataOutRate = t.dataOut.rate() - dataKeptRatio = dataOutRate / (t.dataDropped.rate() + dataOutRate) dataOutDuration = t.dataOutDuration.rate() / float64(time.Second) - dataPendingRate = dataInRate*dataKeptRatio - dataOutRate + dataPendingRate = dataInRate - dataOutRate highestSent = t.metrics.highestSentTimestamp.Get() - highestRecv = t.highestRecvTimestamp.Get() + highestRecv = t.metrics.highestTimestamp.Get() delay = highestRecv - highestSent - dataPending = delay * dataInRate * dataKeptRatio + dataPending = delay * dataInRate ) if dataOutRate <= 0 { @@ -1136,13 +1137,12 @@ func (t *QueueManager) calculateDesiredShards() int { backlogCatchup = 0.05 * dataPending // Calculate Time to send one sample, averaged across all sends done this tick. timePerSample = dataOutDuration / dataOutRate - desiredShards = timePerSample * (dataInRate*dataKeptRatio + backlogCatchup) + desiredShards = timePerSample * (dataInRate + backlogCatchup) ) t.metrics.desiredNumShards.Set(desiredShards) t.logger.Debug("QueueManager.calculateDesiredShards", "dataInRate", dataInRate, "dataOutRate", dataOutRate, - "dataKeptRatio", dataKeptRatio, "dataPendingRate", dataPendingRate, "dataPending", dataPending, "dataOutDuration", dataOutDuration, @@ -1331,7 +1331,11 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool { case tHistogram, tFloatHistogram: s.qm.metrics.pendingHistograms.Inc() s.enqueuedHistograms.Inc() + default: + return true } + s.qm.metrics.highestTimestamp.Set(float64(data.timestamp / 1000)) + s.qm.dataIn.incr(1) return true } } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 58c7840821..f1cc399e53 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -53,17 +53,6 @@ import ( const defaultFlushDeadline = 1 * time.Minute -func newHighestTimestampMetric() *maxTimestamp { - return &maxTimestamp{ - Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "highest_timestamp_in_seconds", - Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet", - }), - } -} - func TestBasicContentNegotiation(t *testing.T) { t.Parallel() queueConfig := config.DefaultQueueConfig @@ -323,7 +312,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg) + m := NewQueueManager(metrics, nil, nil, nil, dir, cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), nil, false, false, false, protoMsg) return m } @@ -783,7 +772,7 @@ func TestDisableReshardOnRetry(t *testing.T) { } ) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1) + m := NewQueueManager(metrics, nil, nil, nil, "", cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), nil, false, false, false, config.RemoteWriteProtoMsgV1) m.StoreSeries(fakeSeries, 0) // Attempt to samples while the manager is running. We immediately stop the @@ -1460,7 +1449,8 @@ func BenchmarkStoreSeries(b *testing.B) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1) + + m := NewQueueManager(metrics, nil, nil, nil, dir, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), nil, false, false, false, config.RemoteWriteProtoMsgV1) m.externalLabels = tc.externalLabels m.relabelConfigs = tc.relabelConfigs @@ -1560,9 +1550,8 @@ func TestCalculateDesiredShards(t *testing.T) { addSamples := func(s int64, ts time.Duration) { pendingSamples += s samplesIn.incr(s) - samplesIn.tick() - m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix())) + m.metrics.highestTimestamp.Set(float64(startedAt.Add(ts).Unix())) } // helper function for sending samples. @@ -1619,7 +1608,6 @@ func TestCalculateDesiredShardsDetail(t *testing.T) { prevShards int dataIn int64 // Quantities normalised to seconds. dataOut int64 - dataDropped int64 dataOutDuration float64 backlog float64 expectedShards int @@ -1766,11 +1754,9 @@ func TestCalculateDesiredShardsDetail(t *testing.T) { t.Run(tc.name, func(t *testing.T) { m.numShards = tc.prevShards forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second)) - samplesIn.tick() forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second)) - forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second)) forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration))) - m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value. + m.metrics.highestTimestamp.value = tc.backlog // Not Set() because it can only increase value. require.Equal(t, tc.expectedShards, m.calculateDesiredShards()) }) @@ -2481,3 +2467,27 @@ func TestPopulateV2TimeSeries_TypeAndUnitLabels(t *testing.T) { }) } } + +func TestHighestTimestampOnAppend(t *testing.T) { + for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} { + t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { + nSamples := 11 * config.DefaultQueueConfig.Capacity + nSeries := 3 + samples, series := createTimeseries(nSamples, nSeries) + + _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg) + m.Start() + defer m.Stop() + + require.Equal(t, 0.0, m.metrics.highestTimestamp.Get()) + + m.StoreSeries(series, 0) + require.True(t, m.Append(samples)) + + // Check that Append sets the highest timestamp correctly. + highestTs := float64((nSamples - 1) / 1000) + require.Greater(t, highestTs, 0.0) + require.Equal(t, highestTs, m.metrics.highestTimestamp.Get()) + }) + } +} diff --git a/storage/remote/write.go b/storage/remote/write.go index b4458deac2..4bc7f3ac22 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -66,11 +66,9 @@ type WriteStorage struct { externalLabels labels.Labels dir string queues map[string]*QueueManager - samplesIn *ewmaRate flushDeadline time.Duration interner *pool scraper ReadyScrapeManager - quit chan struct{} // For timestampTracker. highestTimestamp *maxTimestamp @@ -89,11 +87,9 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, logger: logger, reg: reg, flushDeadline: flushDeadline, - samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), dir: dir, interner: newPool(), scraper: sm, - quit: make(chan struct{}), highestTimestamp: &maxTimestamp{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -107,23 +103,9 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, if reg != nil { reg.MustRegister(rws.highestTimestamp) } - go rws.run() return rws } -func (rws *WriteStorage) run() { - ticker := time.NewTicker(shardUpdateDuration) - defer ticker.Stop() - for { - select { - case <-ticker.C: - rws.samplesIn.tick() - case <-rws.quit: - return - } - } -} - func (rws *WriteStorage) Notify() { rws.mtx.Lock() defer rws.mtx.Unlock() @@ -201,7 +183,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.liveReaderMetrics, rws.logger, rws.dir, - rws.samplesIn, rwConf.QueueConfig, rwConf.MetadataConfig, conf.GlobalConfig.ExternalLabels, @@ -209,7 +190,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { c, rws.flushDeadline, rws.interner, - rws.highestTimestamp, rws.scraper, rwConf.SendExemplars, rwConf.SendNativeHistograms, @@ -270,7 +250,6 @@ func (rws *WriteStorage) Close() error { for _, q := range rws.queues { q.Stop() } - close(rws.quit) rws.watcherMetrics.Unregister() rws.liveReaderMetrics.Unregister() @@ -346,8 +325,6 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { - t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) - samplesIn.Add(float64(t.samples)) exemplarsIn.Add(float64(t.exemplars)) histogramsIn.Add(float64(t.histograms))