From ec847364517255d5f16067ecb640b2c57dc7e88f Mon Sep 17 00:00:00 2001 From: bwplotka Date: Wed, 19 Nov 2025 08:51:49 +0000 Subject: [PATCH] xorOptSTv2 Signed-off-by: bwplotka --- tsdb/chunkenc/benchmark_test.go | 51 ++++--- tsdb/chunkenc/chunk_test.go | 3 + tsdb/chunkenc/xxx_xoroptst.go | 208 +++++++++++++++++++++-------- tsdb/chunkenc/xxx_xoroptst_test.go | 125 +++++++++++++++++ 4 files changed, 316 insertions(+), 71 deletions(-) create mode 100644 tsdb/chunkenc/xxx_xoroptst_test.go diff --git a/tsdb/chunkenc/benchmark_test.go b/tsdb/chunkenc/benchmark_test.go index 79c4e78a23..5d1ed4adde 100644 --- a/tsdb/chunkenc/benchmark_test.go +++ b/tsdb/chunkenc/benchmark_test.go @@ -165,6 +165,23 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl return ret }(), }, + { + name: "vt=random 0-1/st=cumulative-periodic-resets", + samples: func() (ret []triple) { + st, t, v := initST, initT, initV + for i := range nSamples { + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += r.Float64() // Random between 0 and 1.0. + if i%6 == 5 { + st = t - 10000 // Reset of 10s before current t. + } else { + st = initST + } + ret = append(ret, triple{st: st, t: t, v: v}) + } + return ret + }(), + }, { name: "vt=random 0-1/st=cumulative-periodic-zeros", samples: func() (ret []triple) { @@ -174,6 +191,8 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl v += r.Float64() // Random between 0 and 1.0. 
if i%6 == 5 { st = 0 + } else { + st = initT } ret = append(ret, triple{st: st, t: t, v: v}) } @@ -196,11 +215,11 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl } for _, f := range []fmtCase{ - {name: "XOR (ST ignored)", newChunkFn: func() Chunk { return NewXORChunk() }}, + //{name: "XOR (ST ignored)", newChunkFn: func() Chunk { return NewXORChunk() }}, //{name: "XORvarbit (ST ignored)", newChunkFn: func() Chunk { return NewXORVarbitChunk() }}, //{name: "XORvarbitTS (ST ignored)", newChunkFn: func() Chunk { return NewXORVarbitTSChunk() }}, //{name: "XORV2Naive", newChunkFn: func() Chunk { return NewXORV2NaiveChunk() }}, - //{name: "XOROptST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }}, + {name: "XOROptST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }}, //{name: "ALPBuffered", newChunkFn: func() Chunk { return NewALPBufferedChunk() }}, } { for _, s := range sampleCases { @@ -212,16 +231,16 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl } /* - export bench=bw.bench/append.xor && go test \ + export bench=bw.bench/append.xoroptstv2 && go test \ -run '^$' -bench '^BenchmarkAppender' \ -benchtime 1s -count 6 -cpu 2 -timeout 999m \ | tee ${bench}.txt For profiles: - export bench=bw.bench/append && go test \ + export bench=bw.bench/appendprof.xoroptstv2 && go test \ -run '^$' -bench '^BenchmarkAppender' \ - -benchtime 10000x -count 6 -cpu 2 -timeout 999m \ + -benchtime 1s -count 1 -cpu 2 -timeout 999m \ -cpuprofile=${bench}.cpu.pprof \ | tee ${bench}.txt */ @@ -254,22 +273,16 @@ type supportsAppenderV2 interface { } /* - export bench=bw.bench/iter.xor && go test \ + export bench=bw.bench/iter.xoroptst && go test \ + -run '^$' -bench '^BenchmarkIterator' \ + -benchtime 1s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt + + export bench=bw.bench/iter.xoroptst && go test \ -run '^$' -bench '^BenchmarkIterator' \ - -benchtime 1s -count 6 -cpu 2 -timeout 999m \ + -benchtime 1s 
-count 1 -cpu 2 -timeout 999m \ + -cpuprofile=${bench}.cpu.pprof \ | tee ${bench}.txt - - export bench=bw.bench/iter.xor && go test \ - -run '^$' -bench '^BenchmarkIterator' \ - -benchtime 10000x -count 6 -cpu 2 -timeout 999m \ - -cpuprofile=${bench}.cpu.pprof \ - | tee ${bench}.txt - - export bench=bw.bench/iter.xorvts && go test \ - -run '^$' -bench '^BenchmarkIterator' \ - -benchtime 10000x -count 6 -cpu 2 -timeout 999m \ - -cpuprofile=${bench}.cpu.pprof \ - | tee ${bench}.txt */ func BenchmarkIterator(b *testing.B) { foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) { diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index a24f0e1087..b166369fd5 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -198,6 +198,9 @@ func testChunk(t *testing.T, c Chunk, data []triple, floatEquals func(a, b float } require.NoError(t, it1.Err()) if diff := cmp.Diff(data, res1, cmp.AllowUnexported(triple{}), cmp.Comparer(floatEquals)); diff != "" { + // fmt.Println("exp", data[:10]) + // fmt.Println("got", res1[:10]) + t.Fatalf("mismatch (-want +got):\n%s", diff) } diff --git a/tsdb/chunkenc/xxx_xoroptst.go b/tsdb/chunkenc/xxx_xoroptst.go index f5eb8b5842..d5185b7aae 100644 --- a/tsdb/chunkenc/xxx_xoroptst.go +++ b/tsdb/chunkenc/xxx_xoroptst.go @@ -20,15 +20,18 @@ import ( "github.com/prometheus/prometheus/model/histogram" ) +const chunkSTHeaderSize = 1 + // xorOptSTChunk holds encoded sample data: -// 2B(numSamples), 2B(stChangedOnSample), varint(st), varint(t), xor(v), varuint(stDelta), varuint(tDelta), xor(v), classicvarbitint(stDod), classicvarbitint(tDod), xor(v), ... +// 2B(numSamples), 1B(stHeader), ?varint(st), varint(t), xor(v), ?varuint(stDelta), varuint(tDelta), xor(v), ?classicvarbitint(stDod), classicvarbitint(tDod), xor(v), ... +// stHeader: 1b(nonZeroFirstST), 7b(stSampleUntil) type xorOptSTChunk struct { b bstream } // NewXOROptSTChunk returns a new chunk with XORv2 encoding. 
func NewXOROptSTChunk() *xorOptSTChunk { - b := make([]byte, 2*chunkHeaderSize, chunkAllocationSize) + b := make([]byte, chunkHeaderSize+chunkSTHeaderSize, chunkAllocationSize) return &xorOptSTChunk{b: bstream{stream: b, count: 0}} } @@ -69,7 +72,7 @@ func (c *xorOptSTChunk) Appender() (Appender, error) { // It is not valid to call AppenderV2() multiple times concurrently or to use multiple // Appenders on the same chunk. func (c *xorOptSTChunk) AppenderV2() (AppenderV2, error) { - if len(c.b.stream) == 2*chunkHeaderSize { // Avoid allocating an Iterator when chunk is empty. + if len(c.b.stream) == chunkHeaderSize+chunkSTHeaderSize { // Avoid allocating an Iterator when chunk is empty. return &xorOptSTAppender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil } it := c.iterator(nil) @@ -92,6 +95,11 @@ func (c *xorOptSTChunk) AppenderV2() (AppenderV2, error) { tDelta: it.tDelta, leading: it.leading, trailing: it.trailing, + + numTotal: it.numTotal, + stZeroInitially: it.stZeroInitially, + stSameUntil: it.stSameUntil, + stChangeTrackingDisabled: it.stSameUntil != it.numTotal, } return a, nil } @@ -115,7 +123,18 @@ func (c *xorOptSTChunk) Iterator(it Iterator) Iterator { } type xorOptSTAppender struct { - b *bstream + b *bstream + numTotal uint16 + + // stZeroInitially if true, indicates that the first ST sample was zero + // and there is no first ST value encoded. + stZeroInitially bool + // stSameUntil is a sample number (counting from 0) when ST changed over the + // first ST appended value. + // This means that reader should read first sample and then start reading + // STs only from stSameUntil sample onwards. 
+ stSameUntil uint16 + stChangeTrackingDisabled bool st, t int64 v float64 @@ -139,10 +158,13 @@ func (*xorOptSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, in } type xorOptSTtIterator struct { - br bstreamReader - numTotal uint16 - stChangedOnSample uint16 - numRead uint16 + br bstreamReader + numTotal uint16 + + stZeroInitially bool + stSameUntil uint16 + + numRead uint16 st, t int64 val float64 @@ -193,12 +215,10 @@ func (it *xorOptSTtIterator) Err() error { } func (it *xorOptSTtIterator) Reset(b []byte) { - // 2*chunkHeaderSize is for the first 2*2 bytes contain chunk headers (numSamples, stChangedOnSample). - // We skip the above for actual samples. - it.br = newBReader(b[2*chunkHeaderSize:]) + // We skip initial headers for actual samples. + it.br = newBReader(b[chunkHeaderSize+chunkSTHeaderSize:]) it.numTotal = binary.BigEndian.Uint16(b) - it.stChangedOnSample = binary.BigEndian.Uint16(b[chunkHeaderSize:]) - + it.stZeroInitially, it.stSameUntil = readSTHeader(b[chunkHeaderSize:], it.numTotal) it.numRead = 0 it.st = 0 it.t = 0 @@ -212,18 +232,22 @@ func (it *xorOptSTtIterator) Reset(b []byte) { func (a *xorOptSTAppender) Append(st, t int64, v float64) { var ( - stDelta int64 - tDelta uint64 + stDelta int64 + tDelta uint64 + stChanged bool ) - num := binary.BigEndian.Uint16(a.b.bytes()) - stChangedOnNum := binary.BigEndian.Uint16(a.b.bytes()[chunkHeaderSize:]) - switch num { + switch a.numTotal { case 0: buf := make([]byte, binary.MaxVarintLen64) - for _, b := range buf[:binary.PutVarint(buf, st)] { - a.b.writeByte(b) + if st != 0 { + for _, b := range buf[:binary.PutVarint(buf, st)] { + a.b.writeByte(b) + } + } else { + a.stZeroInitially = true } + for _, b := range buf[:binary.PutVarint(buf, t)] { a.b.writeByte(b) } @@ -232,7 +256,7 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) { buf := make([]byte, binary.MaxVarintLen64) stDelta = st - a.st if stDelta != 0 { - stChangedOnNum = num + stChanged = true for _, b := range 
buf[:binary.PutVarint(buf, stDelta)] { a.b.writeByte(b) } @@ -246,10 +270,8 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) { default: stDelta = st - a.st sdod := stDelta - a.stDelta - if stChangedOnNum != 0 { - putClassicVarbitInt(a.b, sdod) - } else if sdod != 0 { - stChangedOnNum = num + if sdod != 0 || a.stChangeTrackingDisabled { + stChanged = true putClassicVarbitInt(a.b, sdod) } @@ -262,10 +284,17 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) { a.st = st a.t = t a.v = v - binary.BigEndian.PutUint16(a.b.bytes(), num+1) - binary.BigEndian.PutUint16(a.b.bytes()[chunkHeaderSize:], stChangedOnNum) a.tDelta = tDelta a.stDelta = stDelta + + a.numTotal++ + binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal) + + // Bump stSameUntil if we see continuously unchanged ST over all samples so far. + if !a.stChangeTrackingDisabled && !stChanged { + a.stSameUntil++ + } + a.stChangeTrackingDisabled = !updateSTHeader(a.b.bytes()[chunkHeaderSize:], a.stZeroInitially, a.stSameUntil, a.numTotal) } func (a *xorOptSTAppender) BitProfiledAppend(p *bitProfiler[any], st, t int64, v float64) { @@ -273,6 +302,7 @@ func (a *xorOptSTAppender) BitProfiledAppend(p *bitProfiler[any], st, t int64, v stDelta int64 tDelta uint64 ) + // TODO update!!! 
num := binary.BigEndian.Uint16(a.b.bytes()) stChangedOnNum := binary.BigEndian.Uint16(a.b.bytes()[chunkHeaderSize:]) @@ -353,11 +383,41 @@ func (it *xorOptSTtIterator) Next() ValueType { switch it.numRead { case 0: - st, err := binary.ReadVarint(&it.br) - if err != nil { - it.err = err - return ValNone + if !it.stZeroInitially { + st, err := binary.ReadVarint(&it.br) + if err != nil { + it.err = err + return ValNone + } + it.st = st } + case 1: + if it.stSameUntil <= it.numRead { + stDelta, err := binary.ReadVarint(&it.br) + if err != nil { + it.err = err + return ValNone + } + it.stDelta = stDelta + it.st += it.stDelta + } + default: + if it.stSameUntil <= it.numRead { + sdod, err := readClassicVarbitInt(&it.br) + if err != nil { + it.err = err + return ValNone + } + it.stDelta = it.stDelta + sdod + it.st += it.stDelta + } + } + return it.stAgnosticNext() +} + +func (it *xorOptSTtIterator) stAgnosticNext() ValueType { + switch it.numRead { + case 0: t, err := binary.ReadVarint(&it.br) if err != nil { it.err = err @@ -368,22 +428,12 @@ func (it *xorOptSTtIterator) Next() ValueType { it.err = err return ValNone } - it.st = st it.t = t it.val = math.Float64frombits(v) it.numRead++ return ValFloat case 1: - if it.stChangedOnSample > 0 && it.stChangedOnSample <= it.numRead { - stDelta, err := binary.ReadVarint(&it.br) - if err != nil { - it.err = err - return ValNone - } - it.stDelta = stDelta - it.st += it.stDelta - } tDelta, err := binary.ReadUvarint(&it.br) if err != nil { it.err = err @@ -391,19 +441,8 @@ func (it *xorOptSTtIterator) Next() ValueType { } it.tDelta = tDelta it.t += int64(it.tDelta) - return it.readValue() default: - if it.stChangedOnSample > 0 && it.stChangedOnSample <= it.numRead { - sdod, err := readClassicVarbitInt(&it.br) - if err != nil { - it.err = err - return ValNone - } - it.stDelta = it.stDelta + sdod - it.st += it.stDelta - } - dod, err := readClassicVarbitInt(&it.br) if err != nil { it.err = err @@ -416,6 +455,71 @@ func (it 
*xorOptSTtIterator) Next() ValueType { } } +// updateSTHeader writes the one-byte ST header into b[0] from stZeroInitially and stSameUntil. +// numTotal is used to encode the special case of a chunk having a single ST value (so far). +// +// It returns true if it is ok to keep skipping ST until the next changed value, i.e. stSameUntil >= numTotal and +// stSameUntil < 127. False means that encoders have to start tracking all ST DoD values. +// +// NOTE: Given the 1B constraint we can only fit 127 samples before we run out of space for tracking stSameUntil. +// For those cases encoders and decoders have to start tracking ST even if it does not change for the rest +// of a chunk. This is fine as typically we see 120 samples per chunk. +// +// NOTE: There is a lot of info we could pack within 1 byte instead: project stSameUntil onto 63 (e.g. stSameUntil*63/numTotal), +// project it onto 127 (e.g. stSameUntil*127/numTotal), which would fit exact samples, or use 6 bits to indicate where ST changes, +// when it's worth to write (and read) it. For 120 samples it means 20 sample chunks. +func updateSTHeader(b []byte, stZeroInitially bool, stSameUntil, numTotal uint16) (ok bool) { + // First (most significant) bit indicates initial ST value. 0 bit means 0 ST, otherwise non-zero (need to read first varint). + b[0] = 0x00 + if !stZeroInitially { + b[0] = 0x80 + } + + if stSameUntil > 0x7F { + // This should never happen, it would cause corruption (ST already skipped but shouldn't). b[0] keeps only the first-ST bit here. NOTE(review): looks reachable when an appender is reopened on a chunk with exactly 127 unchanged-ST samples (tracking starts enabled there) - TODO confirm. + return false + } + + if stSameUntil >= numTotal { + // Fast path for the fully unchanged chunk: the low 7 bits stay zero. + // 0000 0000 for noST. + // 1000 0000 for single ST value for the whole chunk. Skipping stays ok only below 127 (0x7F).
+ return stSameUntil != 0x7F + } + + rest := uint8(stSameUntil) + if rest == 0 { + // stSameUntil == 0 makes no sense, but we treat it as stSameUntil == 1. + rest = 1 + } + b[0] |= rest + return false +} + +func readSTHeader(b []byte, numTotal uint16) (stZeroInitially bool, stSameUntil uint16) { + if b[0] == 0x00 { + // Low 7 bits are zero, so stSameUntil is reported as numTotal. TODO: Maybe easier without numTotal? + return true, numTotal + } + if b[0] == 0x80 { + return false, numTotal + } + if b[0] == 0x7F { + return true, 127 + } + if b[0] == 0xFF { + return false, 127 + } + + mask := byte(0x80) + if b[0]&mask == 0 { + stZeroInitially = true + } + + mask = 0x7F + return stZeroInitially, uint16(b[0] & mask) +} + func (it *xorOptSTtIterator) readValue() ValueType { err := xorRead(&it.br, &it.val, &it.leading, &it.trailing) if err != nil { diff --git a/tsdb/chunkenc/xxx_xoroptst_test.go b/tsdb/chunkenc/xxx_xoroptst_test.go new file mode 100644 index 0000000000..d5bd8cc19c --- /dev/null +++ b/tsdb/chunkenc/xxx_xoroptst_test.go @@ -0,0 +1,125 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package chunkenc + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSTHeader(t *testing.T) { + b := make([]byte, 1) + require.True(t, updateSTHeader(b, false, 120, 120)) + stZeroInitially, stSameUntil := readSTHeader(b, 120) + require.False(t, stZeroInitially) + require.Equal(t, uint16(120), stSameUntil) + + require.True(t, updateSTHeader(b, true, 120, 120)) + stZeroInitially, stSameUntil = readSTHeader(b, 120) + require.True(t, stZeroInitially) + require.Equal(t, uint16(120), stSameUntil) + + require.False(t, updateSTHeader(b, false, 20, 120)) + stZeroInitially, stSameUntil = readSTHeader(b, 120) + require.False(t, stZeroInitially) + require.Equal(t, uint16(20), stSameUntil) + + require.False(t, updateSTHeader(b, true, 20, 120)) + stZeroInitially, stSameUntil = readSTHeader(b, 120) + require.True(t, stZeroInitially) + require.Equal(t, uint16(20), stSameUntil) + + require.False(t, updateSTHeader(b, true, 21, 120)) + _, stSameUntil = readSTHeader(b, 120) + require.Equal(t, uint16(21), stSameUntil) + + require.False(t, updateSTHeader(b, true, 19, 120)) + _, stSameUntil = readSTHeader(b, 120) + require.Equal(t, uint16(19), stSameUntil) + + require.False(t, updateSTHeader(b, true, 1, 120)) + _, stSameUntil = readSTHeader(b, 120) + require.Equal(t, uint16(1), stSameUntil) + + require.False(t, updateSTHeader(b, true, 2, 120)) + _, stSameUntil = readSTHeader(b, 120) + require.Equal(t, uint16(2), stSameUntil) + + require.False(t, updateSTHeader(b, true, 119, 120)) + _, stSameUntil = readSTHeader(b, 120) + require.Equal(t, uint16(119), stSameUntil) + + require.False(t, updateSTHeader(b, true, 127, 128)) + _, stSameUntil = readSTHeader(b, 128) + require.Equal(t, uint16(127), stSameUntil) + + // Chunks with other sample counts (numTotal != 120) work fine too.
+ require.True(t, updateSTHeader(b, false, 21, 21)) + stZeroInitially, stSameUntil = readSTHeader(b, 21) + require.False(t, stZeroInitially) + require.Equal(t, uint16(21), stSameUntil) + + require.False(t, updateSTHeader(b, false, 21, 90)) + stZeroInitially, stSameUntil = readSTHeader(b, 90) + require.False(t, stZeroInitially) + require.Equal(t, uint16(21), stSameUntil) + + require.False(t, updateSTHeader(b, true, 19, 90)) + stZeroInitially, stSameUntil = readSTHeader(b, 90) + require.True(t, stZeroInitially) + require.Equal(t, uint16(19), stSameUntil) + + require.False(t, updateSTHeader(b, false, 4, 127)) + stZeroInitially, stSameUntil = readSTHeader(b, 127) + require.False(t, stZeroInitially) + require.Equal(t, uint16(4), stSameUntil) + + require.False(t, updateSTHeader(b, false, 4, 129)) + stZeroInitially, stSameUntil = readSTHeader(b, 129) + require.False(t, stZeroInitially) + require.Equal(t, uint16(4), stSameUntil) + + require.False(t, updateSTHeader(b, false, 4, 130)) + stZeroInitially, stSameUntil = readSTHeader(b, 131) + require.False(t, stZeroInitially) + require.Equal(t, uint16(4), stSameUntil) + + require.False(t, updateSTHeader(b, false, 4, 130)) + stZeroInitially, stSameUntil = readSTHeader(b, 131) + require.False(t, stZeroInitially) + require.Equal(t, uint16(4), stSameUntil) + + require.False(t, updateSTHeader(b, true, 127, 9999)) + _, stSameUntil = readSTHeader(b, 9999) + require.Equal(t, uint16(127), stSameUntil) + + // Wrong/odd inputs: stSameUntil of 0 reads back as 1, stSameUntil above 127 reads back as numTotal, and a huge numTotal is fine.
+ require.False(t, updateSTHeader(b, true, 0, 120)) + _, stSameUntil = readSTHeader(b, 120) + require.Equal(t, uint16(1), stSameUntil) + + require.False(t, updateSTHeader(b, true, 200, 120)) + _, stSameUntil = readSTHeader(b, 120) + require.Equal(t, uint16(120), stSameUntil) + + require.False(t, updateSTHeader(b, true, math.MaxUint16, 120)) + _, stSameUntil = readSTHeader(b, 120) + require.Equal(t, uint16(120), stSameUntil) + + require.False(t, updateSTHeader(b, true, 20, math.MaxUint16)) + _, stSameUntil = readSTHeader(b, math.MaxUint16) + require.Equal(t, uint16(20), stSameUntil) +}