tsdb: Refactor staleness marker handling

With the fixed commit order, we can now handle the conversion of float
staleness markers to histogram staleness markers in a more direct way.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2025-09-09 18:37:49 +02:00
parent 385d2800c9
commit b1fbf4f1e2

View File

@ -408,21 +408,27 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
} }
} }
s.Lock()
if value.IsStaleNaN(v) { if value.IsStaleNaN(v) {
// TODO(krajorama): reorganize Commit() to handle samples in append order // If we have added a sample before with this same appender, we
// not floats first and then histograms. Then we could do this conversion // can check the previously used type and turn a stale float
// in commit. This code should move into Commit(). // sample into a stale histogram sample or stale float histogram
switch { // sample as appropriate. This prevents an unnecessary creation
case s.lastHistogramValue != nil: // of a new batch. However, since other appenders might append
s.Unlock() // to the same series concurrently, this is not perfect but just
// an optimization for the more likely case.
switch a.typesInBatch[s.ref] {
case stHistogram, stCustomBucketHistogram:
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil)
case s.lastFloatHistogramValue != nil: case stFloatHistogram, stCustomBucketFloatHistogram:
s.Unlock()
return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v}) return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v})
} }
// Note that a series reference not yet in the map will come out
// as stNone, but since we do not handle that case separately,
// we do not need to check for the difference between "unknown
// series" and "known series with stNone".
} }
s.Lock()
defer s.Unlock() defer s.Unlock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL. // to skip that sample from the WAL and write only in the WBL.
@ -780,11 +786,10 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
} }
} }
var created bool
s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil { if s == nil {
var err error var err error
s, created, err = a.getOrCreate(lset) s, _, err = a.getOrCreate(lset)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -793,14 +798,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
switch { switch {
case h != nil: case h != nil:
s.Lock() s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastHistogramValue = &histogram.Histogram{}
}
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL. // to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow) _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow)
@ -833,14 +830,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
b.histogramSeries = append(b.histogramSeries, s) b.histogramSeries = append(b.histogramSeries, s)
case fh != nil: case fh != nil:
s.Lock() s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
}
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL. // to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow) _, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow)
@ -892,11 +881,10 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
return 0, storage.ErrCTNewerThanSample return 0, storage.ErrCTNewerThanSample
} }
var created bool
s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil { if s == nil {
var err error var err error
s, created, err = a.getOrCreate(lset) s, _, err = a.getOrCreate(lset)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -913,14 +901,6 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
CustomValues: h.CustomValues, CustomValues: h.CustomValues,
} }
s.Lock() s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastHistogramValue = zeroHistogram
}
// For CTZeroSamples OOO is not allowed. // For CTZeroSamples OOO is not allowed.
// We set it to true to make this implementation as close as possible to the float implementation. // We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow)
@ -963,14 +943,6 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
CustomValues: fh.CustomValues, CustomValues: fh.CustomValues,
} }
s.Lock() s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastFloatHistogramValue = zeroFloatHistogram
}
// We set it to true to make this implementation as close as possible to the float implementation. // We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) // OOO is not allowed for CTZeroSamples. isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) // OOO is not allowed for CTZeroSamples.
if err != nil { if err != nil {
@ -1318,6 +1290,60 @@ func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext)
series = b.floatSeries[i] series = b.floatSeries[i]
series.Lock() series.Lock()
if value.IsStaleNaN(s.V) {
// If a float staleness marker had been appended for a
// series that got a histogram or float histogram
// appended before via this same appender, it would not
// show up here because we had already converted it. We
// end up here for two reasons: (1) This is the very
// first sample for this series appended via this
// appender. (2) A float sample was appended to this
// series before via this same appender.
//
// In either case, we need to check the previous sample
// in the memSeries to append the appropriately typed
// staleness marker. This is obviously so in case (1).
// In case (2), we would usually expect a float sample
// as the previous sample, but there might be concurrent
// appends that have added a histogram sample in the
// meantime. (This will probably lead to OOO shenanigans
// anyway, but that's a different story.)
//
// If the last sample in the memSeries is indeed a
// float, we don't have to do anything special here and
// just go on with the normal commit for a float sample.
// However, if the last sample in the memSeries is a
// histogram or float histogram, we have to convert the
// staleness marker to a histogram (or float histogram,
// respectively), and just add it at the end of the
// histograms (or float histograms) in the same batch,
// to be committed later in commitHistograms (or
// commitFloatHistograms). The latter is fine because we
// know there is no other histogram (or float histogram)
// sample for this same series in this same batch
// (because any such sample would have triggered a new
// batch).
switch {
case series.lastHistogramValue != nil:
b.histograms = append(b.histograms, record.RefHistogramSample{
Ref: series.ref,
T: s.T,
H: &histogram.Histogram{Sum: s.V},
})
b.histogramSeries = append(b.histogramSeries, series)
series.Unlock()
continue
case series.lastFloatHistogramValue != nil:
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
Ref: series.ref,
T: s.T,
FH: &histogram.FloatHistogram{Sum: s.V},
})
b.floatHistogramSeries = append(b.floatHistogramSeries, series)
series.Unlock()
continue
}
}
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err != nil { if err != nil {
handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected) handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected)
@ -1414,6 +1440,15 @@ func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitConte
series = b.histogramSeries[i] series = b.histogramSeries[i]
series.Lock() series.Lock()
// At this point, we could encounter a histogram staleness
// marker that should better be a float staleness marker or a
// float histogram staleness marker. This can only happen with
// concurrent appenders appending to the same series _and_ doing
// so in a mixed-type scenario. This case is expected to be very
// rare, so we do not bother here to convert the staleness
// marker. The worst case is that we need to cut a new chunk
// just for the staleness marker.
oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow) oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err != nil { if err != nil {
handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
@ -1514,6 +1549,15 @@ func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommit
series = b.floatHistogramSeries[i] series = b.floatHistogramSeries[i]
series.Lock() series.Lock()
// At this point, we could encounter a float histogram staleness
// marker that should better be a float staleness marker or an
// integer histogram staleness marker. This can only happen with
// concurrent appenders appending to the same series _and_ doing
// so in a mixed-type scenario. This case is expected to be very
// rare, so we do not bother here to convert the staleness
// marker. The worst case is that we need to cut a new chunk
// just for the staleness marker.
oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow) oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err != nil { if err != nil {
handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
@ -1680,6 +1724,8 @@ func (a *headAppender) Commit() (err error) {
}() }()
for _, b := range a.batches { for _, b := range a.batches {
// Do not change the order of these calls. The staleness marker
// handling depends on it.
a.commitFloats(b, acc) a.commitFloats(b, acc)
a.commitHistograms(b, acc) a.commitHistograms(b, acc)
a.commitFloatHistograms(b, acc) a.commitFloatHistograms(b, acc)