mirror of
https://github.com/prometheus/prometheus.git
synced 2026-04-19 12:41:04 +02:00
tsdb: move lastHistogramValue/lastFloatHistogramValue from memSeries to appenders
Store the last appended histogram and float histogram in the HistogramAppender and FloatHistogramAppender respectively, instead of keeping redundant pointers in memSeries. HistogramAppender gains LastHistogram()/SetLastHistogram() and FloatHistogramAppender gains LastFloatHistogram()/SetLastFloatHistogram(). All callers in head_append.go, head.go, head_read.go and head_wal.go are updated to type-assert on series.app and call the new accessors. The chunkSnapshotRecord fields are left unchanged (WAL on-disk format must not change); snapshot load now initialises the appender from the snapshot record via the new setters. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
971e64756e
commit
7b0428aa6a
@ -179,12 +179,28 @@ type FloatHistogramAppender struct {
|
||||
t, tDelta int64
|
||||
sum, cnt, zCnt xorValue
|
||||
pBuckets, nBuckets []xorValue
|
||||
|
||||
// lastValue is the most recently appended float histogram, retained for
|
||||
// duplicate detection and stale-marker tracking.
|
||||
lastValue *histogram.FloatHistogram
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader {
|
||||
return CounterResetHeader(a.b.bytes()[histogramFlagPos] & CounterResetHeaderMask)
|
||||
}
|
||||
|
||||
// LastFloatHistogram returns the most recently appended float histogram, or nil
|
||||
// if none has been appended yet.
|
||||
func (a *FloatHistogramAppender) LastFloatHistogram() *histogram.FloatHistogram {
|
||||
return a.lastValue
|
||||
}
|
||||
|
||||
// SetLastFloatHistogram sets the last float histogram value. Used to restore
|
||||
// state after loading the appender from a snapshot.
|
||||
func (a *FloatHistogramAppender) SetLastFloatHistogram(fh *histogram.FloatHistogram) {
|
||||
a.lastValue = fh
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
|
||||
a.b.bytes()[histogramFlagPos] = (a.b.bytes()[histogramFlagPos] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
|
||||
}
|
||||
@ -606,6 +622,7 @@ func (a *FloatHistogramAppender) appendFloatHistogram(t int64, h *histogram.Floa
|
||||
|
||||
a.t = t
|
||||
a.tDelta = tDelta
|
||||
a.lastValue = h
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) {
|
||||
|
||||
@ -203,12 +203,28 @@ type HistogramAppender struct {
|
||||
sum float64
|
||||
leading uint8
|
||||
trailing uint8
|
||||
|
||||
// lastValue is the most recently appended histogram, retained for
|
||||
// duplicate detection and stale-marker tracking.
|
||||
lastValue *histogram.Histogram
|
||||
}
|
||||
|
||||
func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader {
|
||||
return CounterResetHeader(a.b.bytes()[histogramFlagPos] & CounterResetHeaderMask)
|
||||
}
|
||||
|
||||
// LastHistogram returns the most recently appended histogram, or nil if none
|
||||
// has been appended yet.
|
||||
func (a *HistogramAppender) LastHistogram() *histogram.Histogram {
|
||||
return a.lastValue
|
||||
}
|
||||
|
||||
// SetLastHistogram sets the last histogram value. Used to restore state after
|
||||
// loading the appender from a snapshot.
|
||||
func (a *HistogramAppender) SetLastHistogram(h *histogram.Histogram) {
|
||||
a.lastValue = h
|
||||
}
|
||||
|
||||
func (a *HistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
|
||||
a.b.bytes()[histogramFlagPos] = (a.b.bytes()[histogramFlagPos] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
|
||||
}
|
||||
@ -659,6 +675,7 @@ func (a *HistogramAppender) appendHistogram(t int64, h *histogram.Histogram) {
|
||||
copy(a.nBuckets, h.NegativeBuckets)
|
||||
// Note that the bucket deltas were already updated above.
|
||||
a.sum = h.Sum
|
||||
a.lastValue = h
|
||||
}
|
||||
|
||||
// recode converts the current chunk to accommodate an expansion of the set of
|
||||
|
||||
28
tsdb/head.go
28
tsdb/head.go
@ -2141,9 +2141,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
||||
defer s.locks[refShard].Unlock()
|
||||
}
|
||||
|
||||
if value.IsStaleNaN(series.lastValue) ||
|
||||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
|
||||
if value.IsStaleNaN(series.lastValue) || series.isHistogramStale() {
|
||||
staleSeriesDeleted++
|
||||
}
|
||||
|
||||
@ -2232,9 +2230,7 @@ func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
|
||||
h.series.hashes[hashShard].del(hash, series.ref)
|
||||
h.series.locks[hashShard].Unlock()
|
||||
|
||||
if value.IsStaleNaN(series.lastValue) ||
|
||||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
|
||||
if value.IsStaleNaN(series.lastValue) || series.isHistogramStale() {
|
||||
staleSeriesDeleted++
|
||||
}
|
||||
|
||||
@ -2292,9 +2288,7 @@ func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64)
|
||||
}
|
||||
|
||||
// Check if the series is still stale.
|
||||
isStale := value.IsStaleNaN(series.lastValue) ||
|
||||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum))
|
||||
isStale := value.IsStaleNaN(series.lastValue) || series.isHistogramStale()
|
||||
|
||||
if !isStale {
|
||||
return
|
||||
@ -2484,10 +2478,6 @@ type memSeries struct {
|
||||
// We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates.
|
||||
lastValue float64
|
||||
|
||||
// We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates.
|
||||
lastHistogramValue *histogram.Histogram
|
||||
lastFloatHistogramValue *histogram.FloatHistogram
|
||||
|
||||
// Current appender for the head chunk. Set when a new head chunk is cut.
|
||||
// It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit
|
||||
// (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series).
|
||||
@ -2497,6 +2487,18 @@ type memSeries struct {
|
||||
txs *txRing
|
||||
}
|
||||
|
||||
// isHistogramStale reports whether the most recently appended histogram or float
|
||||
// histogram sample is a stale marker. The series lock must be held.
|
||||
func (s *memSeries) isHistogramStale() bool {
|
||||
if app, ok := s.app.(*chunkenc.HistogramAppender); ok && app.LastHistogram() != nil {
|
||||
return value.IsStaleNaN(app.LastHistogram().Sum)
|
||||
}
|
||||
if app, ok := s.app.(*chunkenc.FloatHistogramAppender); ok && app.LastFloatHistogram() != nil {
|
||||
return value.IsStaleNaN(app.LastFloatHistogram().Sum)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// memSeriesOOOFields contains the fields required by memSeries
|
||||
// to handle out-of-order data.
|
||||
type memSeriesOOOFields struct {
|
||||
|
||||
@ -660,7 +660,8 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
|
||||
// like federation and erroring out at that time would be extremely noisy.
|
||||
// This only checks against the latest in-order sample.
|
||||
// The OOO headchunk has its own method to detect these duplicates.
|
||||
if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil {
|
||||
switch s.app.(type) {
|
||||
case *chunkenc.HistogramAppender, *chunkenc.FloatHistogramAppender:
|
||||
return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
|
||||
}
|
||||
if math.Float64bits(s.lastValue) != math.Float64bits(v) {
|
||||
@ -705,7 +706,8 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMax
|
||||
// like federation and erroring out at that time would be extremely noisy.
|
||||
// This only checks against the latest in-order sample.
|
||||
// The OOO headchunk has its own method to detect these duplicates.
|
||||
if !h.Equals(s.lastHistogramValue) {
|
||||
app, ok := s.app.(*chunkenc.HistogramAppender)
|
||||
if !ok || !h.Equals(app.LastHistogram()) {
|
||||
return false, 0, storage.ErrDuplicateSampleForTimestamp
|
||||
}
|
||||
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
|
||||
@ -747,7 +749,8 @@ func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogr
|
||||
// like federation and erroring out at that time would be extremely noisy.
|
||||
// This only checks against the latest in-order sample.
|
||||
// The OOO headchunk has its own method to detect these duplicates.
|
||||
if !fh.Equals(s.lastFloatHistogramValue) {
|
||||
app, ok := s.app.(*chunkenc.FloatHistogramAppender)
|
||||
if !ok || !fh.Equals(app.LastFloatHistogram()) {
|
||||
return false, 0, storage.ErrDuplicateSampleForTimestamp
|
||||
}
|
||||
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
|
||||
@ -1352,8 +1355,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
|
||||
// sample for this same series in this same batch
|
||||
// (because any such sample would have triggered a new
|
||||
// batch).
|
||||
switch {
|
||||
case series.lastHistogramValue != nil:
|
||||
switch series.app.(type) {
|
||||
case *chunkenc.HistogramAppender:
|
||||
b.histograms = append(b.histograms, record.RefHistogramSample{
|
||||
Ref: series.ref,
|
||||
T: s.T,
|
||||
@ -1365,7 +1368,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
|
||||
acc.histogramsAppended++
|
||||
series.Unlock()
|
||||
continue
|
||||
case series.lastFloatHistogramValue != nil:
|
||||
case *chunkenc.FloatHistogramAppender:
|
||||
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
|
||||
Ref: series.ref,
|
||||
T: s.T,
|
||||
@ -1541,9 +1544,10 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC
|
||||
default:
|
||||
newlyStale := value.IsStaleNaN(s.H.Sum)
|
||||
staleToNonStale := false
|
||||
if series.lastHistogramValue != nil {
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
|
||||
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
|
||||
if app, ok2 := series.app.(*chunkenc.HistogramAppender); ok2 && app.LastHistogram() != nil {
|
||||
prevSum := app.LastHistogram().Sum
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(prevSum)
|
||||
staleToNonStale = value.IsStaleNaN(prevSum) && !value.IsStaleNaN(s.H.Sum)
|
||||
}
|
||||
// TODO(krajorama,ywwg): pass ST when available in WAL.
|
||||
ok, chunkCreated = series.appendHistogram(0, s.T, s.H, a.appendID, acc.appendChunkOpts)
|
||||
@ -1652,9 +1656,10 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo
|
||||
default:
|
||||
newlyStale := value.IsStaleNaN(s.FH.Sum)
|
||||
staleToNonStale := false
|
||||
if series.lastFloatHistogramValue != nil {
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
|
||||
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
|
||||
if app, ok2 := series.app.(*chunkenc.FloatHistogramAppender); ok2 && app.LastFloatHistogram() != nil {
|
||||
prevSum := app.LastFloatHistogram().Sum
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(prevSum)
|
||||
staleToNonStale = value.IsStaleNaN(prevSum) && !value.IsStaleNaN(s.FH.Sum)
|
||||
}
|
||||
// TODO(krajorama,ywwg): pass ST when available in WAL.
|
||||
ok, chunkCreated = series.appendFloatHistogram(0, s.T, s.FH, a.appendID, acc.appendChunkOpts)
|
||||
@ -1853,8 +1858,6 @@ func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts)
|
||||
c.maxTime = t
|
||||
|
||||
s.lastValue = v
|
||||
s.lastHistogramValue = nil
|
||||
s.lastFloatHistogramValue = nil
|
||||
|
||||
if appendID > 0 {
|
||||
s.txs.add(appendID)
|
||||
@ -1892,9 +1895,6 @@ func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendI
|
||||
|
||||
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed
|
||||
|
||||
s.lastHistogramValue = h
|
||||
s.lastFloatHistogramValue = nil
|
||||
|
||||
if appendID > 0 {
|
||||
s.txs.add(appendID)
|
||||
}
|
||||
@ -1949,9 +1949,6 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogr
|
||||
|
||||
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed.
|
||||
|
||||
s.lastHistogramValue = nil
|
||||
s.lastFloatHistogramValue = fh
|
||||
|
||||
if appendID > 0 {
|
||||
s.txs.add(appendID)
|
||||
}
|
||||
|
||||
@ -270,9 +270,7 @@ func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.Ser
|
||||
continue
|
||||
}
|
||||
|
||||
if value.IsStaleNaN(s.lastValue) ||
|
||||
(s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
|
||||
(s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) {
|
||||
if value.IsStaleNaN(s.lastValue) || s.isHistogramStale() {
|
||||
series = append(series, s)
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
@ -711,17 +711,19 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||
var chunkCreated, newlyStale, staleToNonStale bool
|
||||
if s.h != nil {
|
||||
newlyStale = value.IsStaleNaN(s.h.Sum)
|
||||
if ms.lastHistogramValue != nil {
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum)
|
||||
staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum)
|
||||
if app, ok := ms.app.(*chunkenc.HistogramAppender); ok && app.LastHistogram() != nil {
|
||||
prevSum := app.LastHistogram().Sum
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(prevSum)
|
||||
staleToNonStale = value.IsStaleNaN(prevSum) && !value.IsStaleNaN(s.h.Sum)
|
||||
}
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WBL.
|
||||
_, chunkCreated = ms.appendHistogram(0, s.t, s.h, 0, appendChunkOpts)
|
||||
} else {
|
||||
newlyStale = value.IsStaleNaN(s.fh.Sum)
|
||||
if ms.lastFloatHistogramValue != nil {
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum)
|
||||
staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum)
|
||||
if app, ok := ms.app.(*chunkenc.FloatHistogramAppender); ok && app.LastFloatHistogram() != nil {
|
||||
prevSum := app.LastFloatHistogram().Sum
|
||||
newlyStale = newlyStale && !value.IsStaleNaN(prevSum)
|
||||
staleToNonStale = value.IsStaleNaN(prevSum) && !value.IsStaleNaN(s.fh.Sum)
|
||||
}
|
||||
// TODO(krajorama,ywwg): Pass ST when available in WBL.
|
||||
_, chunkCreated = ms.appendFloatHistogram(0, s.t, s.fh, 0, appendChunkOpts)
|
||||
@ -1221,9 +1223,17 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
|
||||
buf.PutBE64int64(0)
|
||||
buf.PutBEFloat64(s.lastValue)
|
||||
case chunkenc.EncHistogram:
|
||||
record.EncodeHistogram(&buf, s.lastHistogramValue)
|
||||
var lastH *histogram.Histogram
|
||||
if app, ok := s.app.(*chunkenc.HistogramAppender); ok {
|
||||
lastH = app.LastHistogram()
|
||||
}
|
||||
record.EncodeHistogram(&buf, lastH)
|
||||
default: // chunkenc.FloatHistogram.
|
||||
record.EncodeFloatHistogram(&buf, s.lastFloatHistogramValue)
|
||||
var lastFH *histogram.FloatHistogram
|
||||
if app, ok := s.app.(*chunkenc.FloatHistogramAppender); ok {
|
||||
lastFH = app.LastFloatHistogram()
|
||||
}
|
||||
record.EncodeFloatHistogram(&buf, lastFH)
|
||||
}
|
||||
}
|
||||
s.Unlock()
|
||||
@ -1661,14 +1671,6 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
|
||||
series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
|
||||
series.headChunks = csr.mc
|
||||
series.lastValue = csr.lastValue
|
||||
series.lastHistogramValue = csr.lastHistogramValue
|
||||
series.lastFloatHistogramValue = csr.lastFloatHistogramValue
|
||||
|
||||
if value.IsStaleNaN(series.lastValue) ||
|
||||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
|
||||
h.numStaleSeries.Inc()
|
||||
}
|
||||
|
||||
app, err := series.headChunks.chunk.Appender()
|
||||
if err != nil {
|
||||
@ -1677,6 +1679,22 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
|
||||
}
|
||||
series.app = app
|
||||
|
||||
// Restore the last histogram pointer so that duplicate detection
|
||||
// and stale-marker tracking work after snapshot load.
|
||||
if csr.lastHistogramValue != nil {
|
||||
if histApp, ok := app.(*chunkenc.HistogramAppender); ok {
|
||||
histApp.SetLastHistogram(csr.lastHistogramValue)
|
||||
}
|
||||
} else if csr.lastFloatHistogramValue != nil {
|
||||
if fhApp, ok := app.(*chunkenc.FloatHistogramAppender); ok {
|
||||
fhApp.SetLastFloatHistogram(csr.lastFloatHistogramValue)
|
||||
}
|
||||
}
|
||||
|
||||
if value.IsStaleNaN(series.lastValue) || series.isHistogramStale() {
|
||||
h.numStaleSeries.Inc()
|
||||
}
|
||||
|
||||
h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime)
|
||||
}
|
||||
}(i, recordChan)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user