mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-15 17:46:49 +02:00
fix(tsdb): chunk overflow on ooo query (#18692)
* fix(tsdb): chunk overflow on ooo query. Protect against, and fix, overflow of chunks with more than 2^16-1 samples in case we're recoding chunks due to, for example, in-order and out-of-order sample overlap during compaction or query. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
cf4505c6cd
commit
8a9f4ff440
@ -57,6 +57,9 @@ func IsValidEncoding(e Encoding) bool {
|
||||
const (
|
||||
// MaxBytesPerXORChunk is the maximum size an XOR chunk can be.
|
||||
MaxBytesPerXORChunk = 1024
|
||||
// MaxBytesPerXORChunkBeforeAppend is used for cutting new XOR chunks, to prevent going over MaxBytesPerXORChunk
|
||||
// as a hard limit. We assume the next sample will be a maximally-sized sample (19 bytes).
|
||||
MaxBytesPerXORChunkBeforeAppend = MaxBytesPerXORChunk - 19
|
||||
// TargetBytesPerHistogramChunk sets a size target for each histogram chunk.
|
||||
TargetBytesPerHistogramChunk = 1024
|
||||
// MinSamplesPerHistogramChunk sets a minimum sample count for histogram chunks. This is desirable because a single
|
||||
@ -106,9 +109,13 @@ type Iterable interface {
|
||||
|
||||
// Appender adds sample with start timestamp, timestamp, and value to a chunk.
|
||||
type Appender interface {
|
||||
// Append may panic if the chunk is already at full capacity. It is the
|
||||
// responsibility of the caller to decide how to cut new chunks before that.
|
||||
Append(st, t int64, v float64)
|
||||
|
||||
// AppendHistogram and AppendFloatHistogram append a histogram sample to a histogram or float histogram chunk.
|
||||
// Appending may panic if the chunk is already at full capacity. It is the
|
||||
// responsibility of the caller to decide how to cut new chunks before that.
|
||||
// Appending a histogram may require creating a completely new chunk or recoding (changing) the current chunk.
|
||||
// The Appender prev is used to determine if there is a counter reset between the previous Appender and the current Appender.
|
||||
// The Appender prev is optional and only taken into account when the first sample is being appended.
|
||||
|
||||
@ -20,6 +20,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
)
|
||||
|
||||
type triple struct {
|
||||
@ -218,3 +220,23 @@ func (c fakeChunk) Encoding() Encoding {
|
||||
func (c fakeChunk) Reset([]byte) {
|
||||
c.t.Fatal("Reset should not be called")
|
||||
}
|
||||
|
||||
func testChunkOverFlowPanics(t *testing.T, e Encoding, vt ValueType) {
|
||||
chunk, err := NewEmptyChunk(e)
|
||||
require.NoError(t, err)
|
||||
app, err := chunk.Appender()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.PanicsWithValue(t, "chunk capacity exceeded", func() {
|
||||
for i := range int64(1000000) {
|
||||
switch vt {
|
||||
case ValFloat:
|
||||
app.Append(0, i, float64(i))
|
||||
case ValHistogram:
|
||||
app.AppendHistogram(nil, 0, i, &histogram.Histogram{Count: uint64(i), ZeroThreshold: 1e-128, ZeroCount: uint64(i)}, true)
|
||||
case ValFloatHistogram:
|
||||
app.AppendFloatHistogram(nil, 0, i, &histogram.FloatHistogram{Count: float64(i), ZeroThreshold: 1e-128, ZeroCount: float64(i)}, true)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -687,7 +687,13 @@ func (*FloatHistogramAppender) AppendHistogram(*HistogramAppender, int64, int64,
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppender, _, t int64, h *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) {
|
||||
if a.NumSamples() == 0 {
|
||||
numSamples := a.NumSamples()
|
||||
|
||||
if numSamples == math.MaxUint16 {
|
||||
panic("chunk capacity exceeded")
|
||||
}
|
||||
|
||||
if numSamples == 0 {
|
||||
a.appendFloatHistogram(t, h)
|
||||
if h.CounterResetHint == histogram.GaugeType {
|
||||
a.setCounterResetHeader(GaugeType)
|
||||
|
||||
@ -1522,3 +1522,7 @@ func TestFloatHistogramIteratorReduceSchema(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFloatHistogramChunkOverFlowPanics(t *testing.T) {
|
||||
testChunkOverFlowPanics(t, EncFloatHistogram, ValFloatHistogram)
|
||||
}
|
||||
|
||||
@ -739,7 +739,13 @@ func (*HistogramAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, i
|
||||
}
|
||||
|
||||
func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, _, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) {
|
||||
if a.NumSamples() == 0 {
|
||||
numSamples := a.NumSamples()
|
||||
|
||||
if numSamples == math.MaxUint16 {
|
||||
panic("chunk capacity exceeded")
|
||||
}
|
||||
|
||||
if numSamples == 0 {
|
||||
a.appendHistogram(t, h)
|
||||
if h.CounterResetHint == histogram.GaugeType {
|
||||
a.setCounterResetHeader(GaugeType)
|
||||
|
||||
@ -1881,3 +1881,7 @@ func TestHistogramIteratorReduceSchema(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistogramChunkOverFlowPanics(t *testing.T) {
|
||||
testChunkOverFlowPanics(t, EncHistogram, ValHistogram)
|
||||
}
|
||||
|
||||
@ -177,6 +177,8 @@ func (a *xorAppender) Append(_, t int64, v float64) {
|
||||
}
|
||||
|
||||
a.writeVDelta(v)
|
||||
case math.MaxUint16:
|
||||
panic("chunk capacity exceeded")
|
||||
default:
|
||||
tDelta = uint64(t - a.t)
|
||||
dod := int64(tDelta - a.tDelta)
|
||||
|
||||
@ -251,6 +251,9 @@ func (a *xor2Appender) Append(st, t int64, v float64) {
|
||||
putVarbitIntFast(a.b, stDiff)
|
||||
}
|
||||
|
||||
case math.MaxUint16:
|
||||
panic("chunk capacity exceeded")
|
||||
|
||||
default:
|
||||
tDelta = uint64(t - a.t)
|
||||
dod := int64(tDelta - a.tDelta)
|
||||
|
||||
@ -630,3 +630,7 @@ func TestXOR2DecodeFunctionsAcrossPadding(t *testing.T) {
|
||||
}, (*xor2Iterator).decodeNewLeadingTrailing)
|
||||
})
|
||||
}
|
||||
|
||||
func TestXOR2ChunkOverFlowPanics(t *testing.T) {
|
||||
testChunkOverFlowPanics(t, EncXOR2, ValFloat)
|
||||
}
|
||||
|
||||
@ -40,3 +40,7 @@ func BenchmarkXorRead(b *testing.B) {
|
||||
_, _ = ts, v
|
||||
}
|
||||
}
|
||||
|
||||
func TestXORChunkOverFlowPanics(t *testing.T) {
|
||||
testChunkOverFlowPanics(t, EncXOR, ValFloat)
|
||||
}
|
||||
|
||||
@ -1984,11 +1984,6 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogr
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
// This should be called only when appending data.
|
||||
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
|
||||
// We target chunkenc.MaxBytesPerXORChunk as a hard for the size of an XOR chunk. We must determine whether to cut
|
||||
// a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a
|
||||
// maximally-sized sample (19 bytes).
|
||||
const maxBytesPerXORChunk = chunkenc.MaxBytesPerXORChunk - 19
|
||||
|
||||
c = s.headChunks
|
||||
|
||||
if c == nil {
|
||||
@ -2007,7 +2002,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
||||
}
|
||||
|
||||
// Check the chunk size, unless we just created it and if the chunk is too large, cut a new one.
|
||||
if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk {
|
||||
if !chunkCreated && len(c.chunk.Bytes()) > chunkenc.MaxBytesPerXORChunkBeforeAppend {
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
@ -1058,7 +1058,19 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
|
||||
// not capable.
|
||||
st = p.currDelIter.AtST()
|
||||
needTS := st != 0
|
||||
if currentValueType != prevValueType || !hasTS && needTS {
|
||||
overSizeChunk := func() bool {
|
||||
switch currentValueType {
|
||||
case chunkenc.ValFloat:
|
||||
// In the TSDB head we also take into account the number of samples, but here we want to keep it
|
||||
// simple and consistent with histograms. Also the size limit is checked before sample limit in
|
||||
// the head as well.
|
||||
return len(currentChunk.Bytes()) > chunkenc.MaxBytesPerXORChunkBeforeAppend
|
||||
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
|
||||
return len(currentChunk.Bytes()) > chunkenc.TargetBytesPerHistogramChunk && currentChunk.NumSamples() > chunkenc.MinSamplesPerHistogramChunk
|
||||
}
|
||||
return false
|
||||
}
|
||||
if currentValueType != prevValueType || !hasTS && needTS || overSizeChunk() {
|
||||
if prevValueType != chunkenc.ValNone {
|
||||
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
|
||||
}
|
||||
|
||||
@ -3952,6 +3952,134 @@ func TestQueryWithOneChunkCompletelyDeleted(t *testing.T) {
|
||||
require.Equal(t, 1, seriesCount)
|
||||
}
|
||||
|
||||
// TestChunkQuerier_OverlappingInOrderAndOOOChunks verifies the chunks
|
||||
// returned by the ChunkQuerier when an in-order chunk overlaps with many
|
||||
// out-of-order chunks. All sample timestamps are distinct. The total
|
||||
// number of samples is chosen to exceed math.MaxUint16 so that the
|
||||
// querier must split the merged iterable into multiple output chunks.
|
||||
func TestChunkQuerier_OverlappingInOrderAndOOOChunks(t *testing.T) {
|
||||
for _, storeST := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("store-st=%v", storeST), func(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
valType chunkenc.ValueType
|
||||
}{
|
||||
{"float", chunkenc.ValFloat},
|
||||
{"histogram", chunkenc.ValHistogram},
|
||||
{"float histogram", chunkenc.ValFloatHistogram},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
testChunkQuerierOverlappingInOrderAndOOOChunks(t, tc.valType, storeST)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testChunkQuerierOverlappingInOrderAndOOOChunks(t *testing.T, valType chunkenc.ValueType, storeST bool) {
|
||||
const (
|
||||
oooCapMax = 32
|
||||
// Pick more OOO samples than any chunk encoding can hold so the
|
||||
// querier is forced to cut the merged iterable into multiple chunks.
|
||||
oooSamplesToAppend = int(math.MaxUint16) + 10
|
||||
firstIndex = 0 // Position of in-order sample at the start.
|
||||
lastIndex = oooSamplesToAppend + 1 // Position of in-order sample at the end to overlap all OOO samples.
|
||||
)
|
||||
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderCapMax = oooCapMax
|
||||
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
|
||||
opts.EnableSTStorage = storeST
|
||||
db := newTestDB(t, withOpts(opts))
|
||||
db.DisableCompactions()
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
|
||||
appendSample := func(app storage.Appender, ts int64) error {
|
||||
switch valType {
|
||||
case chunkenc.ValFloat:
|
||||
_, err := app.Append(0, lbls, ts, float64(ts))
|
||||
return err
|
||||
case chunkenc.ValHistogram:
|
||||
_, err := app.AppendHistogram(0, lbls, ts, tsdbutil.GenerateTestHistogram(ts), nil)
|
||||
return err
|
||||
case chunkenc.ValFloatHistogram:
|
||||
_, err := app.AppendHistogram(0, lbls, ts, nil, tsdbutil.GenerateTestFloatHistogram(ts))
|
||||
return err
|
||||
default:
|
||||
return fmt.Errorf("unsupported value type: %v", valType)
|
||||
}
|
||||
}
|
||||
|
||||
// Append the two in-order samples at the start and end of the range,
|
||||
// so that the in-order chunk spans the full range that the OOO samples
|
||||
// will land in.
|
||||
app := db.Appender(context.Background())
|
||||
for _, i := range []int{firstIndex, lastIndex} {
|
||||
require.NoError(t, appendSample(app, int64(10000+i*10)))
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Sanity check: the two in-order samples form a single in-memory head
|
||||
// chunk covering the whole timestamp range, with no m-mapped chunks.
|
||||
ms, _, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ms.headChunks)
|
||||
require.Equal(t, 1, ms.headChunks.len())
|
||||
require.Nil(t, ms.headChunks.prev)
|
||||
require.Empty(t, ms.mmappedChunks)
|
||||
require.Equal(t, int64(10000+firstIndex*10), ms.headChunks.minTime)
|
||||
require.Equal(t, int64(10000+lastIndex*10), ms.headChunks.maxTime)
|
||||
require.Equal(t, 2, ms.headChunks.chunk.NumSamples())
|
||||
|
||||
// Append the OOO samples in the gap between the two in-order samples.
|
||||
app = db.Appender(context.Background())
|
||||
for i := firstIndex + 1; i < lastIndex; i++ {
|
||||
require.NoError(t, appendSample(app, int64(10000+i*10)))
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Sanity check: the head holds the expected number of OOO chunks for
|
||||
// the series. Each m-mapped OOO chunk has oooCapMax samples; whatever
|
||||
// remains lives in the in-memory head OOO chunk.
|
||||
require.NotNil(t, ms.ooo)
|
||||
require.Len(t, ms.ooo.oooMmappedChunks, oooSamplesToAppend/oooCapMax)
|
||||
require.Equal(t, oooSamplesToAppend%oooCapMax, ms.ooo.oooHeadChunk.chunk.NumSamples())
|
||||
|
||||
chunkQuerier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
|
||||
css := chunkQuerier.Select(context.Background(), false, nil, matcher)
|
||||
|
||||
var seriesCount, chunkCount, sampleCount int
|
||||
lastTS := int64(math.MinInt64)
|
||||
for css.Next() {
|
||||
seriesCount++
|
||||
series := css.At()
|
||||
it := series.Iterator(nil)
|
||||
for it.Next() {
|
||||
chunkCount++
|
||||
chk := it.At()
|
||||
cit := chk.Chunk.Iterator(nil)
|
||||
for vt := cit.Next(); vt != chunkenc.ValNone; vt = cit.Next() {
|
||||
require.Equal(t, valType, vt)
|
||||
ts := cit.AtT()
|
||||
require.Greater(t, ts, lastTS, "timestamps must be strictly increasing across the returned chunks")
|
||||
lastTS = ts
|
||||
sampleCount++
|
||||
}
|
||||
require.NoError(t, cit.Err())
|
||||
}
|
||||
require.NoError(t, it.Err())
|
||||
}
|
||||
require.NoError(t, css.Err())
|
||||
|
||||
require.Equal(t, 1, seriesCount)
|
||||
require.Greater(t, chunkCount, 1)
|
||||
require.Equal(t, lastIndex-firstIndex+1, sampleCount)
|
||||
}
|
||||
|
||||
func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
|
||||
ir := mockReaderOfLabels{}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user