From 7b0428aa6aaa3c9e2f793295381beeba26ef034d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Mon, 13 Apr 2026 11:52:02 +0200 Subject: [PATCH] tsdb: move lastHistogramValue/lastFloatHistogramValue from memSeries to appenders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Store the last appended histogram and float histogram in the HistogramAppender and FloatHistogramAppender respectively, instead of keeping redundant pointers in memSeries. HistogramAppender gains LastHistogram()/SetLastHistogram() and FloatHistogramAppender gains LastFloatHistogram()/SetLastFloatHistogram(). All callers in head_append.go, head.go, head_read.go and head_wal.go are updated to type-assert on series.app and call the new accessors. The chunkSnapshotRecord fields are left unchanged (WAL on-disk format must not change); snapshot load now initialises the appender from the snapshot record via the new setters. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: György Krajcsovits --- tsdb/chunkenc/float_histogram.go | 17 +++++++++++ tsdb/chunkenc/histogram.go | 17 +++++++++++ tsdb/head.go | 28 +++++++++--------- tsdb/head_append.go | 37 +++++++++++------------ tsdb/head_read.go | 4 +-- tsdb/head_wal.go | 50 ++++++++++++++++++++++---------- 6 files changed, 101 insertions(+), 52 deletions(-) diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 6af2fa68e2..f4f33da42f 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -179,12 +179,28 @@ type FloatHistogramAppender struct { t, tDelta int64 sum, cnt, zCnt xorValue pBuckets, nBuckets []xorValue + + // lastValue is the most recently appended float histogram, retained for + // duplicate detection and stale-marker tracking. + lastValue *histogram.FloatHistogram } func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader { return CounterResetHeader(a.b.bytes()[histogramFlagPos] & CounterResetHeaderMask) } +// LastFloatHistogram returns the most recently appended float histogram, or nil +// if none has been appended yet. +func (a *FloatHistogramAppender) LastFloatHistogram() *histogram.FloatHistogram { + return a.lastValue +} + +// SetLastFloatHistogram sets the last float histogram value. Used to restore +// state after loading the appender from a snapshot. +func (a *FloatHistogramAppender) SetLastFloatHistogram(fh *histogram.FloatHistogram) { + a.lastValue = fh +} + func (a *FloatHistogramAppender) setCounterResetHeader(cr CounterResetHeader) { a.b.bytes()[histogramFlagPos] = (a.b.bytes()[histogramFlagPos] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask) } @@ -606,6 +622,7 @@ func (a *FloatHistogramAppender) appendFloatHistogram(t int64, h *histogram.Floa a.t = t a.tDelta = tDelta + a.lastValue = h } func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) { diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 4e77f387d3..ca4ef5ee86 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -203,12 +203,28 @@ type HistogramAppender struct { sum float64 leading uint8 trailing uint8 + + // lastValue is the most recently appended histogram, retained for + // duplicate detection and stale-marker tracking. + lastValue *histogram.Histogram } func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader { return CounterResetHeader(a.b.bytes()[histogramFlagPos] & CounterResetHeaderMask) } +// LastHistogram returns the most recently appended histogram, or nil if none +// has been appended yet. +func (a *HistogramAppender) LastHistogram() *histogram.Histogram { + return a.lastValue +} + +// SetLastHistogram sets the last histogram value. Used to restore state after +// loading the appender from a snapshot. +func (a *HistogramAppender) SetLastHistogram(h *histogram.Histogram) { + a.lastValue = h +} + func (a *HistogramAppender) setCounterResetHeader(cr CounterResetHeader) { a.b.bytes()[histogramFlagPos] = (a.b.bytes()[histogramFlagPos] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask) } @@ -659,6 +675,7 @@ func (a *HistogramAppender) appendHistogram(t int64, h *histogram.Histogram) { copy(a.nBuckets, h.NegativeBuckets) // Note that the bucket deltas were already updated above. a.sum = h.Sum + a.lastValue = h } // recode converts the current chunk to accommodate an expansion of the set of diff --git a/tsdb/head.go b/tsdb/head.go index 73f2c999b2..14de5a816c 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2141,9 +2141,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( defer s.locks[refShard].Unlock() } - if value.IsStaleNaN(series.lastValue) || - (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || - (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) { + if value.IsStaleNaN(series.lastValue) || series.isHistogramStale() { staleSeriesDeleted++ } @@ -2232,9 +2230,7 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) { h.series.hashes[hashShard].del(hash, series.ref) h.series.locks[hashShard].Unlock() - if value.IsStaleNaN(series.lastValue) || - (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || - (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) { + if value.IsStaleNaN(series.lastValue) || series.isHistogramStale() { staleSeriesDeleted++ } @@ -2292,9 +2288,7 @@ func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) } // Check if the series is still stale. - isStale := value.IsStaleNaN(series.lastValue) || - (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || - (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) + isStale := value.IsStaleNaN(series.lastValue) || series.isHistogramStale() if !isStale { return @@ -2484,10 +2478,6 @@ type memSeries struct { // We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates. lastValue float64 - // We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates. - lastHistogramValue *histogram.Histogram - lastFloatHistogramValue *histogram.FloatHistogram - // Current appender for the head chunk. Set when a new head chunk is cut. // It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit // (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series). @@ -2497,6 +2487,18 @@ type memSeries struct { txs *txRing } +// isHistogramStale reports whether the most recently appended histogram or float +// histogram sample is a stale marker. The series lock must be held. +func (s *memSeries) isHistogramStale() bool { + if app, ok := s.app.(*chunkenc.HistogramAppender); ok && app.LastHistogram() != nil { + return value.IsStaleNaN(app.LastHistogram().Sum) + } + if app, ok := s.app.(*chunkenc.FloatHistogramAppender); ok && app.LastFloatHistogram() != nil { + return value.IsStaleNaN(app.LastFloatHistogram().Sum) + } + return false +} + // memSeriesOOOFields contains the fields required by memSeries // to handle out-of-order data. type memSeriesOOOFields struct { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index c7143d8d96..ec9a706398 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -660,7 +660,8 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi // like federation and erroring out at that time would be extremely noisy. // This only checks against the latest in-order sample. // The OOO headchunk has its own method to detect these duplicates. - if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil { + switch s.app.(type) { + case *chunkenc.HistogramAppender, *chunkenc.FloatHistogramAppender: return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v) } if math.Float64bits(s.lastValue) != math.Float64bits(v) { @@ -705,7 +706,8 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMax // like federation and erroring out at that time would be extremely noisy. // This only checks against the latest in-order sample. // The OOO headchunk has its own method to detect these duplicates. - if !h.Equals(s.lastHistogramValue) { + app, ok := s.app.(*chunkenc.HistogramAppender) + if !ok || !h.Equals(app.LastHistogram()) { return false, 0, storage.ErrDuplicateSampleForTimestamp } // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. @@ -747,7 +749,8 @@ func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogr // like federation and erroring out at that time would be extremely noisy. // This only checks against the latest in-order sample. // The OOO headchunk has its own method to detect these duplicates. - if !fh.Equals(s.lastFloatHistogramValue) { + app, ok := s.app.(*chunkenc.FloatHistogramAppender) + if !ok || !fh.Equals(app.LastFloatHistogram()) { return false, 0, storage.ErrDuplicateSampleForTimestamp } // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. @@ -1352,8 +1355,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte // sample for this same series in this same batch // (because any such sample would have triggered a new // batch). - switch { - case series.lastHistogramValue != nil: + switch series.app.(type) { + case *chunkenc.HistogramAppender: b.histograms = append(b.histograms, record.RefHistogramSample{ Ref: series.ref, T: s.T, @@ -1365,7 +1368,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte acc.histogramsAppended++ series.Unlock() continue - case series.lastFloatHistogramValue != nil: + case *chunkenc.FloatHistogramAppender: b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{ Ref: series.ref, T: s.T, @@ -1541,9 +1544,10 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC default: newlyStale := value.IsStaleNaN(s.H.Sum) staleToNonStale := false - if series.lastHistogramValue != nil { - newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum) - staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum) + if app, ok2 := series.app.(*chunkenc.HistogramAppender); ok2 && app.LastHistogram() != nil { + prevSum := app.LastHistogram().Sum + newlyStale = newlyStale && !value.IsStaleNaN(prevSum) + staleToNonStale = value.IsStaleNaN(prevSum) && !value.IsStaleNaN(s.H.Sum) } // TODO(krajorama,ywwg): pass ST when available in WAL. ok, chunkCreated = series.appendHistogram(0, s.T, s.H, a.appendID, acc.appendChunkOpts) @@ -1652,9 +1656,10 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo default: newlyStale := value.IsStaleNaN(s.FH.Sum) staleToNonStale := false - if series.lastFloatHistogramValue != nil { - newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum) - staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum) + if app, ok2 := series.app.(*chunkenc.FloatHistogramAppender); ok2 && app.LastFloatHistogram() != nil { + prevSum := app.LastFloatHistogram().Sum + newlyStale = newlyStale && !value.IsStaleNaN(prevSum) + staleToNonStale = value.IsStaleNaN(prevSum) && !value.IsStaleNaN(s.FH.Sum) } // TODO(krajorama,ywwg): pass ST when available in WAL. ok, chunkCreated = series.appendFloatHistogram(0, s.T, s.FH, a.appendID, acc.appendChunkOpts) @@ -1853,8 +1858,6 @@ func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts) c.maxTime = t s.lastValue = v - s.lastHistogramValue = nil - s.lastFloatHistogramValue = nil if appendID > 0 { s.txs.add(appendID) @@ -1892,9 +1895,6 @@ func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendI newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed - s.lastHistogramValue = h - s.lastFloatHistogramValue = nil - if appendID > 0 { s.txs.add(appendID) } @@ -1949,9 +1949,6 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogr newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed. - s.lastHistogramValue = nil - s.lastFloatHistogramValue = fh - if appendID > 0 { s.txs.add(appendID) } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index f0a1331fbb..798f9bbf44 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -270,9 +270,7 @@ func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.Ser continue } - if value.IsStaleNaN(s.lastValue) || - (s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) || - (s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) { + if value.IsStaleNaN(s.lastValue) || s.isHistogramStale() { series = append(series, s) } s.Unlock() diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index e9700dd82e..6ddc01d0ea 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -711,17 +711,19 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp var chunkCreated, newlyStale, staleToNonStale bool if s.h != nil { newlyStale = value.IsStaleNaN(s.h.Sum) - if ms.lastHistogramValue != nil { - newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum) - staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum) + if app, ok := ms.app.(*chunkenc.HistogramAppender); ok && app.LastHistogram() != nil { + prevSum := app.LastHistogram().Sum + newlyStale = newlyStale && !value.IsStaleNaN(prevSum) + staleToNonStale = value.IsStaleNaN(prevSum) && !value.IsStaleNaN(s.h.Sum) } // TODO(krajorama,ywwg): Pass ST when available in WBL. _, chunkCreated = ms.appendHistogram(0, s.t, s.h, 0, appendChunkOpts) } else { newlyStale = value.IsStaleNaN(s.fh.Sum) - if ms.lastFloatHistogramValue != nil { - newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) - staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum) + if app, ok := ms.app.(*chunkenc.FloatHistogramAppender); ok && app.LastFloatHistogram() != nil { + prevSum := app.LastFloatHistogram().Sum + newlyStale = newlyStale && !value.IsStaleNaN(prevSum) + staleToNonStale = value.IsStaleNaN(prevSum) && !value.IsStaleNaN(s.fh.Sum) } // TODO(krajorama,ywwg): Pass ST when available in WBL. _, chunkCreated = ms.appendFloatHistogram(0, s.t, s.fh, 0, appendChunkOpts) @@ -1221,9 +1223,17 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { buf.PutBE64int64(0) buf.PutBEFloat64(s.lastValue) case chunkenc.EncHistogram: - record.EncodeHistogram(&buf, s.lastHistogramValue) + var lastH *histogram.Histogram + if app, ok := s.app.(*chunkenc.HistogramAppender); ok { + lastH = app.LastHistogram() + } + record.EncodeHistogram(&buf, lastH) default: // chunkenc.FloatHistogram. - record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue) + var lastFH *histogram.FloatHistogram + if app, ok := s.app.(*chunkenc.FloatHistogramAppender); ok { + lastFH = app.LastFloatHistogram() + } + record.EncodeFloatHistogram(&buf, lastFH) } } s.Unlock() @@ -1661,14 +1671,6 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie series.nextAt = csr.mc.maxTime // This will create a new chunk on append. series.headChunks = csr.mc series.lastValue = csr.lastValue - series.lastHistogramValue = csr.lastHistogramValue - series.lastFloatHistogramValue = csr.lastFloatHistogramValue - - if value.IsStaleNaN(series.lastValue) || - (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || - (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) { - h.numStaleSeries.Inc() - } app, err := series.headChunks.chunk.Appender() if err != nil { @@ -1677,6 +1679,22 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie } series.app = app + // Restore the last histogram pointer so that duplicate detection + // and stale-marker tracking work after snapshot load. + if csr.lastHistogramValue != nil { + if histApp, ok := app.(*chunkenc.HistogramAppender); ok { + histApp.SetLastHistogram(csr.lastHistogramValue) + } + } else if csr.lastFloatHistogramValue != nil { + if fhApp, ok := app.(*chunkenc.FloatHistogramAppender); ok { + fhApp.SetLastFloatHistogram(csr.lastFloatHistogramValue) + } + } + + if value.IsStaleNaN(series.lastValue) || series.isHistogramStale() { + h.numStaleSeries.Inc() + } + h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime) } }(i, recordChan)