xorOptSTv2

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-11-19 08:51:49 +00:00
parent 9c6204d431
commit ec84736451
4 changed files with 316 additions and 71 deletions

View File

@ -165,6 +165,23 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
return ret
}(),
},
{
name: "vt=random 0-1/st=cumulative-periodic-resets",
samples: func() (ret []triple) {
st, t, v := initST, initT, initV
for i := range nSamples {
t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
v += r.Float64() // Random between 0 and 1.0.
if i%6 == 5 {
st = t - 10000 // Reset of 10s before current t.
} else {
st = initST
}
ret = append(ret, triple{st: st, t: t, v: v})
}
return ret
}(),
},
{
name: "vt=random 0-1/st=cumulative-periodic-zeros",
samples: func() (ret []triple) {
@ -174,6 +191,8 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
v += r.Float64() // Random between 0 and 1.0.
if i%6 == 5 {
st = 0
} else {
st = initT
}
ret = append(ret, triple{st: st, t: t, v: v})
}
@ -196,11 +215,11 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
}
for _, f := range []fmtCase{
{name: "XOR (ST ignored)", newChunkFn: func() Chunk { return NewXORChunk() }},
//{name: "XOR (ST ignored)", newChunkFn: func() Chunk { return NewXORChunk() }},
//{name: "XORvarbit (ST ignored)", newChunkFn: func() Chunk { return NewXORVarbitChunk() }},
//{name: "XORvarbitTS (ST ignored)", newChunkFn: func() Chunk { return NewXORVarbitTSChunk() }},
//{name: "XORV2Naive", newChunkFn: func() Chunk { return NewXORV2NaiveChunk() }},
//{name: "XOROptST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }},
{name: "XOROptST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }},
//{name: "ALPBuffered", newChunkFn: func() Chunk { return NewALPBufferedChunk() }},
} {
for _, s := range sampleCases {
@ -212,16 +231,16 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
}
/*
export bench=bw.bench/append.xor && go test \
export bench=bw.bench/append.xoroptstv2 && go test \
-run '^$' -bench '^BenchmarkAppender' \
-benchtime 1s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
For profiles:
export bench=bw.bench/append && go test \
export bench=bw.bench/appendprof.xoroptstv2 && go test \
-run '^$' -bench '^BenchmarkAppender' \
-benchtime 10000x -count 6 -cpu 2 -timeout 999m \
-benchtime 1s -count 1 -cpu 2 -timeout 999m \
-cpuprofile=${bench}.cpu.pprof \
| tee ${bench}.txt
*/
@ -254,22 +273,16 @@ type supportsAppenderV2 interface {
}
/*
export bench=bw.bench/iter.xor && go test \
export bench=bw.bench/iter.xoroptst && go test \
-run '^$' -bench '^BenchmarkIterator' \
-benchtime 1s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
export bench=bw.bench/iter.xoroptst && go test \
-run '^$' -bench '^BenchmarkIterator' \
-benchtime 1s -count 6 -cpu 2 -timeout 999m \
-benchtime 1s -count 1 -cpu 2 -timeout 999m \
-cpuprofile=${bench}.cpu.pprof \
| tee ${bench}.txt
export bench=bw.bench/iter.xor && go test \
-run '^$' -bench '^BenchmarkIterator' \
-benchtime 10000x -count 6 -cpu 2 -timeout 999m \
-cpuprofile=${bench}.cpu.pprof \
| tee ${bench}.txt
export bench=bw.bench/iter.xorvts && go test \
-run '^$' -bench '^BenchmarkIterator' \
-benchtime 10000x -count 6 -cpu 2 -timeout 999m \
-cpuprofile=${bench}.cpu.pprof \
| tee ${bench}.txt
*/
func BenchmarkIterator(b *testing.B) {
foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) {

View File

@ -198,6 +198,9 @@ func testChunk(t *testing.T, c Chunk, data []triple, floatEquals func(a, b float
}
require.NoError(t, it1.Err())
if diff := cmp.Diff(data, res1, cmp.AllowUnexported(triple{}), cmp.Comparer(floatEquals)); diff != "" {
// fmt.Println("exp", data[:10])
// fmt.Println("got", res1[:10])
t.Fatalf("mismatch (-want +got):\n%s", diff)
}

View File

@ -20,15 +20,18 @@ import (
"github.com/prometheus/prometheus/model/histogram"
)
const chunkSTHeaderSize = 1
// xorOptSTChunk holds encoded sample data:
// 2B(numSamples), 2B(stChangedOnSample), varint(st), varint(t), xor(v), varuint(stDelta), varuint(tDelta), xor(v), classicvarbitint(stDod), classicvarbitint(tDod), xor(v), ...
// 2B(numSamples), 1B(stHeader), ?varint(st), varint(t), xor(v), ?varuint(stDelta), varuint(tDelta), xor(v), ?classicvarbitint(stDod), classicvarbitint(tDod), xor(v), ...
// stHeader: 1b(nonZeroFirstST), 7b(stSampleUntil)
type xorOptSTChunk struct {
b bstream
}
// NewXOROptSTChunk returns a new chunk with XORv2 encoding.
func NewXOROptSTChunk() *xorOptSTChunk {
b := make([]byte, 2*chunkHeaderSize, chunkAllocationSize)
b := make([]byte, chunkHeaderSize+chunkSTHeaderSize, chunkAllocationSize)
return &xorOptSTChunk{b: bstream{stream: b, count: 0}}
}
@ -69,7 +72,7 @@ func (c *xorOptSTChunk) Appender() (Appender, error) {
// It is not valid to call AppenderV2() multiple times concurrently or to use multiple
// Appenders on the same chunk.
func (c *xorOptSTChunk) AppenderV2() (AppenderV2, error) {
if len(c.b.stream) == 2*chunkHeaderSize { // Avoid allocating an Iterator when chunk is empty.
if len(c.b.stream) == chunkHeaderSize+chunkSTHeaderSize { // Avoid allocating an Iterator when chunk is empty.
return &xorOptSTAppender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil
}
it := c.iterator(nil)
@ -92,6 +95,11 @@ func (c *xorOptSTChunk) AppenderV2() (AppenderV2, error) {
tDelta: it.tDelta,
leading: it.leading,
trailing: it.trailing,
numTotal: it.numTotal,
stZeroInitially: it.stZeroInitially,
stSameUntil: it.stSameUntil,
stChangeTrackingDisabled: it.stSameUntil != it.numTotal,
}
return a, nil
}
@ -115,7 +123,18 @@ func (c *xorOptSTChunk) Iterator(it Iterator) Iterator {
}
type xorOptSTAppender struct {
b *bstream
b *bstream
numTotal uint16
// stZeroInitially if true, indicates that the first ST sample was zero
// and there is no first ST value encoded.
stZeroInitially bool
// stSameUntil is a sample number (counting from 0) when ST changed over the
// first ST appended value.
// This means that reader should read first sample and then start reading
// STs only from stSameUntil sample onwards.
stSameUntil uint16
stChangeTrackingDisabled bool
st, t int64
v float64
@ -139,10 +158,13 @@ func (*xorOptSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, in
}
type xorOptSTtIterator struct {
br bstreamReader
numTotal uint16
stChangedOnSample uint16
numRead uint16
br bstreamReader
numTotal uint16
stZeroInitially bool
stSameUntil uint16
numRead uint16
st, t int64
val float64
@ -193,12 +215,10 @@ func (it *xorOptSTtIterator) Err() error {
}
func (it *xorOptSTtIterator) Reset(b []byte) {
// 2*chunkHeaderSize is for the first 2*2 bytes contain chunk headers (numSamples, stChangedOnSample).
// We skip the above for actual samples.
it.br = newBReader(b[2*chunkHeaderSize:])
// We skip initial headers for actual samples.
it.br = newBReader(b[chunkHeaderSize+chunkSTHeaderSize:])
it.numTotal = binary.BigEndian.Uint16(b)
it.stChangedOnSample = binary.BigEndian.Uint16(b[chunkHeaderSize:])
it.stZeroInitially, it.stSameUntil = readSTHeader(b[chunkHeaderSize:], it.numTotal)
it.numRead = 0
it.st = 0
it.t = 0
@ -212,18 +232,22 @@ func (it *xorOptSTtIterator) Reset(b []byte) {
func (a *xorOptSTAppender) Append(st, t int64, v float64) {
var (
stDelta int64
tDelta uint64
stDelta int64
tDelta uint64
stChanged bool
)
num := binary.BigEndian.Uint16(a.b.bytes())
stChangedOnNum := binary.BigEndian.Uint16(a.b.bytes()[chunkHeaderSize:])
switch num {
switch a.numTotal {
case 0:
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, st)] {
a.b.writeByte(b)
if st != 0 {
for _, b := range buf[:binary.PutVarint(buf, st)] {
a.b.writeByte(b)
}
} else {
a.stZeroInitially = true
}
for _, b := range buf[:binary.PutVarint(buf, t)] {
a.b.writeByte(b)
}
@ -232,7 +256,7 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) {
buf := make([]byte, binary.MaxVarintLen64)
stDelta = st - a.st
if stDelta != 0 {
stChangedOnNum = num
stChanged = true
for _, b := range buf[:binary.PutVarint(buf, stDelta)] {
a.b.writeByte(b)
}
@ -246,10 +270,8 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) {
default:
stDelta = st - a.st
sdod := stDelta - a.stDelta
if stChangedOnNum != 0 {
putClassicVarbitInt(a.b, sdod)
} else if sdod != 0 {
stChangedOnNum = num
if sdod != 0 || a.stChangeTrackingDisabled {
stChanged = true
putClassicVarbitInt(a.b, sdod)
}
@ -262,10 +284,17 @@ func (a *xorOptSTAppender) Append(st, t int64, v float64) {
a.st = st
a.t = t
a.v = v
binary.BigEndian.PutUint16(a.b.bytes(), num+1)
binary.BigEndian.PutUint16(a.b.bytes()[chunkHeaderSize:], stChangedOnNum)
a.tDelta = tDelta
a.stDelta = stDelta
a.numTotal++
binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)
// Bump stSameUntil if we see continuously unchanged ST over all samples so far.
if !a.stChangeTrackingDisabled && !stChanged {
a.stSameUntil++
}
a.stChangeTrackingDisabled = !updateSTHeader(a.b.bytes()[chunkHeaderSize:], a.stZeroInitially, a.stSameUntil, a.numTotal)
}
func (a *xorOptSTAppender) BitProfiledAppend(p *bitProfiler[any], st, t int64, v float64) {
@ -273,6 +302,7 @@ func (a *xorOptSTAppender) BitProfiledAppend(p *bitProfiler[any], st, t int64, v
stDelta int64
tDelta uint64
)
// TODO update!!!
num := binary.BigEndian.Uint16(a.b.bytes())
stChangedOnNum := binary.BigEndian.Uint16(a.b.bytes()[chunkHeaderSize:])
@ -353,11 +383,41 @@ func (it *xorOptSTtIterator) Next() ValueType {
switch it.numRead {
case 0:
st, err := binary.ReadVarint(&it.br)
if err != nil {
it.err = err
return ValNone
if !it.stZeroInitially {
st, err := binary.ReadVarint(&it.br)
if err != nil {
it.err = err
return ValNone
}
it.st = st
}
case 1:
if it.stSameUntil <= it.numRead {
stDelta, err := binary.ReadVarint(&it.br)
if err != nil {
it.err = err
return ValNone
}
it.stDelta = stDelta
it.st += it.stDelta
}
default:
if it.stSameUntil <= it.numRead {
sdod, err := readClassicVarbitInt(&it.br)
if err != nil {
it.err = err
return ValNone
}
it.stDelta = it.stDelta + sdod
it.st += it.stDelta
}
}
return it.stAgnosticNext()
}
func (it *xorOptSTtIterator) stAgnosticNext() ValueType {
switch it.numRead {
case 0:
t, err := binary.ReadVarint(&it.br)
if err != nil {
it.err = err
@ -368,22 +428,12 @@ func (it *xorOptSTtIterator) Next() ValueType {
it.err = err
return ValNone
}
it.st = st
it.t = t
it.val = math.Float64frombits(v)
it.numRead++
return ValFloat
case 1:
if it.stChangedOnSample > 0 && it.stChangedOnSample <= it.numRead {
stDelta, err := binary.ReadVarint(&it.br)
if err != nil {
it.err = err
return ValNone
}
it.stDelta = stDelta
it.st += it.stDelta
}
tDelta, err := binary.ReadUvarint(&it.br)
if err != nil {
it.err = err
@ -391,19 +441,8 @@ func (it *xorOptSTtIterator) Next() ValueType {
}
it.tDelta = tDelta
it.t += int64(it.tDelta)
return it.readValue()
default:
if it.stChangedOnSample > 0 && it.stChangedOnSample <= it.numRead {
sdod, err := readClassicVarbitInt(&it.br)
if err != nil {
it.err = err
return ValNone
}
it.stDelta = it.stDelta + sdod
it.st += it.stDelta
}
dod, err := readClassicVarbitInt(&it.br)
if err != nil {
it.err = err
@ -416,6 +455,71 @@ func (it *xorOptSTtIterator) Next() ValueType {
}
}
// NOTE: A lot of info we can pack within 1 byte. We can:
// * we can project stSameUntil input into 63 number (e.g. stSameUntil*63/numTotal)
// * we can project stSameUntil input into 127 number (e.g. stSameUntil*127/numTotal) which would fit exact samples.
// * we could use 6 bits to indicate where ST changes, when it's worth to write (and read it). For 120 samples it means 20 sample chunks.
// updateSTHeader updates one byte ST header with stZeroInitially and stSameUntil data.
// numTotal is used to encode a special case of a chunk having a single ST value (so far).
//
// updateSTHeader returns true is returned if it's ok to skip ST tracking until next changed value. False means encoders have to start tracking
// all ST DoD values.
//
// NOTE: Given 1B constrain we can only fit 127 samples before we don't have space for tracking stSameUntil.
// For those cases encoders and decoders would have to start tracking ST even if they don't change for the rest
// of a chunk. This is fine as typically we see 120 chunk samples.
func updateSTHeader(b []byte, stZeroInitially bool, stSameUntil, numTotal uint16) (ok bool) {
// First bit indicates initial ST value. 0 bit means 0 ST, otherwise non-zero (need to read first varint).
b[0] = 0x00
if !stZeroInitially {
b[0] = 0x80
}
if stSameUntil > 0x7F {
// This should never happen, would cause corruption (ST already skipped but shouldn't).
return false
}
if stSameUntil >= numTotal {
// Fast path for the fully unchanged chunk.
// 0000 0000 for noST.
// 1000 0000 for single ST value for the whole chunk.
return stSameUntil != 0x7F
}
rest := uint8(stSameUntil)
if rest == 0 {
// stSameUntil == 0 makes no sense, but we treat it as stSameUntil == 1
rest = 1
}
b[0] |= rest
return false
}
func readSTHeader(b []byte, numTotal uint16) (stZeroInitially bool, stSameUntil uint16) {
if b[0] == 0x00 {
// Maybe easier without numTotal?
return true, numTotal
}
if b[0] == 0x80 {
return false, numTotal
}
if b[0] == 0x7F {
return true, 127
}
if b[0] == 0xFF {
return false, 127
}
mask := byte(0x80)
if b[0]&mask == 0 {
stZeroInitially = true
}
mask = 0x7F
return stZeroInitially, uint16(b[0] & mask)
}
func (it *xorOptSTtIterator) readValue() ValueType {
err := xorRead(&it.br, &it.val, &it.leading, &it.trailing)
if err != nil {

View File

@ -0,0 +1,125 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"math"
"testing"
"github.com/stretchr/testify/require"
)
func TestSTHeader(t *testing.T) {
b := make([]byte, 1)
require.True(t, updateSTHeader(b, false, 120, 120))
stZeroInitially, stSameUntil := readSTHeader(b, 120)
require.False(t, stZeroInitially)
require.Equal(t, uint16(120), stSameUntil)
require.True(t, updateSTHeader(b, true, 120, 120))
stZeroInitially, stSameUntil = readSTHeader(b, 120)
require.True(t, stZeroInitially)
require.Equal(t, uint16(120), stSameUntil)
require.False(t, updateSTHeader(b, false, 20, 120))
stZeroInitially, stSameUntil = readSTHeader(b, 120)
require.False(t, stZeroInitially)
require.Equal(t, uint16(20), stSameUntil)
require.False(t, updateSTHeader(b, true, 20, 120))
stZeroInitially, stSameUntil = readSTHeader(b, 120)
require.True(t, stZeroInitially)
require.Equal(t, uint16(20), stSameUntil)
require.False(t, updateSTHeader(b, true, 21, 120))
_, stSameUntil = readSTHeader(b, 120)
require.Equal(t, uint16(21), stSameUntil)
require.False(t, updateSTHeader(b, true, 19, 120))
_, stSameUntil = readSTHeader(b, 120)
require.Equal(t, uint16(19), stSameUntil)
require.False(t, updateSTHeader(b, true, 1, 120))
_, stSameUntil = readSTHeader(b, 120)
require.Equal(t, uint16(1), stSameUntil)
require.False(t, updateSTHeader(b, true, 2, 120))
_, stSameUntil = readSTHeader(b, 120)
require.Equal(t, uint16(2), stSameUntil)
require.False(t, updateSTHeader(b, true, 119, 120))
_, stSameUntil = readSTHeader(b, 120)
require.Equal(t, uint16(119), stSameUntil)
require.False(t, updateSTHeader(b, true, 127, 128))
_, stSameUntil = readSTHeader(b, 128)
require.Equal(t, uint16(127), stSameUntil)
// Not full chunks works fine.
require.True(t, updateSTHeader(b, false, 21, 21))
stZeroInitially, stSameUntil = readSTHeader(b, 21)
require.False(t, stZeroInitially)
require.Equal(t, uint16(21), stSameUntil)
require.False(t, updateSTHeader(b, false, 21, 90))
stZeroInitially, stSameUntil = readSTHeader(b, 90)
require.False(t, stZeroInitially)
require.Equal(t, uint16(21), stSameUntil)
require.False(t, updateSTHeader(b, true, 19, 90))
stZeroInitially, stSameUntil = readSTHeader(b, 90)
require.True(t, stZeroInitially)
require.Equal(t, uint16(19), stSameUntil)
require.False(t, updateSTHeader(b, false, 4, 127))
stZeroInitially, stSameUntil = readSTHeader(b, 127)
require.False(t, stZeroInitially)
require.Equal(t, uint16(4), stSameUntil)
require.False(t, updateSTHeader(b, false, 4, 129))
stZeroInitially, stSameUntil = readSTHeader(b, 129)
require.False(t, stZeroInitially)
require.Equal(t, uint16(4), stSameUntil)
require.False(t, updateSTHeader(b, false, 4, 130))
stZeroInitially, stSameUntil = readSTHeader(b, 131)
require.False(t, stZeroInitially)
require.Equal(t, uint16(4), stSameUntil)
require.False(t, updateSTHeader(b, false, 4, 130))
stZeroInitially, stSameUntil = readSTHeader(b, 131)
require.False(t, stZeroInitially)
require.Equal(t, uint16(4), stSameUntil)
require.False(t, updateSTHeader(b, true, 127, 9999))
_, stSameUntil = readSTHeader(b, 9999)
require.Equal(t, uint16(127), stSameUntil)
// Wrong/odd inputs.
require.False(t, updateSTHeader(b, true, 0, 120))
_, stSameUntil = readSTHeader(b, 120)
require.Equal(t, uint16(1), stSameUntil)
require.False(t, updateSTHeader(b, true, 200, 120))
_, stSameUntil = readSTHeader(b, 120)
require.Equal(t, uint16(120), stSameUntil)
require.False(t, updateSTHeader(b, true, math.MaxUint16, 120))
_, stSameUntil = readSTHeader(b, 120)
require.Equal(t, uint16(120), stSameUntil)
require.False(t, updateSTHeader(b, true, 20, math.MaxUint16))
_, stSameUntil = readSTHeader(b, math.MaxUint16)
require.Equal(t, uint16(20), stSameUntil)
}