From b1fbf4f1e2abbfbd9b65a9e9ca7bb4bb48d11b8c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 9 Sep 2025 18:37:49 +0200 Subject: [PATCH] tsdb: Refactor staleness marker handling With the fixed commit order, we can now handle the conversion of float staleness markers to histogram staleness markers in a more direct way. Signed-off-by: beorn7 --- tsdb/head_append.go | 136 +++++++++++++++++++++++++++++--------------- 1 file changed, 91 insertions(+), 45 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 12cb34c883..a3bdee9684 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -408,21 +408,27 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 } } - s.Lock() if value.IsStaleNaN(v) { - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we could do this conversion - // in commit. This code should move into Commit(). - switch { - case s.lastHistogramValue != nil: - s.Unlock() + // If we have added a sample before with this same appender, we + // can check the previously used type and turn a stale float + // sample into a stale histogram sample or stale float histogram + // sample as appropriate. This prevents an unnecessary creation + // of a new batch. However, since other appenders might append + // to the same series concurrently, this is not perfect but just + // an optimization for the more likely case. 
+ switch a.typesInBatch[s.ref] { + case stHistogram, stCustomBucketHistogram: return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) - case s.lastFloatHistogramValue != nil: - s.Unlock() + case stFloatHistogram, stCustomBucketFloatHistogram: return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v}) } + // Note that a series reference not yet in the map will come out + // as stNone, but since we do not handle that case separately, + // we do not need to check for the difference between "unknown + // series" and "known series with stNone". } + s.Lock() defer s.Unlock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -780,11 +786,10 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } } - var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, created, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } @@ -793,14 +798,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels switch { case h != nil: s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastHistogramValue = &histogram.Histogram{} - } - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. 
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -833,14 +830,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels b.histogramSeries = append(b.histogramSeries, s) case fh != nil: s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastFloatHistogramValue = &histogram.FloatHistogram{} - } - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -892,11 +881,10 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l return 0, storage.ErrCTNewerThanSample } - var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, created, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } @@ -913,14 +901,6 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l CustomValues: h.CustomValues, } s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastHistogramValue = zeroHistogram - } - // For CTZeroSamples OOO is not allowed. // We set it to true to make this implementation as close as possible to the float implementation. 
isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -963,14 +943,6 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l CustomValues: fh.CustomValues, } s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastFloatHistogramValue = zeroFloatHistogram - } - // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) // OOO is not allowed for CTZeroSamples. if err != nil { @@ -1318,6 +1290,60 @@ func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) series = b.floatSeries[i] series.Lock() + if value.IsStaleNaN(s.V) { + // If a float staleness marker had been appended for a + // series that got a histogram or float histogram + // appended before via this same appender, it would not + // show up here because we had already converted it. We + // end up here for two reasons: (1) This is the very + // first sample for this series appended via this + // appender. (2) A float sample was appended to this + // series before via this same appender. + // + // In either case, we need to check the previous sample + // in the memSeries to append the appropriately typed + // staleness marker. This is obviously so in case (1). + // In case (2), we would usually expect a float sample + // as the previous sample, but there might be concurrent + // appends that have added a histogram sample in the + // meantime. (This will probably lead to OOO shenanigans + // anyway, but that's a different story.) 
+ // + // If the last sample in the memSeries is indeed a + // float, we don't have to do anything special here and + // just go on with the normal commit for a float sample. + // However, if the last sample in the memSeries is a + // histogram or float histogram, we have to convert the + // staleness marker to a histogram (or float histogram, + // respectively), and just add it at the end of the + // histograms (or float histograms) in the same batch, + // to be committed later in commitHistograms (or + // commitFloatHistograms). The latter is fine because we + // know there is no other histogram (or float histogram) + // sample for this same series in this same batch + // (because any such sample would have triggered a new + // batch). + switch { + case series.lastHistogramValue != nil: + b.histograms = append(b.histograms, record.RefHistogramSample{ + Ref: series.ref, + T: s.T, + H: &histogram.Histogram{Sum: s.V}, + }) + b.histogramSeries = append(b.histogramSeries, series) + series.Unlock() + continue + case series.lastFloatHistogramValue != nil: + b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{ + Ref: series.ref, + T: s.T, + FH: &histogram.FloatHistogram{Sum: s.V}, + }) + b.floatHistogramSeries = append(b.floatHistogramSeries, series) + series.Unlock() + continue + } + } oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected) @@ -1414,6 +1440,15 @@ func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitConte series = b.histogramSeries[i] series.Lock() + // At this point, we could encounter a histogram staleness + // marker that should really be a float staleness marker or a + // float histogram staleness marker. This can only happen with + // concurrent appenders appending to the same series _and_ doing + // so in a mixed-type scenario. 
This case is expected to be very + // rare, so we do not bother here to convert the staleness + // marker. The worst case is that we need to cut a new chunk + // just for the staleness marker. + oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) @@ -1514,6 +1549,15 @@ func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommit series = b.floatHistogramSeries[i] series.Lock() + // At this point, we could encounter a float histogram staleness + // marker that should really be a float staleness marker or an + // integer histogram staleness marker. This can only happen with + // concurrent appenders appending to the same series _and_ doing + // so in a mixed-type scenario. This case is expected to be very + // rare, so we do not bother here to convert the staleness + // marker. The worst case is that we need to cut a new chunk + // just for the staleness marker. + oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) @@ -1680,6 +1724,8 @@ func (a *headAppender) Commit() (err error) { }() for _, b := range a.batches { + // Do not change the order of these calls. The staleness marker + // handling depends on it. a.commitFloats(b, acc) a.commitHistograms(b, acc) a.commitFloatHistograms(b, acc)