Merge branch 'main' into krajo/st-in-promql-tests

Signed-off-by: George Krajcsovits <krajorama@users.noreply.github.com>
This commit is contained in:
George Krajcsovits 2026-03-25 11:11:03 +01:00 committed by GitHub
commit b2ff1e5a5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 258 additions and 23 deletions

View File

@ -30,6 +30,8 @@ import (
// the size by around 1%. A more detailed study would be needed for precise
// values, but it appears quite certain that we would end up far below 10%,
// which would maybe convince us to invest the increased coding/decoding cost.
//
// TODO(XOR2): Once XOR2 is stable, merge putVarbitInt and putVarbitIntFast.
func putVarbitInt(b *bstream, val int64) {
switch {
case val == 0: // Precisely 0, needs 1 bit.
@ -61,6 +63,36 @@ func putVarbitInt(b *bstream, val int64) {
}
}
// putVarbitIntFast is like putVarbitInt but combines the prefix and value into
// a single writeBitsFast call per bucket, reducing bstream overhead on the hot
// path. It is used by XOR2 encoding.
//
// The output is decodable by readVarbitInt (see TestVarbitIntFast), and is
// expected to be byte-identical to putVarbitInt's output (see
// TestVarbitIntAndFastProduceIdenticalOutput).
//
// TODO(XOR2): Once XOR2 is stable, merge putVarbitInt and putVarbitIntFast.
func putVarbitIntFast(b *bstream, val int64) {
	// uval carries the two's-complement bit pattern of val; the masks below
	// truncate it to each bucket's payload width. The decoder is assumed to
	// sign-extend the payload back to int64 — NOTE(review): confirm against
	// readVarbitInt, which is not fully visible here.
	uval := uint64(val)
	switch {
	case val == 0: // Precisely 0, needs 1 bit.
		b.writeBit(zero)
	case bitRange(val, 3): // -3 <= val <= 4, needs 5 bits.
		// Prefix 0b10 (2 bits) fused with a 3-bit payload into one write.
		b.writeBitsFast((0b10<<3)|(uval&0x7), 5)
	case bitRange(val, 6): // -31 <= val <= 32, 9 bits.
		// Prefix 0b110 (3 bits) + 6-bit payload.
		b.writeBitsFast((0b110<<6)|(uval&0x3F), 9)
	case bitRange(val, 9): // -255 <= val <= 256, 13 bits.
		// Prefix 0b1110 (4 bits) + 9-bit payload.
		b.writeBitsFast((0b1110<<9)|(uval&0x1FF), 13)
	case bitRange(val, 12): // -2047 <= val <= 2048, 17 bits.
		// Prefix 0b11110 (5 bits) + 12-bit payload.
		b.writeBitsFast((0b11110<<12)|(uval&0xFFF), 17)
	case bitRange(val, 18): // -131071 <= val <= 131072, 3 bytes.
		// Prefix 0b111110 (6 bits) + 18-bit payload = 24 bits.
		b.writeBitsFast((0b111110<<18)|(uval&0x3FFFF), 24)
	case bitRange(val, 25): // -16777215 <= val <= 16777216, 4 bytes.
		// Prefix 0b1111110 (7 bits) + 25-bit payload = 32 bits.
		b.writeBitsFast((0b1111110<<25)|(uval&0x1FFFFFF), 32)
	case bitRange(val, 56): // -36028797018963967 <= val <= 36028797018963968, 8 bytes.
		// Prefix 0b11111110 (8 bits) + 56-bit payload = 64 bits, the widest
		// single writeBitsFast call possible.
		b.writeBitsFast((0b11111110<<56)|(uval&0xFFFFFFFFFFFFFF), 64)
	default:
		b.writeBitsFast(0b11111111, 8) // Worst case, needs 9 bytes.
		// Full 64-bit value follows the all-ones prefix; two calls are
		// unavoidable since the combined width exceeds 64 bits.
		b.writeBitsFast(uval, 64)
	}
}
// readVarbitInt reads an int64 encoded with putVarbitInt.
func readVarbitInt(b *bstreamReader) (int64, error) {
var d byte

View File

@ -20,8 +20,8 @@ import (
"github.com/stretchr/testify/require"
)
func TestVarbitInt(t *testing.T) {
numbers := []int64{
func testVarbitIntBoundaryValues() []int64 {
return []int64{
math.MinInt64,
-36028797018963968, -36028797018963967,
-16777216, -16777215,
@ -40,6 +40,10 @@ func TestVarbitInt(t *testing.T) {
36028797018963968, 36028797018963969,
math.MaxInt64,
}
}
func TestVarbitInt(t *testing.T) {
numbers := testVarbitIntBoundaryValues()
bs := bstream{}
@ -56,6 +60,36 @@ func TestVarbitInt(t *testing.T) {
}
}
// TestVarbitIntFast writes every boundary value with the fast encoder and
// verifies the shared reader round-trips each one unchanged.
func TestVarbitIntFast(t *testing.T) {
	values := testVarbitIntBoundaryValues()

	var stream bstream
	for _, v := range values {
		putVarbitIntFast(&stream, v)
	}

	reader := newBReader(stream.bytes())
	for _, expected := range values {
		actual, err := readVarbitInt(&reader)
		require.NoError(t, err)
		require.Equal(t, expected, actual)
	}
}
// TestVarbitIntAndFastProduceIdenticalOutput checks that the fast encoder is a
// drop-in replacement: both encoders must emit byte-identical streams for the
// full set of boundary values.
func TestVarbitIntAndFastProduceIdenticalOutput(t *testing.T) {
	var reference, optimized bstream
	for _, v := range testVarbitIntBoundaryValues() {
		putVarbitInt(&reference, v)
		putVarbitIntFast(&optimized, v)
	}
	require.Equal(t, reference.bytes(), optimized.bytes())
}
func TestVarbitUint(t *testing.T) {
numbers := []uint64{
0, 1,

View File

@ -51,7 +51,7 @@
// chunk has no additional bits in it.
//
// When ST is present, the ST delta (prevT - st) is appended after each
// sample's joint timestamp+value encoding using putVarbitInt.
// sample's joint timestamp+value encoding using putVarbitIntFast.
package chunkenc
@ -194,7 +194,7 @@ func (c *XOR2Chunk) Iterator(it Iterator) Iterator {
// xor2Appender appends samples with optional start timestamps using
// the XOR2 joint control bit encoding for regular timestamp and value,
// and putVarbitInt for the start timestamp delta.
// and putVarbitIntFast for the start timestamp delta.
type xor2Appender struct {
b *bstream
@ -248,40 +248,134 @@ func (a *xor2Appender) Append(st, t int64, v float64) {
stDiff = a.t - st
a.firstSTChangeOn = 1
writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], 1)
putVarbitInt(a.b, stDiff)
putVarbitIntFast(a.b, stDiff)
}
default:
tDelta = uint64(t - a.t)
dod := int64(tDelta - a.tDelta)
// Fast path: no ST involvement at all.
if st == 0 && a.numTotal != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown {
a.encodeJoint(dod, v)
a.t = t
if !value.IsStaleNaN(v) {
a.v = v
// Fast path: no new ST data to write for this sample.
// Covers: ST never seen (st=0 always), or ST recorded initially but unchanged.
// Must use the slow path at maxFirstSTChangeOn so the header remains valid
// even if ST changes on a later sample (index > maxFirstSTChangeOn).
if a.firstSTChangeOn == 0 && st == a.st && a.numTotal != maxFirstSTChangeOn {
vbits := math.Float64bits(v)
switch {
case dod == 0 && vbits == math.Float64bits(a.v):
// Unchanged value and timestamp: write a single 0 bit.
// This is the most common case for stable metrics.
// a.v stays correct (v == a.v), so no update needed.
a.b.writeBit(zero)
case dod >= -(1<<12) && dod <= (1<<12)-1 && vbits == math.Float64bits(a.v):
// 13-bit dod, value unchanged: the most common case for metrics with
// small timestamp jitter. Inline both bytes and the zero value bit to
// avoid calling encodeJoint and writeVDelta.
a.b.writeByte(0b110_00000 | byte(uint64(dod)>>8)&0x1F)
a.b.writeByte(byte(uint64(dod)))
a.b.writeBit(zero)
default:
a.encodeJoint(dod, v)
if !value.IsStaleNaN(v) {
a.v = v
}
}
a.t = t
a.tDelta = tDelta
a.numTotal++
binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)
return
}
// Slow path: ST may be involved.
// Active-ST fast path: firstSTChangeOn is set, so every sample needs a
// per-sample ST delta. Inline T+V encoding and the zero-delta ST case to
// avoid two non-inlined function calls (encodeJoint + putVarbitIntFast).
if a.firstSTChangeOn > 0 {
newStDiff := a.t - st
deltaStDiff := newStDiff - a.stDiff
vbits := math.Float64bits(v)
switch {
case dod == 0 && vbits == math.Float64bits(a.v):
// T/V: single 0 bit (dod=0, value unchanged). For non-zero ST deltas
// we fuse this bit with the ST delta write into a single writeBitsFast
// call, saving a non-inlined writeBit call. For deltaStDiff=0 we use
// two writeBit calls because writeBit has a smaller body than
// writeBitsFast, making it faster for writing just 1 bit.
switch {
case deltaStDiff == 0:
a.b.writeBit(zero)
a.b.writeBit(zero)
case deltaStDiff >= -3 && deltaStDiff <= 4:
// 0 (T/V) + 5-bit ST = 6 bits.
a.b.writeBitsFast((0b10<<3)|(uint64(deltaStDiff)&0x7), 6)
case deltaStDiff >= -31 && deltaStDiff <= 32:
// 0 (T/V) + 9-bit ST = 10 bits.
a.b.writeBitsFast((0b110<<6)|(uint64(deltaStDiff)&0x3F), 10)
case deltaStDiff >= -255 && deltaStDiff <= 256:
// 0 (T/V) + 13-bit ST = 14 bits.
a.b.writeBitsFast((0b1110<<9)|(uint64(deltaStDiff)&0x1FF), 14)
default:
a.b.writeBit(zero)
putVarbitIntFast(a.b, deltaStDiff)
}
case dod >= -(1<<12) && dod <= (1<<12)-1 && vbits == math.Float64bits(a.v):
a.b.writeByte(0b110_00000 | byte(uint64(dod)>>8)&0x1F)
a.b.writeByte(byte(uint64(dod)))
// T/V ends with a 0 bit (value unchanged indicator). Fuse it with
// non-zero ST deltas to save a writeBit call; for deltaStDiff=0 keep
// two cheap writeBit calls (faster than one writeBitsFast for 2 bits).
switch {
case deltaStDiff == 0:
a.b.writeBit(zero)
a.b.writeBit(zero)
case deltaStDiff >= -3 && deltaStDiff <= 4:
a.b.writeBitsFast((0b10<<3)|(uint64(deltaStDiff)&0x7), 6)
case deltaStDiff >= -31 && deltaStDiff <= 32:
a.b.writeBitsFast((0b110<<6)|(uint64(deltaStDiff)&0x3F), 10)
case deltaStDiff >= -255 && deltaStDiff <= 256:
a.b.writeBitsFast((0b1110<<9)|(uint64(deltaStDiff)&0x1FF), 14)
default:
a.b.writeBit(zero)
putVarbitIntFast(a.b, deltaStDiff)
}
default:
a.encodeJoint(dod, v)
if !value.IsStaleNaN(v) {
a.v = v
}
// Inline the three most common ST delta ranges to avoid the
// non-inlineable putVarbitIntFast call for typical small-jitter STs.
switch {
case deltaStDiff == 0:
a.b.writeBit(zero)
case deltaStDiff >= -3 && deltaStDiff <= 4:
a.b.writeBitsFast((0b10<<3)|(uint64(deltaStDiff)&0x7), 5)
case deltaStDiff >= -31 && deltaStDiff <= 32:
a.b.writeBitsFast((0b110<<6)|(uint64(deltaStDiff)&0x3F), 9)
case deltaStDiff >= -255 && deltaStDiff <= 256:
a.b.writeBitsFast((0b1110<<9)|(uint64(deltaStDiff)&0x1FF), 13)
default:
putVarbitIntFast(a.b, deltaStDiff)
}
}
a.stDiff = newStDiff
a.st = st
a.t = t
a.tDelta = tDelta
a.numTotal++
binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)
return
}
// Full slow path: firstSTChangeOn == 0 and ST may be initialised here.
a.encodeJoint(dod, v)
if a.firstSTChangeOn == 0 {
if st != a.st || a.numTotal == maxFirstSTChangeOn {
// First ST change: record prevT - st.
stDiff = a.t - st
a.firstSTChangeOn = a.numTotal
writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.numTotal)
putVarbitInt(a.b, stDiff)
}
} else {
if st != a.st || a.numTotal == maxFirstSTChangeOn {
// First ST change: record prevT - st.
stDiff = a.t - st
putVarbitInt(a.b, stDiff-a.stDiff)
a.firstSTChangeOn = a.numTotal
writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.numTotal)
putVarbitIntFast(a.b, stDiff)
}
}
@ -329,7 +423,12 @@ func (a *xor2Appender) encodeJoint(dod int64, v float64) {
a.b.writeBitsFast(0b11110, 5)
a.b.writeBitsFast(uint64(dod), 64)
}
a.writeVDelta(v)
// Inline the most common value-unchanged case to avoid a function call.
if math.Float64bits(v) == math.Float64bits(a.v) {
a.b.writeBit(zero)
} else {
a.writeVDelta(v)
}
}
// writeVDelta encodes the value delta for the dod≠0 case.

View File

@ -116,6 +116,29 @@ func BenchmarkXor2Read(b *testing.B) {
}
}
// requireXOR2Samples appends the given samples to a fresh XOR2 chunk and
// asserts that iterating the chunk yields exactly the same samples back,
// including the start timestamps.
func requireXOR2Samples(t *testing.T, samples []triple) {
	t.Helper()

	c := NewXOR2Chunk()
	app, err := c.Appender()
	require.NoError(t, err)
	for i := range samples {
		app.Append(samples[i].st, samples[i].t, samples[i].v)
	}

	it := c.Iterator(nil)
	for i := range samples {
		require.Equal(t, ValFloat, it.Next())
		require.Equal(t, samples[i].st, it.AtST())
		gotT, gotV := it.At()
		require.Equal(t, samples[i].t, gotT)
		require.Equal(t, samples[i].v, gotV)
	}
	require.Equal(t, ValNone, it.Next())
	require.NoError(t, it.Err())
}
func TestXOR2Basic(t *testing.T) {
c := NewXOR2Chunk()
app, err := c.Appender()
@ -268,6 +291,53 @@ func TestXOR2LargeDod(t *testing.T) {
require.Equal(t, ValNone, it.Next())
}
// TestXOR2LargeDodWithActiveST round-trips samples whose last step has a large
// timestamp delta-of-delta while per-sample ST deltas are being written.
func TestXOR2LargeDodWithActiveST(t *testing.T) {
	samples := []triple{
		{st: 0, t: 0, v: 1.0},
		{st: 900, t: 1000, v: 2.0},
		{st: 1000, t: 2000, v: 3.0},
		{st: 1047576, t: 1050576, v: 4.0},
	}
	requireXOR2Samples(t, samples)
}
// TestXOR2ActiveSTFastPathBoundaries round-trips samples whose ST values vary
// from sample to sample while timestamps and values stay regular.
func TestXOR2ActiveSTFastPathBoundaries(t *testing.T) {
	samples := []triple{
		{st: 0, t: 1000, v: 1.0},
		{st: 1990, t: 2000, v: 1.0},
		{st: 2986, t: 3000, v: 1.0},
		{st: 3954, t: 4000, v: 1.0},
		{st: 4698, t: 5000, v: 1.0},
	}
	requireXOR2Samples(t, samples)
}
// TestXOR2EncodeJointValueUnchangedThenChanged round-trips a sequence where
// the value first changes, then repeats, then changes again, with no ST data.
func TestXOR2EncodeJointValueUnchangedThenChanged(t *testing.T) {
	samples := []triple{
		{st: 0, t: 1000, v: 1.0},
		{st: 0, t: 2000, v: 2.0},
		{st: 0, t: 7096, v: 2.0},
		{st: 0, t: 12192, v: 3.0},
	}
	requireXOR2Samples(t, samples)
}
// TestXOR2ConstantNonZeroSTFastPath round-trips samples sharing a single
// non-zero ST, mixing repeated and changing values and timestamp deltas.
func TestXOR2ConstantNonZeroSTFastPath(t *testing.T) {
	samples := []triple{
		{st: 500, t: 1000, v: 1.0},
		{st: 500, t: 2000, v: 2.0},
		{st: 500, t: 3000, v: 2.0},
		{st: 500, t: 4050, v: 2.0},
		{st: 500, t: 5100, v: 3.0},
	}
	requireXOR2Samples(t, samples)
}
// TestXOR2ActiveSTDodZeroValueChange round-trips samples where the ST becomes
// non-zero after the first sample and later samples keep a constant timestamp
// delta while the value keeps changing.
func TestXOR2ActiveSTDodZeroValueChange(t *testing.T) {
	samples := []triple{
		{st: 0, t: 1000, v: 1.0},
		{st: 500, t: 2000, v: 2.0},
		{st: 500, t: 3000, v: 3.0}, // dod=0, value changed.
		{st: 500, t: 4000, v: 4.0}, // dod=0, value changed.
	}
	requireXOR2Samples(t, samples)
}
func TestXOR2ChunkST(t *testing.T) {
testChunkSTHandling(t, ValFloat, func() Chunk {
return NewXOR2Chunk()