From 8a9f4ff4404e4e7e6c29616ce269633fd7ff2253 Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Thu, 14 May 2026 22:22:52 +0200 Subject: [PATCH] fix(tsdb): chunk overflow on ooo query (#18692) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(tsdb): chunk overflow on ooo query Protect against and fix overflow of chunks with more than 2^16-1 samples when chunks are re-encoded, for example because in-order and out-of-order (ooo) samples overlap during compaction or query. Signed-off-by: György Krajcsovits --- tsdb/chunkenc/chunk.go | 7 ++ tsdb/chunkenc/chunk_test.go | 22 +++++ tsdb/chunkenc/float_histogram.go | 8 +- tsdb/chunkenc/float_histogram_test.go | 4 + tsdb/chunkenc/histogram.go | 8 +- tsdb/chunkenc/histogram_test.go | 4 + tsdb/chunkenc/xor.go | 2 + tsdb/chunkenc/xor2.go | 3 + tsdb/chunkenc/xor2_test.go | 4 + tsdb/chunkenc/xor_test.go | 4 + tsdb/head_append.go | 7 +- tsdb/querier.go | 14 ++- tsdb/querier_test.go | 128 ++++++++++++++++++++++++++ 13 files changed, 206 insertions(+), 9 deletions(-) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 3a405e8cf7..0c233aa552 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -57,6 +57,9 @@ func IsValidEncoding(e Encoding) bool { const ( // MaxBytesPerXORChunk is the maximum size an XOR chunk can be. MaxBytesPerXORChunk = 1024 + // MaxBytesPerXORChunkBeforeAppend is used for cutting new XOR chunks, to prevent going over MaxBytesPerXORChunk + // as a hard limit. We assume the next sample will be a maximally-sized sample (19 bytes). + MaxBytesPerXORChunkBeforeAppend = MaxBytesPerXORChunk - 19 // TargetBytesPerHistogramChunk sets a size target for each histogram chunk. TargetBytesPerHistogramChunk = 1024 // MinSamplesPerHistogramChunk sets a minimum sample count for histogram chunks. This is desirable because a single @@ -106,9 +109,13 @@ type Iterable interface { // Appender adds sample with start timestamp, timestamp, and value to a chunk. 
type Appender interface { + // Append may panic if the chunk is already at full capacity. It is the + // responsibility of the caller to decide how to cut new chunks before that. Append(st, t int64, v float64) // AppendHistogram and AppendFloatHistogram append a histogram sample to a histogram or float histogram chunk. + // Appending may panic if the chunk is already at full capacity. It is the + // responsibility of the caller to decide how to cut new chunks before that. // Appending a histogram may require creating a completely new chunk or recoding (changing) the current chunk. // The Appender prev is used to determine if there is a counter reset between the previous Appender and the current Appender. // The Appender prev is optional and only taken into account when the first sample is being appended. diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index 4e19f15b42..98073a3027 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -20,6 +20,8 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" ) type triple struct { @@ -218,3 +220,23 @@ func (c fakeChunk) Encoding() Encoding { func (c fakeChunk) Reset([]byte) { c.t.Fatal("Reset should not be called") } + +func testChunkOverFlowPanics(t *testing.T, e Encoding, vt ValueType) { + chunk, err := NewEmptyChunk(e) + require.NoError(t, err) + app, err := chunk.Appender() + require.NoError(t, err) + + require.PanicsWithValue(t, "chunk capacity exceeded", func() { + for i := range int64(1000000) { + switch vt { + case ValFloat: + app.Append(0, i, float64(i)) + case ValHistogram: + app.AppendHistogram(nil, 0, i, &histogram.Histogram{Count: uint64(i), ZeroThreshold: 1e-128, ZeroCount: uint64(i)}, true) + case ValFloatHistogram: + app.AppendFloatHistogram(nil, 0, i, &histogram.FloatHistogram{Count: float64(i), ZeroThreshold: 1e-128, ZeroCount: float64(i)}, true) + } + } + }) +} diff --git 
a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 6af2fa68e2..da2b22dc5e 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -687,7 +687,13 @@ func (*FloatHistogramAppender) AppendHistogram(*HistogramAppender, int64, int64, } func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppender, _, t int64, h *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) { - if a.NumSamples() == 0 { + numSamples := a.NumSamples() + + if numSamples == math.MaxUint16 { + panic("chunk capacity exceeded") + } + + if numSamples == 0 { a.appendFloatHistogram(t, h) if h.CounterResetHint == histogram.GaugeType { a.setCounterResetHeader(GaugeType) diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index cbeb3171ce..6eb43395d1 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -1522,3 +1522,7 @@ func TestFloatHistogramIteratorReduceSchema(t *testing.T) { }) } } + +func TestFloatHistogramChunkOverFlowPanics(t *testing.T) { + testChunkOverFlowPanics(t, EncFloatHistogram, ValFloatHistogram) +} diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 4e77f387d3..9b6f28e9f3 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -739,7 +739,13 @@ func (*HistogramAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, i } func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, _, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) { - if a.NumSamples() == 0 { + numSamples := a.NumSamples() + + if numSamples == math.MaxUint16 { + panic("chunk capacity exceeded") + } + + if numSamples == 0 { a.appendHistogram(t, h) if h.CounterResetHint == histogram.GaugeType { a.setCounterResetHeader(GaugeType) diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 6ac8500e64..37def5f0c9 100644 --- 
a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -1881,3 +1881,7 @@ func TestHistogramIteratorReduceSchema(t *testing.T) { }) } } + +func TestHistogramChunkOverFlowPanics(t *testing.T) { + testChunkOverFlowPanics(t, EncHistogram, ValHistogram) +} diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 5a9a59dc22..f381a82741 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -177,6 +177,8 @@ func (a *xorAppender) Append(_, t int64, v float64) { } a.writeVDelta(v) + case math.MaxUint16: + panic("chunk capacity exceeded") default: tDelta = uint64(t - a.t) dod := int64(tDelta - a.tDelta) diff --git a/tsdb/chunkenc/xor2.go b/tsdb/chunkenc/xor2.go index d25cf4ca4a..bf8bb9103d 100644 --- a/tsdb/chunkenc/xor2.go +++ b/tsdb/chunkenc/xor2.go @@ -251,6 +251,9 @@ func (a *xor2Appender) Append(st, t int64, v float64) { putVarbitIntFast(a.b, stDiff) } + case math.MaxUint16: + panic("chunk capacity exceeded") + default: tDelta = uint64(t - a.t) dod := int64(tDelta - a.tDelta) diff --git a/tsdb/chunkenc/xor2_test.go b/tsdb/chunkenc/xor2_test.go index 7ac6927a38..2bba534669 100644 --- a/tsdb/chunkenc/xor2_test.go +++ b/tsdb/chunkenc/xor2_test.go @@ -630,3 +630,7 @@ func TestXOR2DecodeFunctionsAcrossPadding(t *testing.T) { }, (*xor2Iterator).decodeNewLeadingTrailing) }) } + +func TestXOR2ChunkOverFlowPanics(t *testing.T) { + testChunkOverFlowPanics(t, EncXOR2, ValFloat) +} diff --git a/tsdb/chunkenc/xor_test.go b/tsdb/chunkenc/xor_test.go index b30c65283d..aaeb0654eb 100644 --- a/tsdb/chunkenc/xor_test.go +++ b/tsdb/chunkenc/xor_test.go @@ -40,3 +40,7 @@ func BenchmarkXorRead(b *testing.B) { _, _ = ts, v } } + +func TestXORChunkOverFlowPanics(t *testing.T) { + testChunkOverFlowPanics(t, EncXOR, ValFloat) +} diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 6621363612..3bb9de9584 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1984,11 +1984,6 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh 
*histogram.FloatHistogr // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { - // We target chunkenc.MaxBytesPerXORChunk as a hard for the size of an XOR chunk. We must determine whether to cut - // a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a - // maximally-sized sample (19 bytes). - const maxBytesPerXORChunk = chunkenc.MaxBytesPerXORChunk - 19 - c = s.headChunks if c == nil { @@ -2007,7 +2002,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts } // Check the chunk size, unless we just created it and if the chunk is too large, cut a new one. - if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk { + if !chunkCreated && len(c.chunk.Bytes()) > chunkenc.MaxBytesPerXORChunkBeforeAppend { c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } diff --git a/tsdb/querier.go b/tsdb/querier.go index 0dc8a9259f..70868d7e55 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -1058,7 +1058,19 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { // not capable. st = p.currDelIter.AtST() needTS := st != 0 - if currentValueType != prevValueType || !hasTS && needTS { + overSizeChunk := func() bool { + switch currentValueType { + case chunkenc.ValFloat: + // In the TSDB head we also take into account the number of samples, but here we want to keep it + // simple and consistent with histograms. Also the size limit is checked before sample limit in + // the head as well. 
+ return len(currentChunk.Bytes()) > chunkenc.MaxBytesPerXORChunkBeforeAppend + case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: + return len(currentChunk.Bytes()) > chunkenc.TargetBytesPerHistogramChunk && currentChunk.NumSamples() > chunkenc.MinSamplesPerHistogramChunk + } + return false + } + if currentValueType != prevValueType || !hasTS && needTS || overSizeChunk() { if prevValueType != chunkenc.ValNone { p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 68320aee6e..abda417abf 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -3952,6 +3952,134 @@ func TestQueryWithOneChunkCompletelyDeleted(t *testing.T) { require.Equal(t, 1, seriesCount) } +// TestChunkQuerier_OverlappingInOrderAndOOOChunks verifies the chunks +// returned by the ChunkQuerier when an in-order chunk overlaps with many +// out-of-order chunks. All sample timestamps are distinct. The total +// number of samples is chosen to exceed math.MaxUint16 so that the +// querier must split the merged iterable into multiple output chunks. +func TestChunkQuerier_OverlappingInOrderAndOOOChunks(t *testing.T) { + for _, storeST := range []bool{false, true} { + t.Run(fmt.Sprintf("store-st=%v", storeST), func(t *testing.T) { + for _, tc := range []struct { + name string + valType chunkenc.ValueType + }{ + {"float", chunkenc.ValFloat}, + {"histogram", chunkenc.ValHistogram}, + {"float histogram", chunkenc.ValFloatHistogram}, + } { + t.Run(tc.name, func(t *testing.T) { + testChunkQuerierOverlappingInOrderAndOOOChunks(t, tc.valType, storeST) + }) + } + }) + } +} + +func testChunkQuerierOverlappingInOrderAndOOOChunks(t *testing.T, valType chunkenc.ValueType, storeST bool) { + const ( + oooCapMax = 32 + // Pick more OOO samples than any chunk encoding can hold so the + // querier is forced to cut the merged iterable into multiple chunks. 
+ oooSamplesToAppend = int(math.MaxUint16) + 10 + firstIndex = 0 // Position of in-order sample at the start. + lastIndex = oooSamplesToAppend + 1 // Position of in-order sample at the end to overlap all OOO samples. + ) + + opts := DefaultOptions() + opts.OutOfOrderCapMax = oooCapMax + opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds() + opts.EnableSTStorage = storeST + db := newTestDB(t, withOpts(opts)) + db.DisableCompactions() + + lbls := labels.FromStrings("foo", "bar") + + appendSample := func(app storage.Appender, ts int64) error { + switch valType { + case chunkenc.ValFloat: + _, err := app.Append(0, lbls, ts, float64(ts)) + return err + case chunkenc.ValHistogram: + _, err := app.AppendHistogram(0, lbls, ts, tsdbutil.GenerateTestHistogram(ts), nil) + return err + case chunkenc.ValFloatHistogram: + _, err := app.AppendHistogram(0, lbls, ts, nil, tsdbutil.GenerateTestFloatHistogram(ts)) + return err + default: + return fmt.Errorf("unsupported value type: %v", valType) + } + } + + // Append the two in-order samples at the start and end of the range, + // so that the in-order chunk spans the full range that the OOO samples + // will land in. + app := db.Appender(context.Background()) + for _, i := range []int{firstIndex, lastIndex} { + require.NoError(t, appendSample(app, int64(10000+i*10))) + } + require.NoError(t, app.Commit()) + + // Sanity check: the two in-order samples form a single in-memory head + // chunk covering the whole timestamp range, with no m-mapped chunks. 
+ ms, _, err := db.head.getOrCreate(lbls.Hash(), lbls, false) + require.NoError(t, err) + require.NotNil(t, ms.headChunks) + require.Equal(t, 1, ms.headChunks.len()) + require.Nil(t, ms.headChunks.prev) + require.Empty(t, ms.mmappedChunks) + require.Equal(t, int64(10000+firstIndex*10), ms.headChunks.minTime) + require.Equal(t, int64(10000+lastIndex*10), ms.headChunks.maxTime) + require.Equal(t, 2, ms.headChunks.chunk.NumSamples()) + + // Append the OOO samples in the gap between the two in-order samples. + app = db.Appender(context.Background()) + for i := firstIndex + 1; i < lastIndex; i++ { + require.NoError(t, appendSample(app, int64(10000+i*10))) + } + require.NoError(t, app.Commit()) + + // Sanity check: the head holds the expected number of OOO chunks for + // the series. Each m-mapped OOO chunk has oooCapMax samples; whatever + // remains lives in the in-memory head OOO chunk. + require.NotNil(t, ms.ooo) + require.Len(t, ms.ooo.oooMmappedChunks, oooSamplesToAppend/oooCapMax) + require.Equal(t, oooSamplesToAppend%oooCapMax, ms.ooo.oooHeadChunk.chunk.NumSamples()) + + chunkQuerier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64) + require.NoError(t, err) + + matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") + css := chunkQuerier.Select(context.Background(), false, nil, matcher) + + var seriesCount, chunkCount, sampleCount int + lastTS := int64(math.MinInt64) + for css.Next() { + seriesCount++ + series := css.At() + it := series.Iterator(nil) + for it.Next() { + chunkCount++ + chk := it.At() + cit := chk.Chunk.Iterator(nil) + for vt := cit.Next(); vt != chunkenc.ValNone; vt = cit.Next() { + require.Equal(t, valType, vt) + ts := cit.AtT() + require.Greater(t, ts, lastTS, "timestamps must be strictly increasing across the returned chunks") + lastTS = ts + sampleCount++ + } + require.NoError(t, cit.Err()) + } + require.NoError(t, it.Err()) + } + require.NoError(t, css.Err()) + + require.Equal(t, 1, seriesCount) + require.Greater(t, chunkCount, 
1) + require.Equal(t, lastIndex-firstIndex+1, sampleCount) +} + func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { ir := mockReaderOfLabels{}