diff --git a/tsdb/head.go b/tsdb/head.go
index 7763d272b7..ab2a8575d2 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -36,6 +36,7 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/metadata"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
@@ -68,6 +69,7 @@ var (
 type Head struct {
 	chunkRange             atomic.Int64
 	numSeries              atomic.Uint64
+	numStaleSeries         atomic.Uint64
 	minOOOTime, maxOOOTime atomic.Int64 // TODO(jesusvazquez) These should be updated after garbage collection.
 	minTime, maxTime       atomic.Int64 // Current min and max of the samples included in the head. TODO(jesusvazquez) Ensure these are properly tracked.
 	minValidTime           atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
@@ -360,6 +362,7 @@ func (h *Head) resetWLReplayResources() {
 type headMetrics struct {
 	activeAppenders prometheus.Gauge
 	series          prometheus.GaugeFunc
+	staleSeries     prometheus.GaugeFunc
 	seriesCreated   prometheus.Counter
 	seriesRemoved   prometheus.Counter
 	seriesNotFound  prometheus.Counter
@@ -406,6 +409,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 		}, func() float64 {
 			return float64(h.NumSeries())
 		}),
+		staleSeries: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Name: "prometheus_tsdb_head_stale_series",
+			Help: "Total number of stale series in the head block.",
+		}, func() float64 {
+			return float64(h.NumStaleSeries())
+		}),
 		seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
 			Name: "prometheus_tsdb_head_series_created_total",
 			Help: "Total number of series created in the head",
@@ -1607,7 +1616,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 
 	// Drop old chunks and remember series IDs and hashes if they can be
 	// deleted entirely.
-	deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
+	deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries)
 	seriesRemoved := len(deleted)
 
 	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
@@ -1645,11 +1654,16 @@ func (h *Head) Tombstones() (tombstones.Reader, error) {
 	return h.tombstones, nil
 }
 
-// NumSeries returns the number of active series in the head.
+// NumSeries returns the number of series tracked in the head.
 func (h *Head) NumSeries() uint64 {
 	return h.numSeries.Load()
 }
 
+// NumStaleSeries returns the number of stale series in the head.
+func (h *Head) NumStaleSeries() uint64 {
+	return h.numStaleSeries.Load()
+}
+
 var headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")
 
 // Meta returns meta information about the head.
@@ -1929,7 +1943,7 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
 // and there's no easy way to cast maps.
 // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
-func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
+func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
 	var (
 		deleted  = map[storage.SeriesRef]struct{}{}
 		affected = map[labels.Label]struct{}{}
@@ -1987,6 +2001,12 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 			defer s.locks[refShard].Unlock()
 		}
 
+		if value.IsStaleNaN(series.lastValue) ||
+			(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
+			(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
+			numStaleSeries.Dec()
+		}
+
 		deleted[storage.SeriesRef(series.ref)] = struct{}{}
 		series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
 		s.hashes[hashShard].del(hash, series.ref)
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 05299f048d..fa44f752f2 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -1222,6 +1222,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
 				acc.floatsAppended--
 			}
 		default:
+			newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
+			staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
 			ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
 			if ok {
 				if s.T < acc.inOrderMint {
@@ -1230,6 +1232,12 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
 				if s.T > acc.inOrderMaxt {
 					acc.inOrderMaxt = s.T
 				}
+				if newlyStale {
+					a.head.numStaleSeries.Inc()
+				}
+				if staleToNonStale {
+					a.head.numStaleSeries.Dec()
+				}
 			} else {
 				// The sample is an exact duplicate, and should be silently dropped.
 				acc.floatsAppended--
@@ -1310,6 +1318,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
 				acc.histogramsAppended--
 			}
 		default:
+			newlyStale := value.IsStaleNaN(s.H.Sum)
+			staleToNonStale := false
+			if series.lastHistogramValue != nil {
+				newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
+				staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
+			}
 			ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
 			if ok {
 				if s.T < acc.inOrderMint {
@@ -1318,6 +1332,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
 				if s.T > acc.inOrderMaxt {
 					acc.inOrderMaxt = s.T
 				}
+				if newlyStale {
+					a.head.numStaleSeries.Inc()
+				}
+				if staleToNonStale {
+					a.head.numStaleSeries.Dec()
+				}
 			} else {
 				acc.histogramsAppended--
 				acc.histoOOORejected++
@@ -1398,6 +1418,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
 				acc.histogramsAppended--
 			}
 		default:
+			newlyStale := value.IsStaleNaN(s.FH.Sum)
+			staleToNonStale := false
+			if series.lastFloatHistogramValue != nil {
+				newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
+				staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
+			}
 			ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
 			if ok {
 				if s.T < acc.inOrderMint {
@@ -1406,6 +1432,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
 				if s.T > acc.inOrderMaxt {
 					acc.inOrderMaxt = s.T
 				}
+				if newlyStale {
+					a.head.numStaleSeries.Inc()
+				}
+				if staleToNonStale {
+					a.head.numStaleSeries.Dec()
+				}
 			} else {
 				acc.histogramsAppended--
 				acc.histoOOORejected++
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 73f67f4e8a..b1856c0419 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -6435,7 +6435,7 @@ func TestStripeSeries_gc(t *testing.T) {
 	s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
 	hash := ms1.lset.Hash()
 
-	s.gc(0, 0)
+	s.gc(0, 0, nil)
 
 	// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
 	got := s.getByHash(hash, ms1.lset)
@@ -6866,3 +6866,130 @@ func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(sto
 
 	wg.Wait()
 }
+
+func TestHead_NumStaleSeries(t *testing.T) {
+	head, _ := newTestHead(t, 1000, compression.None, false)
+	t.Cleanup(func() {
+		require.NoError(t, head.Close())
+	})
+	require.NoError(t, head.Init(0))
+
+	// Initially, no series should be stale.
+	require.Equal(t, uint64(0), head.NumStaleSeries())
+
+	appendSample := func(lbls labels.Labels, ts int64, val float64) {
+		app := head.Appender(context.Background())
+		_, err := app.Append(0, lbls, ts, val)
+		require.NoError(t, err)
+		require.NoError(t, app.Commit())
+	}
+	appendHistogram := func(lbls labels.Labels, ts int64, val *histogram.Histogram) {
+		app := head.Appender(context.Background())
+		_, err := app.AppendHistogram(0, lbls, ts, val, nil)
+		require.NoError(t, err)
+		require.NoError(t, app.Commit())
+	}
+	appendFloatHistogram := func(lbls labels.Labels, ts int64, val *histogram.FloatHistogram) {
+		app := head.Appender(context.Background())
+		_, err := app.AppendHistogram(0, lbls, ts, nil, val)
+		require.NoError(t, err)
+		require.NoError(t, app.Commit())
+	}
+
+	verifySeriesCounts := func(numStaleSeries, numSeries int) {
+		require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries())
+		require.Equal(t, uint64(numSeries), head.NumSeries())
+	}
+
+	// Create some series with normal samples.
+	series1 := labels.FromStrings("name", "series1", "label", "value1")
+	series2 := labels.FromStrings("name", "series2", "label", "value2")
+	series3 := labels.FromStrings("name", "series3", "label", "value3")
+
+	// Add normal samples to all series.
+	appendSample(series1, 100, 1)
+	appendSample(series2, 100, 2)
+	appendSample(series3, 100, 3)
+	// Still no stale series.
+	verifySeriesCounts(0, 3)
+
+	// Make series1 stale by appending a stale sample. Now we should have 1 stale series.
+	appendSample(series1, 200, math.Float64frombits(value.StaleNaN))
+	verifySeriesCounts(1, 3)
+
+	// Make series2 stale as well.
+	appendSample(series2, 200, math.Float64frombits(value.StaleNaN))
+	verifySeriesCounts(2, 3)
+
+	// Add a non-stale sample to series1. It should not be counted as stale now.
+	appendSample(series1, 300, 10)
+	verifySeriesCounts(1, 3)
+
+	// Test that series3 doesn't become stale when we add another normal sample.
+	appendSample(series3, 200, 10)
+	verifySeriesCounts(1, 3)
+
+	// Test histogram stale samples as well.
+	series4 := labels.FromStrings("name", "series4", "type", "histogram")
+	h := tsdbutil.GenerateTestHistograms(1)[0]
+	appendHistogram(series4, 100, h)
+	verifySeriesCounts(1, 4)
+
+	// Make histogram series stale.
+	staleHist := h.Copy()
+	staleHist.Sum = math.Float64frombits(value.StaleNaN)
+	appendHistogram(series4, 200, staleHist)
+	verifySeriesCounts(2, 4)
+
+	// Test float histogram stale samples.
+	series5 := labels.FromStrings("name", "series5", "type", "float_histogram")
+	fh := tsdbutil.GenerateTestFloatHistograms(1)[0]
+	appendFloatHistogram(series5, 100, fh)
+	verifySeriesCounts(2, 5)
+
+	// Make float histogram series stale.
+	staleFH := fh.Copy()
+	staleFH.Sum = math.Float64frombits(value.StaleNaN)
+	appendFloatHistogram(series5, 200, staleFH)
+	verifySeriesCounts(3, 5)
+
+	// Make histogram sample non-stale and stale back again.
+	appendHistogram(series4, 210, h)
+	verifySeriesCounts(2, 5)
+	appendHistogram(series4, 220, staleHist)
+	verifySeriesCounts(3, 5)
+
+	// Make float histogram sample non-stale and stale back again.
+	appendFloatHistogram(series5, 210, fh)
+	verifySeriesCounts(2, 5)
+	appendFloatHistogram(series5, 220, staleFH)
+	verifySeriesCounts(3, 5)
+
+	// Series 1 and 3 are not stale at this point. Add a new sample to series 1 and series 5,
+	// so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series.
+	appendSample(series1, 400, 10)
+	appendFloatHistogram(series5, 400, staleFH)
+	verifySeriesCounts(3, 5)
+
+	// Test garbage collection behavior - stale series should be decremented when GC'd.
+	// Force a garbage collection by truncating old data.
+	require.NoError(t, head.Truncate(300))
+
+	// After truncation, run GC to collect old chunks/series.
+	head.gc()
+
+	// series 1 and series 5 are left.
+	verifySeriesCounts(1, 2)
+
+	// Test creating a new series for each of float, histogram, float histogram that starts as stale.
+	// This should be counted as stale.
+	series6 := labels.FromStrings("name", "series6", "direct", "stale")
+	series7 := labels.FromStrings("name", "series7", "direct", "stale")
+	series8 := labels.FromStrings("name", "series8", "direct", "stale")
+	appendSample(series6, 400, math.Float64frombits(value.StaleNaN))
+	verifySeriesCounts(2, 3)
+	appendHistogram(series7, 400, staleHist)
+	verifySeriesCounts(3, 4)
+	appendFloatHistogram(series8, 400, staleFH)
+	verifySeriesCounts(4, 5)
+}
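
Not part of the patch, but for orientation: the accounting the diff adds on every successful append boils down to one rule, increment the counter when a series' last value goes from non-stale to stale, decrement it on the reverse transition (and on GC of a stale series). The standalone sketch below restates that rule so it can be run in isolation; the names applyStaleTransition, isStaleNaN, and staleNaN are illustrative rather than identifiers from the diff, and it uses the standard library's sync/atomic instead of the atomic wrapper the head uses.

package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// staleNaN mirrors the bit pattern Prometheus uses as its staleness marker
// (model/value.StaleNaN); it is redeclared here only to keep the sketch
// self-contained.
const staleNaN uint64 = 0x7ff0000000000002

func isStaleNaN(v float64) bool { return math.Float64bits(v) == staleNaN }

// applyStaleTransition applies the same transition rule as the patch:
// increment on non-stale -> stale, decrement on stale -> non-stale,
// and leave the counter untouched otherwise.
func applyStaleTransition(numStaleSeries *atomic.Uint64, lastValue, newValue float64) {
	switch {
	case !isStaleNaN(lastValue) && isStaleNaN(newValue):
		numStaleSeries.Add(1)
	case isStaleNaN(lastValue) && !isStaleNaN(newValue):
		numStaleSeries.Add(^uint64(0)) // wrap-around subtraction, i.e. decrement
	}
}

func main() {
	var n atomic.Uint64
	applyStaleTransition(&n, 1, math.Float64frombits(staleNaN)) // series turns stale
	fmt.Println(n.Load())                                       // 1
	applyStaleTransition(&n, math.Float64frombits(staleNaN), 2) // series comes back
	fmt.Println(n.Load())                                       // 0
}

With the patch applied, the same quantity is exported as the prometheus_tsdb_head_stale_series gauge, which can be read alongside the existing prometheus_tsdb_head_series gauge to see how many head series are only being kept alive by staleness markers.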