From 46cfc9fb9910763a497c930b031d47bc4c76e78c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 17 Sep 2025 19:14:56 +0200 Subject: [PATCH 1/5] tsdb: Extend TestDataNotAvailableAfterRollback This exposes the omission of float histograms from the rollback. Signed-off-by: beorn7 --- tsdb/db_test.go | 78 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 5 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 5224b2800f..b05dfb4440 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -300,19 +300,87 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { }() app := db.Appender(context.Background()) - _, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) + _, err := app.Append(0, labels.FromStrings("type", "float"), 0, 0) + require.NoError(t, err) + + _, err = app.AppendHistogram( + 0, labels.FromStrings("type", "histogram"), 0, + &histogram.Histogram{Count: 42, Sum: math.NaN()}, nil, + ) + require.NoError(t, err) + + _, err = app.AppendHistogram( + 0, labels.FromStrings("type", "floathistogram"), 0, + nil, &histogram.FloatHistogram{Count: 42, Sum: math.NaN()}, + ) require.NoError(t, err) err = app.Rollback() require.NoError(t, err) - querier, err := db.Querier(0, 1) + for _, typ := range []string{"float", "histogram", "floathistogram"} { + querier, err := db.Querier(0, 1) + require.NoError(t, err) + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "type", typ)) + require.Equal(t, map[string][]chunks.Sample{}, seriesSet) + } + + sr, err := wlog.NewSegmentsReader(db.head.wal.Dir()) require.NoError(t, err) - defer querier.Close() + defer func() { + require.NoError(t, sr.Close()) + }() - seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + // Read records from WAL and check for expected count of series and samples. 
+ var ( + r = wlog.NewReader(sr) + dec = record.NewDecoder(labels.NewSymbolTable()) - require.Equal(t, map[string][]chunks.Sample{}, seriesSet) + walSeriesCount, walSamplesCount, walHistogramCount, walFloatHistogramCount, walExemplarsCount int + ) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + var series []record.RefSeries + series, err = dec.Series(rec, series) + require.NoError(t, err) + walSeriesCount += len(series) + + case record.Samples: + var samples []record.RefSample + samples, err = dec.Samples(rec, samples) + require.NoError(t, err) + walSamplesCount += len(samples) + + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + + case record.HistogramSamples, record.CustomBucketsHistogramSamples: + var histograms []record.RefHistogramSample + histograms, err = dec.HistogramSamples(rec, histograms) + require.NoError(t, err) + walHistogramCount += len(histograms) + + case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: + var floatHistograms []record.RefFloatHistogramSample + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) + require.NoError(t, err) + walFloatHistogramCount += len(floatHistograms) + + default: + } + } + + // Check that only series get stored after calling Rollback. 
+ require.Equal(t, 3, walSeriesCount, "series should have been written to WAL") + require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL") + require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL") + require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL") + require.Equal(t, 0, walFloatHistogramCount, "float histograms should not have been written to WAL") } func TestDBAppenderAddRef(t *testing.T) { From 7e82bdb75bd2727b5d8d54c746880ef762b02247 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 21 Aug 2025 23:29:19 +0200 Subject: [PATCH 2/5] tsdb: Fix commit order for mixed-typed series Fixes https://github.com/prometheus/prometheus/issues/15177 The basic idea here is to divide the samples to be committed into (sub) batches whenever we detect that the same series receives a sample of a type different from the previous one. We then commit those batches one after another, and we log them to the WAL one after another, so that we kill two birds with one stone. The cost of the stone is that we have to track the sample type of each series in a map. Given the number of things we already track in the appender, I hope that it won't make a dent. Note that this even addresses the NHCB special case in the WAL. This does a few other things that I could not resist picking up along the way: - It adds more zeropool.Pools and uses the existing ones more consistently. My understanding is that this was merely an oversight. Maybe the additional pool usage will compensate for the increased memory demand of the map. - Create the synthetic zero sample for histograms a bit more carefully. So far, we created a sample that always went into its own chunk. Now we create a sample that is compatible enough with the following sample to go into the same chunk. This changed the test results quite a bit. But IMHO it makes much more sense now. 
- Continuing past efforts, I changed more namings of `Samples` into `Floats` to keep things consistent and less confusing. (Histogram samples are also samples.) I still avoided changing names in other packages. - I added a few shortcuts `h := a.head`, saving many characters. TODOs: - Address @krajorama's TODOs about commit order and staleness handling. Signed-off-by: beorn7 --- tsdb/db_test.go | 26 +-- tsdb/head.go | 3 +- tsdb/head_append.go | 506 ++++++++++++++++++++++++++++---------------- tsdb/head_test.go | 100 ++++----- 4 files changed, 381 insertions(+), 254 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index b05dfb4440..8e649982fc 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4924,10 +4924,7 @@ func TestMetadataAssertInMemoryData(t *testing.T) { } // TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the -// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This -// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed. -// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a -// histogram in a single write request. +// same series when there are multiple encodings. With issue #15177 fixed, this now all works as expected. func TestMultipleEncodingsCommitOrder(t *testing.T) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -5001,26 +4998,19 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { s := addSample(app, int64(i), chunkenc.ValFloat) expSamples = append(expSamples, s) } - // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the - // same batch. 
for i := 110; i < 120; i++ { s := addSample(app, int64(i), chunkenc.ValHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the - // same batch. for i := 120; i < 130; i++ { s := addSample(app, int64(i), chunkenc.ValFloatHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as in-order as their timestamps are greater than the max timestamp for float - // samples in the same batch. for i := 140; i < 150; i++ { s := addSample(app, int64(i), chunkenc.ValFloatHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150 - // because float samples are processed first and these samples are in-order wrt to the float samples in the batch. + // These samples will be marked as out-of-order. for i := 130; i < 135; i++ { s := addSample(app, int64(i), chunkenc.ValFloat) expSamples = append(expSamples, s) @@ -5032,8 +5022,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { return expSamples[i].T() < expSamples[j].T() }) - // oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO. - verifySamples(100, 150, expSamples, 20) + // oooCount = 5 for the samples 130 to 134. + verifySamples(100, 150, expSamples, 5) // Append and commit some in-order histograms by themselves. app = db.Appender(context.Background()) @@ -5043,8 +5033,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { } require.NoError(t, app.Commit()) - // oooCount remains at 20 as no new OOO samples have been added. - verifySamples(100, 160, expSamples, 20) + // oooCount remains at 5. + verifySamples(100, 160, expSamples, 5) // Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples // with newer timestamps have already been committed. 
@@ -5072,8 +5062,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { return expSamples[i].T() < expSamples[j].T() }) - // oooCount = 50 as we've added 30 more OOO samples. - verifySamples(50, 160, expSamples, 50) + // oooCount = 35 as we've added 30 more OOO samples. + verifySamples(50, 160, expSamples, 35) } // TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start diff --git a/tsdb/head.go b/tsdb/head.go index 957cad52e4..45f425cea9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -86,7 +86,8 @@ type Head struct { exemplarMetrics *ExemplarMetrics exemplars ExemplarStorage logger *slog.Logger - appendPool zeropool.Pool[[]record.RefSample] + refSeriesPool zeropool.Pool[[]record.RefSeries] + floatsPool zeropool.Pool[[]record.RefSample] exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef] histogramsPool zeropool.Pool[[]record.RefHistogramSample] floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] diff --git a/tsdb/head_append.go b/tsdb/head_append.go index a5d6e9127a..12cb34c883 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -164,13 +164,6 @@ func (h *Head) Appender(context.Context) storage.Appender { func (h *Head) appender() *headAppender { minValidTime := h.appendableMinValidTime() appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback. - - // Allocate the exemplars buffer only if exemplars are enabled. 
- var exemplarsBuf []exemplarWithSeriesRef - if h.opts.EnableExemplarStorage { - exemplarsBuf = h.getExemplarBuffer() - } - return &headAppender{ head: h, minValidTime: minValidTime, @@ -178,12 +171,9 @@ func (h *Head) appender() *headAppender { maxt: math.MinInt64, headMaxt: h.MaxTime(), oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(), - samples: h.getAppendBuffer(), - sampleSeries: h.getSeriesBuffer(), - exemplars: exemplarsBuf, - histograms: h.getHistogramBuffer(), - floatHistograms: h.getFloatHistogramBuffer(), - metadata: h.getMetadataBuffer(), + seriesRefs: h.getRefSeriesBuffer(), + series: h.getSeriesBuffer(), + typesInBatch: map[chunks.HeadSeriesRef]sampleType{}, appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, } @@ -213,16 +203,28 @@ func (h *Head) AppendableMinValidTime() (int64, bool) { return h.appendableMinValidTime(), true } -func (h *Head) getAppendBuffer() []record.RefSample { - b := h.appendPool.Get() +func (h *Head) getRefSeriesBuffer() []record.RefSeries { + b := h.refSeriesPool.Get() + if b == nil { + return make([]record.RefSeries, 0, 512) + } + return b +} + +func (h *Head) putRefSeriesBuffer(b []record.RefSeries) { + h.refSeriesPool.Put(b[:0]) +} + +func (h *Head) getFloatBuffer() []record.RefSample { + b := h.floatsPool.Get() if b == nil { return make([]record.RefSample, 0, 512) } return b } -func (h *Head) putAppendBuffer(b []record.RefSample) { - h.appendPool.Put(b[:0]) +func (h *Head) putFloatBuffer(b []record.RefSample) { + h.floatsPool.Put(b[:0]) } func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef { @@ -312,17 +314,30 @@ type exemplarWithSeriesRef struct { exemplar exemplar.Exemplar } -type headAppender struct { - head *Head - minValidTime int64 // No samples below this timestamp are allowed. - mint, maxt int64 - headMaxt int64 // We track it here to not take the lock for every sample appended. - oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample. 
+// sampleType describes sample types we need to distinguish for append batching. +// We need separate types for everything that goes into a different WAL record +// type or into a different chunk encoding. +type sampleType byte - seriesRefs []record.RefSeries // New series records held by this appender. - series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs) - samples []record.RefSample // New float samples held by this appender. - sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). +const ( + stNone sampleType = iota // To mark that the sample type does not matter. + stFloat // All simple floats (counters, gauges, untyped). Goes to `floats`. + stHistogram // Native integer histograms with a standard exponential schema. Goes to `histograms`. + stCustomBucketHistogram // Native integer histograms with custom bucket boundaries. Goes to `histograms`. + stFloatHistogram // Native float histograms. Goes to `floatHistograms`. + stCustomBucketFloatHistogram // Native float histograms with custom bucket boundaries. Goes to `floatHistograms`. +) + +// appendBatch is used to partition all the appended data into batches that are +// "type clean", i.e. every series receives only samples of one type within the +// batch. Types in this regard are defined by the sampleType enum above. +// TODO(beorn7): The same concept could be extended to make sure every series in +// the batch has at most one metadata record. This is currently not implemented +// because it is unclear if it is needed at all. (Maybe we will remove metadata +// records altogether, see issue #15911.) +type appendBatch struct { + floats []record.RefSample // New float samples held by this appender. 
+ floatSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). histograms []record.RefHistogramSample // New histogram samples held by this appender. histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender. @@ -330,6 +345,42 @@ type headAppender struct { metadata []record.RefMetadata // New metadata held by this appender. metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. exemplars []exemplarWithSeriesRef // New exemplars held by this appender. +} + +// close returns all the slices to the pools in Head and nil's them. +func (b *appendBatch) close(h *Head) { + h.putFloatBuffer(b.floats) + b.floats = nil + h.putSeriesBuffer(b.floatSeries) + b.floatSeries = nil + h.putHistogramBuffer(b.histograms) + b.histograms = nil + h.putSeriesBuffer(b.histogramSeries) + b.histogramSeries = nil + h.putFloatHistogramBuffer(b.floatHistograms) + b.floatHistograms = nil + h.putSeriesBuffer(b.floatHistogramSeries) + b.floatHistogramSeries = nil + h.putMetadataBuffer(b.metadata) + b.metadata = nil + h.putSeriesBuffer(b.metadataSeries) + b.metadataSeries = nil + h.putExemplarBuffer(b.exemplars) + b.exemplars = nil +} + +type headAppender struct { + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 + headMaxt int64 // We track it here to not take the lock for every sample appended. + oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample. + + seriesRefs []record.RefSeries // New series records held by this appender. 
+ series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs) + batches []*appendBatch // Holds all the other data to append. (In regular cases, there should be only one of these.) + + typesInBatch map[chunks.HeadSeriesRef]sampleType // Which (one) sample type each series holds in the most recent batch. appendID, cleanupAppendIDsBelow uint64 closed bool @@ -403,12 +454,13 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 a.maxt = t } - a.samples = append(a.samples, record.RefSample{ + b := a.getCurrentBatch(stFloat, s.ref) + b.floats = append(b.floats, record.RefSample{ Ref: s.ref, T: t, V: v, }) - a.sampleSeries = append(a.sampleSeries, s) + b.floatSeries = append(b.floatSeries, s) return storage.SeriesRef(s.ref), nil } @@ -448,8 +500,9 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab if ct > a.maxt { a.maxt = ct } - a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0}) - a.sampleSeries = append(a.sampleSeries, s) + b := a.getCurrentBatch(stFloat, s.ref) + b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: ct, V: 0.0}) + b.floatSeries = append(b.floatSeries, s) return storage.SeriesRef(s.ref), nil } @@ -476,6 +529,65 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo return s, created, nil } +// getCurrentBatch returns the current batch if it fits the provided sampleType +// for the provided series. Otherwise, it adds a new batch and returns it. 
+func (a *headAppender) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch { + h := a.head + + newBatch := func() *appendBatch { + b := appendBatch{ + floats: h.getFloatBuffer(), + floatSeries: h.getSeriesBuffer(), + histograms: h.getHistogramBuffer(), + histogramSeries: h.getSeriesBuffer(), + floatHistograms: h.getFloatHistogramBuffer(), + floatHistogramSeries: h.getSeriesBuffer(), + metadata: h.getMetadataBuffer(), + metadataSeries: h.getSeriesBuffer(), + } + + // Allocate the exemplars buffer only if exemplars are enabled. + if h.opts.EnableExemplarStorage { + b.exemplars = h.getExemplarBuffer() + } + clear(a.typesInBatch) + if st != stNone { + a.typesInBatch[s] = st + } + a.batches = append(a.batches, &b) + return &b + } + + // First batch ever. Create it. + if len(a.batches) == 0 { + return newBatch() + } + + // TODO(beorn7): If we ever see that the a.typesInBatch map grows so + // large that it matters for total memory consumption, we could limit + // the batch size here, i.e. cut a new batch even without a type change. + // Something like: + // if len(a.typesInBatch) > limit { + // return newBatch() + // } + + lastBatch := a.batches[len(a.batches)-1] + if st == stNone { + // Type doesn't matter, last batch will always do. + return lastBatch + } + prevST, ok := a.typesInBatch[s] + switch { + case !ok: // New series. Add it to map and return current batch. + a.typesInBatch[s] = st + return lastBatch + case prevST == st: // Old series, same type. Just return batch. + return lastBatch + } + // An old series got a new type. Start new batch. + return newBatch() +} + // appendable checks whether the given sample is valid for appending to the series. // If the sample is valid and in-order, it returns false with no error. // If the sample belongs to the out-of-order chunk, it returns true with no error. 
@@ -638,7 +750,8 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, return 0, err } - a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e}) + b := a.getCurrentBatch(stNone, chunks.HeadSeriesRef(ref)) + b.exemplars = append(b.exemplars, exemplarWithSeriesRef{ref, e}) return storage.SeriesRef(s.ref), nil } @@ -707,12 +820,17 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } return 0, err } - a.histograms = append(a.histograms, record.RefHistogramSample{ + st := stHistogram + if h.UsesCustomBuckets() { + st = stCustomBucketHistogram + } + b := a.getCurrentBatch(st, s.ref) + b.histograms = append(b.histograms, record.RefHistogramSample{ Ref: s.ref, T: t, H: h, }) - a.histogramSeries = append(a.histogramSeries, s) + b.histogramSeries = append(b.histogramSeries, s) case fh != nil: s.Lock() @@ -742,12 +860,17 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } return 0, err } - a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ + st := stFloatHistogram + if fh.UsesCustomBuckets() { + st = stCustomBucketFloatHistogram + } + b := a.getCurrentBatch(st, s.ref) + b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{ Ref: s.ref, T: t, FH: fh, }) - a.floatHistogramSeries = append(a.floatHistogramSeries, s) + b.floatHistogramSeries = append(b.floatHistogramSeries, s) } if t < a.mint { @@ -784,6 +907,10 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l zeroHistogram := &histogram.Histogram{ // The CTZeroSample represents a counter reset by definition. CounterResetHint: histogram.CounterReset, + // Replicate other fields to avoid needless chunk creation. 
+ Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + CustomValues: h.CustomValues, } s.Lock() @@ -815,16 +942,25 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l s.pendingCommit = true s.Unlock() - a.histograms = append(a.histograms, record.RefHistogramSample{ + st := stHistogram + if h.UsesCustomBuckets() { + st = stCustomBucketHistogram + } + b := a.getCurrentBatch(st, s.ref) + b.histograms = append(b.histograms, record.RefHistogramSample{ Ref: s.ref, T: ct, H: zeroHistogram, }) - a.histogramSeries = append(a.histogramSeries, s) + b.histogramSeries = append(b.histogramSeries, s) case fh != nil: zeroFloatHistogram := &histogram.FloatHistogram{ // The CTZeroSample represents a counter reset by definition. CounterResetHint: histogram.CounterReset, + // Replicate other fields to avoid needless chunk creation. + Schema: fh.Schema, + ZeroThreshold: fh.ZeroThreshold, + CustomValues: fh.CustomValues, } s.Lock() @@ -855,12 +991,17 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l s.pendingCommit = true s.Unlock() - a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ + st := stFloatHistogram + if fh.UsesCustomBuckets() { + st = stCustomBucketFloatHistogram + } + b := a.getCurrentBatch(st, s.ref) + b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{ Ref: s.ref, T: ct, FH: zeroFloatHistogram, }) - a.floatHistogramSeries = append(a.floatHistogramSeries, s) + b.floatHistogramSeries = append(b.floatHistogramSeries, s) } if ct > a.maxt { @@ -889,13 +1030,14 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, s.Unlock() if hasNewMetadata { - a.metadata = append(a.metadata, record.RefMetadata{ + b := a.getCurrentBatch(stNone, s.ref) + b.metadata = append(b.metadata, record.RefMetadata{ Ref: s.ref, Type: record.GetMetricType(meta.Type), Unit: meta.Unit, Help: meta.Help, }) - a.metadataSeries = append(a.metadataSeries, s) + 
b.metadataSeries = append(b.metadataSeries, s) } return ref, nil @@ -932,66 +1074,68 @@ func (a *headAppender) log() error { return fmt.Errorf("log series: %w", err) } } - if len(a.metadata) > 0 { - rec = enc.Metadata(a.metadata, buf) - buf = rec[:0] + for _, b := range a.batches { + if len(b.metadata) > 0 { + rec = enc.Metadata(b.metadata, buf) + buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log metadata: %w", err) - } - } - if len(a.samples) > 0 { - rec = enc.Samples(a.samples, buf) - buf = rec[:0] - - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log samples: %w", err) - } - } - if len(a.histograms) > 0 { - var customBucketsHistograms []record.RefHistogramSample - rec, customBucketsHistograms = enc.HistogramSamples(a.histograms, buf) - buf = rec[:0] - if len(rec) > 0 { if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log histograms: %w", err) + return fmt.Errorf("log metadata: %w", err) } } + if len(b.floats) > 0 { + rec = enc.Samples(b.floats, buf) + buf = rec[:0] - if len(customBucketsHistograms) > 0 { - rec = enc.CustomBucketsHistogramSamples(customBucketsHistograms, buf) if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log custom buckets histograms: %w", err) + return fmt.Errorf("log samples: %w", err) } } - } - if len(a.floatHistograms) > 0 { - var customBucketsFloatHistograms []record.RefFloatHistogramSample - rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(a.floatHistograms, buf) - buf = rec[:0] - if len(rec) > 0 { - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log float histograms: %w", err) + if len(b.histograms) > 0 { + var customBucketsHistograms []record.RefHistogramSample + rec, customBucketsHistograms = enc.HistogramSamples(b.histograms, buf) + buf = rec[:0] + if len(rec) > 0 { + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log histograms: %w", err) + } + } + + if len(customBucketsHistograms) > 0 { + rec = 
enc.CustomBucketsHistogramSamples(customBucketsHistograms, buf) + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log custom buckets histograms: %w", err) + } } } + if len(b.floatHistograms) > 0 { + var customBucketsFloatHistograms []record.RefFloatHistogramSample + rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(b.floatHistograms, buf) + buf = rec[:0] + if len(rec) > 0 { + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log float histograms: %w", err) + } + } - if len(customBucketsFloatHistograms) > 0 { - rec = enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, buf) - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log custom buckets float histograms: %w", err) + if len(customBucketsFloatHistograms) > 0 { + rec = enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, buf) + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log custom buckets float histograms: %w", err) + } } } - } - // Exemplars should be logged after samples (float/native histogram/etc), - // otherwise it might happen that we send the exemplars in a remote write - // batch before the samples, which in turn means the exemplar is rejected - // for missing series, since series are created due to samples. - if len(a.exemplars) > 0 { - rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf) - buf = rec[:0] + // Exemplars should be logged after samples (float/native histogram/etc), + // otherwise it might happen that we send the exemplars in a remote write + // batch before the samples, which in turn means the exemplar is rejected + // for missing series, since series are created due to samples. 
+ if len(b.exemplars) > 0 { + rec = enc.Exemplars(exemplarsForEncoding(b.exemplars), buf) + buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log exemplars: %w", err) + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log exemplars: %w", err) + } } } return nil @@ -1040,10 +1184,10 @@ type appenderCommitContext struct { enc record.Encoder } -// commitExemplars adds all exemplars from headAppender to the head's exemplar storage. -func (a *headAppender) commitExemplars() { +// commitExemplars adds all exemplars from the provided batch to the head's exemplar storage. +func (a *headAppender) commitExemplars(b *appendBatch) { // No errors logging to WAL, so pass the exemplars along to the in memory storage. - for _, e := range a.exemplars { + for _, e := range b.exemplars { s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref)) if s == nil { // This is very unlikely to happen, but we have seen it in the wild. @@ -1147,9 +1291,9 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld } } -// commitSamples processes and commits the samples in the headAppender to the series. -// It handles both in-order and out-of-order samples, updating the appenderCommitContext -// with the results of the append operations. +// commitFloats processes and commits the samples in the provided batch to the +// series. It handles both in-order and out-of-order samples, updating the +// appenderCommitContext with the results of the append operations. // // The function iterates over the samples in the headAppender and attempts to append each sample // to its corresponding series. It handles various error cases such as out-of-order samples, @@ -1166,12 +1310,12 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld // operations on the series after appending the samples. // // There are also specific functions to commit histograms and float histograms. 
-func (a *headAppender) commitSamples(acc *appenderCommitContext) { +func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) { var ok, chunkCreated bool var series *memSeries - for i, s := range a.samples { - series = a.sampleSeries[i] + for i, s := range b.floats { + series = b.floatSeries[i] series.Lock() oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -1261,13 +1405,13 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { } } -// For details on the commitHistograms function, see the commitSamples docs. -func (a *headAppender) commitHistograms(acc *appenderCommitContext) { +// For details on the commitHistograms function, see the commitFloats docs. +func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitContext) { var ok, chunkCreated bool var series *memSeries - for i, s := range a.histograms { - series = a.histogramSeries[i] + for i, s := range b.histograms { + series = b.histogramSeries[i] series.Lock() oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -1361,13 +1505,13 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) { } } -// For details on the commitFloatHistograms function, see the commitSamples docs. -func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { +// For details on the commitFloatHistograms function, see the commitFloats docs. 
+func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) { var ok, chunkCreated bool var series *memSeries - for i, s := range a.floatHistograms { - series = a.floatHistogramSeries[i] + for i, s := range b.floatHistograms { + series = b.floatHistogramSeries[i] series.Lock() oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -1461,14 +1605,14 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { } } -// commitMetadata commits the metadata for each series in the headAppender. +// commitMetadata commits the metadata for each series in the provided batch. // It iterates over the metadata slice and updates the corresponding series // with the new metadata information. The series is locked during the update // to ensure thread safety. -func (a *headAppender) commitMetadata() { +func commitMetadata(b *appendBatch) { var series *memSeries - for i, m := range a.metadata { - series = a.metadataSeries[i] + for i, m := range b.metadata { + series = b.metadataSeries[i] series.Lock() series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help} series.Unlock() @@ -1489,75 +1633,80 @@ func (a *headAppender) Commit() (err error) { if a.closed { return ErrAppenderClosed } - defer func() { a.closed = true }() + + h := a.head + + defer func() { + h.putRefSeriesBuffer(a.seriesRefs) + h.putSeriesBuffer(a.series) + a.closed = true + }() if err := a.log(); err != nil { _ = a.Rollback() // Most likely the same error will happen again. 
return fmt.Errorf("write to WAL: %w", err) } - if a.head.writeNotified != nil { - a.head.writeNotified.Notify() + if h.writeNotified != nil { + h.writeNotified.Notify() } - a.commitExemplars() - - defer a.head.metrics.activeAppenders.Dec() - defer a.head.putAppendBuffer(a.samples) - defer a.head.putSeriesBuffer(a.sampleSeries) - defer a.head.putExemplarBuffer(a.exemplars) - defer a.head.putHistogramBuffer(a.histograms) - defer a.head.putFloatHistogramBuffer(a.floatHistograms) - defer a.head.putMetadataBuffer(a.metadata) - defer a.head.iso.closeAppend(a.appendID) - acc := &appenderCommitContext{ - floatsAppended: len(a.samples), - histogramsAppended: len(a.histograms) + len(a.floatHistograms), - inOrderMint: math.MaxInt64, - inOrderMaxt: math.MinInt64, - oooMinT: math.MaxInt64, - oooMaxT: math.MinInt64, - oooCapMax: a.head.opts.OutOfOrderCapMax.Load(), + inOrderMint: math.MaxInt64, + inOrderMaxt: math.MinInt64, + oooMinT: math.MaxInt64, + oooMaxT: math.MinInt64, + oooCapMax: h.opts.OutOfOrderCapMax.Load(), appendChunkOpts: chunkOpts{ - chunkDiskMapper: a.head.chunkDiskMapper, - chunkRange: a.head.chunkRange.Load(), - samplesPerChunk: a.head.opts.SamplesPerChunk, + chunkDiskMapper: h.chunkDiskMapper, + chunkRange: h.chunkRange.Load(), + samplesPerChunk: h.opts.SamplesPerChunk, }, } + for _, b := range a.batches { + acc.floatsAppended += len(b.floats) + acc.histogramsAppended += len(b.histograms) + len(b.floatHistograms) + a.commitExemplars(b) + defer b.close(h) + } + defer h.metrics.activeAppenders.Dec() + defer h.iso.closeAppend(a.appendID) + defer func() { for i := range acc.oooRecords { - a.head.putBytesBuffer(acc.oooRecords[i][:0]) + h.putBytesBuffer(acc.oooRecords[i][:0]) } }() - a.commitSamples(acc) - a.commitHistograms(acc) - a.commitFloatHistograms(acc) - a.commitMetadata() + for _, b := range a.batches { + a.commitFloats(b, acc) + a.commitHistograms(b, acc) + a.commitFloatHistograms(b, acc) + commitMetadata(b) + } // Unmark all series as pending commit after 
all samples have been committed. a.unmarkCreatedSeriesAsPendingCommit() - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected)) - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected)) - a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected)) - a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected)) - a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended)) - a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended)) - a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted)) - a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted)) - a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt) - a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT) + h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected)) + h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected)) + h.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected)) + h.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected)) + h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended)) + h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended)) + h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted)) + h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted)) + h.updateMinMaxTime(acc.inOrderMint, 
acc.inOrderMaxt) + h.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT) acc.collectOOORecords(a) - if a.head.wbl != nil { - if err := a.head.wbl.Log(acc.oooRecords...); err != nil { + if h.wbl != nil { + if err := h.wbl.Log(acc.oooRecords...); err != nil { // TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging // until we have found what samples become OOO. We can try having a metric for this failure. // Returning the error here is not correct because we have already put the samples into the memory, // hence the append/insert was a success. - a.head.logger.Error("Failed to log out of order samples into the WAL", "err", err) + h.logger.Error("Failed to log out of order samples into the WAL", "err", err) } } return nil @@ -2007,37 +2156,36 @@ func (a *headAppender) Rollback() (err error) { if a.closed { return ErrAppenderClosed } - defer func() { a.closed = true }() - defer a.head.metrics.activeAppenders.Dec() - defer a.head.iso.closeAppend(a.appendID) - defer a.head.putSeriesBuffer(a.sampleSeries) - defer a.unmarkCreatedSeriesAsPendingCommit() + h := a.head + defer func() { + a.unmarkCreatedSeriesAsPendingCommit() + h.iso.closeAppend(a.appendID) + h.metrics.activeAppenders.Dec() + a.closed = true + h.putRefSeriesBuffer(a.seriesRefs) + h.putSeriesBuffer(a.series) + }() var series *memSeries - for i := range a.samples { - series = a.sampleSeries[i] - series.Lock() - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() + fmt.Println("ROLLBACK") + for _, b := range a.batches { + for i := range b.floats { + series = b.floatSeries[i] + series.Lock() + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } + for i := range b.histograms { + series = b.histogramSeries[i] + series.Lock() + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } + b.close(h) } - for i := range 
a.histograms { - series = a.histogramSeries[i] - series.Lock() - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } - a.head.putAppendBuffer(a.samples) - a.head.putExemplarBuffer(a.exemplars) - a.head.putHistogramBuffer(a.histograms) - a.head.putFloatHistogramBuffer(a.floatHistograms) - a.head.putMetadataBuffer(a.metadata) - a.samples = nil - a.exemplars = nil - a.histograms = nil - a.metadata = nil - + a.batches = a.batches[:0] // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. return a.log() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 8fe14c35f9..37aef96a66 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5336,8 +5336,6 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { samples []chunks.Sample expChunks int err error - // If this is empty, samples above will be taken instead of this. - addToExp []chunks.Sample }{ // Histograms that end up in the expected samples are copied here so that we // can independently set the CounterResetHint later. @@ -5377,43 +5375,29 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { samples: []chunks.Sample{sample{t: 100, fh: floatHists[4].Copy()}}, err: storage.ErrOutOfOrderSample, }, + // The three next tests all failed before #15177 was fixed. { - // Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also - // verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order. samples: []chunks.Sample{ sample{t: 400, f: 4}, - sample{t: 500, h: hists[5]}, // This won't be committed. + sample{t: 500, h: hists[5]}, sample{t: 600, f: 6}, }, - addToExp: []chunks.Sample{ - sample{t: 400, f: 4}, - sample{t: 600, f: 6}, - }, - expChunks: 7, // Only 1 new chunk for float64. + expChunks: 9, // Each of the three samples above creates a new chunk because the type changes. 
}, { - // Here the histogram is appended at the end, hence the first histogram is out of order. samples: []chunks.Sample{ - sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first. + sample{t: 700, h: hists[7]}, sample{t: 800, f: 8}, sample{t: 900, h: hists[9]}, }, - addToExp: []chunks.Sample{ - sample{t: 800, f: 8}, - sample{t: 900, h: hists[9].Copy()}, - }, - expChunks: 8, // float64 added to old chunk, only 1 new for histograms. + expChunks: 12, // Again each sample creates a new chunk. }, { - // Float histogram is appended at the end. samples: []chunks.Sample{ - sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram. + sample{t: 1000, fh: floatHists[7]}, sample{t: 1100, h: hists[9]}, }, - addToExp: []chunks.Sample{ - sample{t: 1100, h: hists[9].Copy()}, - }, - expChunks: 8, + expChunks: 14, // Even changes between float and integer histogram create new chunks. }, } @@ -5431,11 +5415,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { if a.err == nil { require.NoError(t, app.Commit()) - if len(a.addToExp) > 0 { - expResult = append(expResult, a.addToExp...) - } else { - expResult = append(expResult, a.samples...) - } + expResult = append(expResult, a.samples...) checkExpChunks(a.expChunks) } else { require.NoError(t, app.Rollback()) @@ -6751,7 +6731,27 @@ func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing func TestHeadAppender_AppendCT(t *testing.T) { testHistogram := tsdbutil.GenerateTestHistogram(1) + testHistogram.CounterResetHint = histogram.NotCounterReset testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1) + testFloatHistogram.CounterResetHint = histogram.NotCounterReset + // TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the + // following two zero histograms should be histogram.CounterReset. 
+ testZeroHistogram := &histogram.Histogram{ + Schema: testHistogram.Schema, + ZeroThreshold: testHistogram.ZeroThreshold, + PositiveSpans: testHistogram.PositiveSpans, + NegativeSpans: testHistogram.NegativeSpans, + PositiveBuckets: []int64{0, 0, 0, 0}, + NegativeBuckets: []int64{0, 0, 0, 0}, + } + testZeroFloatHistogram := &histogram.FloatHistogram{ + Schema: testFloatHistogram.Schema, + ZeroThreshold: testFloatHistogram.ZeroThreshold, + PositiveSpans: testFloatHistogram.PositiveSpans, + NegativeSpans: testFloatHistogram.NegativeSpans, + PositiveBuckets: []float64{0, 0, 0, 0}, + NegativeBuckets: []float64{0, 0, 0, 0}, + } type appendableSamples struct { ts int64 fSample float64 @@ -6783,12 +6783,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, h: testHistogram, ct: 1}, }, expectedSamples: func() []chunks.Sample { - hNoCounterReset := *testHistogram - hNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 1, h: testZeroHistogram}, sample{t: 100, h: testHistogram}, - sample{t: 101, h: &hNoCounterReset}, + sample{t: 101, h: testHistogram}, } }(), }, @@ -6799,12 +6797,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, fh: testFloatHistogram, ct: 1}, }, expectedSamples: func() []chunks.Sample { - fhNoCounterReset := *testFloatHistogram - fhNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 1, fh: testZeroFloatHistogram}, sample{t: 100, fh: testFloatHistogram}, - sample{t: 101, fh: &fhNoCounterReset}, + sample{t: 101, fh: testFloatHistogram}, } }(), }, @@ -6827,12 +6823,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, h: testHistogram, ct: 1}, }, expectedSamples: func() []chunks.Sample { - hNoCounterReset := *testHistogram - hNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, h: &histogram.Histogram{}}, + 
sample{t: 1, h: testZeroHistogram}, sample{t: 100, h: testHistogram}, - sample{t: 101, h: &hNoCounterReset}, + sample{t: 101, h: testHistogram}, } }(), }, @@ -6843,12 +6837,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, fh: testFloatHistogram, ct: 1}, }, expectedSamples: func() []chunks.Sample { - fhNoCounterReset := *testFloatHistogram - fhNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 1, fh: testZeroFloatHistogram}, sample{t: 100, fh: testFloatHistogram}, - sample{t: 101, fh: &fhNoCounterReset}, + sample{t: 101, fh: testFloatHistogram}, } }(), }, @@ -6872,9 +6864,9 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 102, h: testHistogram, ct: 101}, }, expectedSamples: []chunks.Sample{ - sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 1, h: testZeroHistogram}, sample{t: 100, h: testHistogram}, - sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.UnknownCounterReset}}, + sample{t: 101, h: testZeroHistogram}, sample{t: 102, h: testHistogram}, }, }, @@ -6885,9 +6877,9 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 102, fh: testFloatHistogram, ct: 101}, }, expectedSamples: []chunks.Sample{ - sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 1, fh: testZeroFloatHistogram}, sample{t: 100, fh: testFloatHistogram}, - sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.UnknownCounterReset}}, + sample{t: 101, fh: testZeroFloatHistogram}, sample{t: 102, fh: testFloatHistogram}, }, }, @@ -6910,12 +6902,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, h: testHistogram, ct: 100}, }, expectedSamples: func() []chunks.Sample { - hNoCounterReset := *testHistogram - hNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 1, h: testZeroHistogram}, sample{t: 100, h: testHistogram}, - sample{t: 101, h: 
&hNoCounterReset}, + sample{t: 101, h: testHistogram}, } }(), }, @@ -6926,12 +6916,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, fh: testFloatHistogram, ct: 100}, }, expectedSamples: func() []chunks.Sample { - fhNoCounterReset := *testFloatHistogram - fhNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 1, fh: testZeroFloatHistogram}, sample{t: 100, fh: testFloatHistogram}, - sample{t: 101, fh: &fhNoCounterReset}, + sample{t: 101, fh: testFloatHistogram}, } }(), }, From 385d2800c99a50be43ffd1fcc87fd0d872fe1920 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 4 Sep 2025 19:53:16 +0200 Subject: [PATCH 3/5] promqltest: Add regression test for mixed-sample commit order Regression test for: - https://github.com/prometheus/prometheus/issues/14172 - https://github.com/prometheus/prometheus/issues/15177 Test cases are by @krajorama, taken from commit b48bc9dc7e2ac553528763297cca73014357d542 . 
Signed-off-by: beorn7 --- promql/promqltest/testdata/native_histograms.test | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/promql/promqltest/testdata/native_histograms.test b/promql/promqltest/testdata/native_histograms.test index f0c510bd50..0aac43e0ad 100644 --- a/promql/promqltest/testdata/native_histograms.test +++ b/promql/promqltest/testdata/native_histograms.test @@ -1677,3 +1677,18 @@ eval instant at 1m histogram_count(histogram unless histogram_quantile(0.5, hist eval instant at 1m histogram_quantile(0.5, histogram unless histogram_count(histogram) == 0) {} 3.1748021039363987 +clear + +# Regression test for: +# https://github.com/prometheus/prometheus/issues/14172 +# https://github.com/prometheus/prometheus/issues/15177 +load 1m + mixed_metric1 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}} 4 5 {{schema:0 sum:18 count:10 buckets:[3 4 3]}} + mixed_metric2 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}} + +# The order of the float vs native histograms is preserved. +eval range from 0 to 8m step 1m mixed_metric1 + mixed_metric1{} 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} 4 5 {{schema:0 sum:18 count:10 buckets:[3 4 3]}} {{schema:0 sum:18 count:10 buckets:[3 4 3]}} + +eval range from 0 to 5m step 1m mixed_metric2 + mixed_metric2 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} {{count:6 sum:8 buckets:[1 4 1]}} From b1fbf4f1e2abbfbd9b65a9e9ca7bb4bb48d11b8c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 9 Sep 2025 18:37:49 +0200 Subject: [PATCH 4/5] tsdb: Refactor staleness marker handling With the fixed commit order, we can now handle the conversion of float staleness markers to histogram staleness markers in a more direct way. 
Signed-off-by: beorn7 --- tsdb/head_append.go | 136 +++++++++++++++++++++++++++++--------------- 1 file changed, 91 insertions(+), 45 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 12cb34c883..a3bdee9684 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -408,21 +408,27 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 } } - s.Lock() if value.IsStaleNaN(v) { - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we could do this conversion - // in commit. This code should move into Commit(). - switch { - case s.lastHistogramValue != nil: - s.Unlock() + // If we have added a sample before with this same appender, we + // can check the previously used type and turn a stale float + // sample into a stale histogram sample or stale float histogram + // sample as appropriate. This prevents an unnecessary creation + // of a new batch. However, since other appenders might append + // to the same series concurrently, this is not perfect but just + // an optimization for the more likely case. + switch a.typesInBatch[s.ref] { + case stHistogram, stCustomBucketHistogram: return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) - case s.lastFloatHistogramValue != nil: - s.Unlock() + case stFloatHistogram, stCustomBucketFloatHistogram: return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v}) } + // Note that a series reference not yet in the map will come out + // as stNone, but since we do not handle that case separately, + // we do not need to check for the difference between "unknown + // series" and "known series with stNone". } + s.Lock() defer s.Unlock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. 
@@ -780,11 +786,10 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } } - var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, created, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } @@ -793,14 +798,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels switch { case h != nil: s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastHistogramValue = &histogram.Histogram{} - } - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -833,14 +830,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels b.histogramSeries = append(b.histogramSeries, s) case fh != nil: s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastFloatHistogramValue = &histogram.FloatHistogram{} - } - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. 
_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -892,11 +881,10 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l return 0, storage.ErrCTNewerThanSample } - var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, created, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } @@ -913,14 +901,6 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l CustomValues: h.CustomValues, } s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastHistogramValue = zeroHistogram - } - // For CTZeroSamples OOO is not allowed. // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -963,14 +943,6 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l CustomValues: fh.CustomValues, } s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastFloatHistogramValue = zeroFloatHistogram - } - // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) // OOO is not allowed for CTZeroSamples. 
if err != nil { @@ -1318,6 +1290,60 @@ func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) series = b.floatSeries[i] series.Lock() + if value.IsStaleNaN(s.V) { + // If a float staleness marker had been appended for a + // series that got a histogram or float histogram + // appended before via this same appender, it would not + // show up here because we had already converted it. We + // end up here for two reasons: (1) This is the very + // first sample for this series appended via this + // appender. (2) A float sample was appended to this + // series before via this same appender. + // + // In either case, we need to check the previous sample + // in the memSeries to append the appropriately typed + // staleness marker. This is obviously so in case (1). + // In case (2), we would usually expect a float sample + // as the previous sample, but there might be concurrent + // appends that have added a histogram sample in the + // meantime. (This will probably lead to OOO shenanigans + // anyway, but that's a different story.) + // + // If the last sample in the memSeries is indeed a + // float, we don't have to do anything special here and + // just go on with the normal commit for a float sample. + // However, if the last sample in the memSeries is a + // histogram or float histogram, we have to convert the + // staleness marker to a histogram (or float histogram, + // respectively), and just add it at the end of the + // histograms (or float histograms) in the same batch, + // to be committed later in commitHistograms (or + // commitFloatHistograms). The latter is fine because we + // know there is no other histogram (or float histogram) + // sample for this same series in this same batch + // (because any such sample would have triggered a new + // batch). 
+ switch { + case series.lastHistogramValue != nil: + b.histograms = append(b.histograms, record.RefHistogramSample{ + Ref: series.ref, + T: s.T, + H: &histogram.Histogram{Sum: s.V}, + }) + b.histogramSeries = append(b.histogramSeries, series) + series.Unlock() + continue + case series.lastFloatHistogramValue != nil: + b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{ + Ref: series.ref, + T: s.T, + FH: &histogram.FloatHistogram{Sum: s.V}, + }) + b.floatHistogramSeries = append(b.floatHistogramSeries, series) + series.Unlock() + continue + } + } oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected) @@ -1414,6 +1440,15 @@ func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitConte series = b.histogramSeries[i] series.Lock() + // At this point, we could encounter a histogram staleness + // marker that should really be a float staleness marker or a + // float histogram staleness marker. This can only happen with + // concurrent appenders appending to the same series _and_ doing + // so in a mixed-type scenario. This case is expected to be very + // rare, so we do not bother here to convert the staleness + // marker. The worst case is that we need to cut a new chunk + // just for the staleness marker. 
+ oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) @@ -1514,6 +1549,15 @@ func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommit series = b.floatHistogramSeries[i] series.Lock() + // At this point, we could encounter a float histogram staleness + // marker that should really be a float staleness marker or an + // integer histogram staleness marker. This can only happen with + // concurrent appenders appending to the same series _and_ doing + // so in a mixed-type scenario. This case is expected to be very + // rare, so we do not bother here to convert the staleness + // marker. The worst case is that we need to cut a new chunk + // just for the staleness marker. + oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) @@ -1680,6 +1724,8 @@ func (a *headAppender) Commit() (err error) { }() for _, b := range a.batches { + // Do not change the order of these calls. The staleness marker + // handling depends on it. 
a.commitFloats(b, acc) a.commitHistograms(b, acc) a.commitFloatHistograms(b, acc) From bd0bf66f318942d44a8731c6f0f65202946f8bc2 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 17 Sep 2025 19:17:56 +0200 Subject: [PATCH 5/5] tsdb: Include floatHistograms in headAppender.Rollback() Signed-off-by: beorn7 --- tsdb/head_append.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index a3bdee9684..80357fdf13 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -2229,6 +2229,13 @@ func (a *headAppender) Rollback() (err error) { series.pendingCommit = false series.Unlock() } + for i := range b.floatHistograms { + series = b.floatHistogramSeries[i] + series.Lock() + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } b.close(h) } a.batches = a.batches[:0]