diff --git a/tsdb/chunkenc/varbit.go b/tsdb/chunkenc/varbit.go index 4338555328..a137e5882b 100644 --- a/tsdb/chunkenc/varbit.go +++ b/tsdb/chunkenc/varbit.go @@ -30,6 +30,8 @@ import ( // the size by around 1%. A more detailed study would be needed for precise // values, but it's appears quite certain that we would end up far below 10%, // which would maybe convince us to invest the increased coding/decoding cost. +// +// TODO(XOR2): Once XOR2 is stable, merge putVarbitInt and putVarbitIntFast. func putVarbitInt(b *bstream, val int64) { switch { case val == 0: // Precisely 0, needs 1 bit. @@ -61,6 +63,36 @@ func putVarbitInt(b *bstream, val int64) { } } +// putVarbitIntFast is like putVarbitInt but combines the prefix and value into +// a single writeBitsFast call per bucket, reducing bstream overhead on the hot +// path. It is used by XOR2 encoding. +// +// TODO(XOR2): Once XOR2 is stable, merge putVarbitInt and putVarbitIntFast. +func putVarbitIntFast(b *bstream, val int64) { + uval := uint64(val) + switch { + case val == 0: // Precisely 0, needs 1 bit. + b.writeBit(zero) + case bitRange(val, 3): // -3 <= val <= 4, needs 5 bits. + b.writeBitsFast((0b10<<3)|(uval&0x7), 5) + case bitRange(val, 6): // -31 <= val <= 32, 9 bits. + b.writeBitsFast((0b110<<6)|(uval&0x3F), 9) + case bitRange(val, 9): // -255 <= val <= 256, 13 bits. + b.writeBitsFast((0b1110<<9)|(uval&0x1FF), 13) + case bitRange(val, 12): // -2047 <= val <= 2048, 17 bits. + b.writeBitsFast((0b11110<<12)|(uval&0xFFF), 17) + case bitRange(val, 18): // -131071 <= val <= 131072, 3 bytes. + b.writeBitsFast((0b111110<<18)|(uval&0x3FFFF), 24) + case bitRange(val, 25): // -16777215 <= val <= 16777216, 4 bytes. + b.writeBitsFast((0b1111110<<25)|(uval&0x1FFFFFF), 32) + case bitRange(val, 56): // -36028797018963967 <= val <= 36028797018963968, 8 bytes. + b.writeBitsFast((0b11111110<<56)|(uval&0xFFFFFFFFFFFFFF), 64) + default: + b.writeBitsFast(0b11111111, 8) // Worst case, needs 9 bytes. + b.writeBitsFast(uval, 64) + } +} + // readVarbitInt reads an int64 encoded with putVarbitInt. func readVarbitInt(b *bstreamReader) (int64, error) { var d byte diff --git a/tsdb/chunkenc/varbit_test.go b/tsdb/chunkenc/varbit_test.go index dcb43f08df..b0c776bc47 100644 --- a/tsdb/chunkenc/varbit_test.go +++ b/tsdb/chunkenc/varbit_test.go @@ -20,8 +20,8 @@ import ( "github.com/stretchr/testify/require" ) -func TestVarbitInt(t *testing.T) { - numbers := []int64{ +func testVarbitIntBoundaryValues() []int64 { + return []int64{ math.MinInt64, -36028797018963968, -36028797018963967, -16777216, -16777215, @@ -40,6 +40,10 @@ func TestVarbitInt(t *testing.T) { 36028797018963968, 36028797018963969, math.MaxInt64, } +} + +func TestVarbitInt(t *testing.T) { + numbers := testVarbitIntBoundaryValues() bs := bstream{} @@ -56,6 +60,36 @@ func TestVarbitInt(t *testing.T) { } } +func TestVarbitIntFast(t *testing.T) { + numbers := testVarbitIntBoundaryValues() + + bs := bstream{} + + for _, n := range numbers { + putVarbitIntFast(&bs, n) + } + + bsr := newBReader(bs.bytes()) + + for _, want := range numbers { + got, err := readVarbitInt(&bsr) + require.NoError(t, err) + require.Equal(t, want, got) + } +} + +func TestVarbitIntAndFastProduceIdenticalOutput(t *testing.T) { + numbers := testVarbitIntBoundaryValues() + + var slow, fast bstream + for _, n := range numbers { + putVarbitInt(&slow, n) + putVarbitIntFast(&fast, n) + } + + require.Equal(t, slow.bytes(), fast.bytes()) +} + func TestVarbitUint(t *testing.T) { numbers := []uint64{ 0, 1, diff --git a/tsdb/chunkenc/xor2.go b/tsdb/chunkenc/xor2.go index defe1e8102..d25cf4ca4a 100644 --- a/tsdb/chunkenc/xor2.go +++ b/tsdb/chunkenc/xor2.go @@ -51,7 +51,7 @@ // chunk has no additional bits in it. // // When ST is present, the ST delta (prevT - st) is appended after each -// sample's joint timestamp+value encoding using putVarbitInt. +// sample's joint timestamp+value encoding using putVarbitIntFast. package chunkenc @@ -194,7 +194,7 @@ func (c *XOR2Chunk) Iterator(it Iterator) Iterator { // xor2Appender appends samples with optional start timestamps using // the XOR2 joint control bit encoding for regular timestamp and value, -// and putVarbitInt for the start timestamp delta. +// and putVarbitIntFast for the start timestamp delta. type xor2Appender struct { b *bstream @@ -248,40 +248,134 @@ func (a *xor2Appender) Append(st, t int64, v float64) { stDiff = a.t - st a.firstSTChangeOn = 1 writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], 1) - putVarbitInt(a.b, stDiff) + putVarbitIntFast(a.b, stDiff) } default: tDelta = uint64(t - a.t) dod := int64(tDelta - a.tDelta) - // Fast path: no ST involvement at all. - if st == 0 && a.numTotal != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown { - a.encodeJoint(dod, v) - a.t = t - if !value.IsStaleNaN(v) { - a.v = v + // Fast path: no new ST data to write for this sample. + // Covers: ST never seen (st=0 always), or ST recorded initially but unchanged. + // Must use the slow path at maxFirstSTChangeOn so the header remains valid + // even if ST changes on a later sample (index > maxFirstSTChangeOn). + if a.firstSTChangeOn == 0 && st == a.st && a.numTotal != maxFirstSTChangeOn { + vbits := math.Float64bits(v) + switch { + case dod == 0 && vbits == math.Float64bits(a.v): + // Unchanged value and timestamp: write a single 0 bit. + // This is the most common case for stable metrics. + // a.v stays correct (v == a.v), so no update needed. + a.b.writeBit(zero) + case dod >= -(1<<12) && dod <= (1<<12)-1 && vbits == math.Float64bits(a.v): + // 13-bit dod, value unchanged: the most common case for metrics with + // small timestamp jitter. Inline both bytes and the zero value bit to + // avoid calling encodeJoint and writeVDelta. + a.b.writeByte(0b110_00000 | byte(uint64(dod)>>8)&0x1F) + a.b.writeByte(byte(uint64(dod))) + a.b.writeBit(zero) + default: + a.encodeJoint(dod, v) + if !value.IsStaleNaN(v) { + a.v = v + } } + a.t = t a.tDelta = tDelta a.numTotal++ binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal) return } - // Slow path: ST may be involved. + // Active-ST fast path: firstSTChangeOn is set, so every sample needs a + // per-sample ST delta. Inline T+V encoding and the zero-delta ST case to + // avoid two non-inlined function calls (encodeJoint + putVarbitIntFast). + if a.firstSTChangeOn > 0 { + newStDiff := a.t - st + deltaStDiff := newStDiff - a.stDiff + vbits := math.Float64bits(v) + switch { + case dod == 0 && vbits == math.Float64bits(a.v): + // T/V: single 0 bit (dod=0, value unchanged). For non-zero ST deltas + // we fuse this bit with the ST delta write into a single writeBitsFast + // call, saving a non-inlined writeBit call. For deltaStDiff=0 we use + // two writeBit calls because writeBit has a smaller body than + // writeBitsFast, making it faster for writing just 1 bit. + switch { + case deltaStDiff == 0: + a.b.writeBit(zero) + a.b.writeBit(zero) + case deltaStDiff >= -3 && deltaStDiff <= 4: + // 0 (T/V) + 5-bit ST = 6 bits. + a.b.writeBitsFast((0b10<<3)|(uint64(deltaStDiff)&0x7), 6) + case deltaStDiff >= -31 && deltaStDiff <= 32: + // 0 (T/V) + 9-bit ST = 10 bits. + a.b.writeBitsFast((0b110<<6)|(uint64(deltaStDiff)&0x3F), 10) + case deltaStDiff >= -255 && deltaStDiff <= 256: + // 0 (T/V) + 13-bit ST = 14 bits. + a.b.writeBitsFast((0b1110<<9)|(uint64(deltaStDiff)&0x1FF), 14) + default: + a.b.writeBit(zero) + putVarbitIntFast(a.b, deltaStDiff) + } + case dod >= -(1<<12) && dod <= (1<<12)-1 && vbits == math.Float64bits(a.v): + a.b.writeByte(0b110_00000 | byte(uint64(dod)>>8)&0x1F) + a.b.writeByte(byte(uint64(dod))) + // T/V ends with a 0 bit (value unchanged indicator). Fuse it with + // non-zero ST deltas to save a writeBit call; for deltaStDiff=0 keep + // two cheap writeBit calls (faster than one writeBitsFast for 2 bits). + switch { + case deltaStDiff == 0: + a.b.writeBit(zero) + a.b.writeBit(zero) + case deltaStDiff >= -3 && deltaStDiff <= 4: + a.b.writeBitsFast((0b10<<3)|(uint64(deltaStDiff)&0x7), 6) + case deltaStDiff >= -31 && deltaStDiff <= 32: + a.b.writeBitsFast((0b110<<6)|(uint64(deltaStDiff)&0x3F), 10) + case deltaStDiff >= -255 && deltaStDiff <= 256: + a.b.writeBitsFast((0b1110<<9)|(uint64(deltaStDiff)&0x1FF), 14) + default: + a.b.writeBit(zero) + putVarbitIntFast(a.b, deltaStDiff) + } + default: + a.encodeJoint(dod, v) + if !value.IsStaleNaN(v) { + a.v = v + } + // Inline the three most common ST delta ranges to avoid the + // non-inlineable putVarbitIntFast call for typical small-jitter STs. + switch { + case deltaStDiff == 0: + a.b.writeBit(zero) + case deltaStDiff >= -3 && deltaStDiff <= 4: + a.b.writeBitsFast((0b10<<3)|(uint64(deltaStDiff)&0x7), 5) + case deltaStDiff >= -31 && deltaStDiff <= 32: + a.b.writeBitsFast((0b110<<6)|(uint64(deltaStDiff)&0x3F), 9) + case deltaStDiff >= -255 && deltaStDiff <= 256: + a.b.writeBitsFast((0b1110<<9)|(uint64(deltaStDiff)&0x1FF), 13) + default: + putVarbitIntFast(a.b, deltaStDiff) + } + } + a.stDiff = newStDiff + a.st = st + a.t = t + a.tDelta = tDelta + a.numTotal++ + binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal) + return + } + + // Full slow path: firstSTChangeOn == 0 and ST may be initialised here. a.encodeJoint(dod, v) - if a.firstSTChangeOn == 0 { - if st != a.st || a.numTotal == maxFirstSTChangeOn { - // First ST change: record prevT - st. - stDiff = a.t - st - a.firstSTChangeOn = a.numTotal - writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.numTotal) - putVarbitInt(a.b, stDiff) - } - } else { + if st != a.st || a.numTotal == maxFirstSTChangeOn { + // First ST change: record prevT - st. stDiff = a.t - st - putVarbitInt(a.b, stDiff-a.stDiff) + a.firstSTChangeOn = a.numTotal + writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.numTotal) + putVarbitIntFast(a.b, stDiff) } } @@ -329,7 +423,12 @@ func (a *xor2Appender) encodeJoint(dod int64, v float64) { a.b.writeBitsFast(0b11110, 5) a.b.writeBitsFast(uint64(dod), 64) } - a.writeVDelta(v) + // Inline the most common value-unchanged case to avoid a function call. + if math.Float64bits(v) == math.Float64bits(a.v) { + a.b.writeBit(zero) + } else { + a.writeVDelta(v) + } } // writeVDelta encodes the value delta for the dod≠0 case. diff --git a/tsdb/chunkenc/xor2_test.go b/tsdb/chunkenc/xor2_test.go index c0c1af8a1b..c6cb35d99f 100644 --- a/tsdb/chunkenc/xor2_test.go +++ b/tsdb/chunkenc/xor2_test.go @@ -116,6 +116,29 @@ func BenchmarkXor2Read(b *testing.B) { } } +func requireXOR2Samples(t *testing.T, samples []triple) { + t.Helper() + + chunk := NewXOR2Chunk() + app, err := chunk.Appender() + require.NoError(t, err) + + for _, sample := range samples { + app.Append(sample.st, sample.t, sample.v) + } + + it := chunk.Iterator(nil) + for _, want := range samples { + require.Equal(t, ValFloat, it.Next()) + require.Equal(t, want.st, it.AtST()) + ts, v := it.At() + require.Equal(t, want.t, ts) + require.Equal(t, want.v, v) + } + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) +} + func TestXOR2Basic(t *testing.T) { c := NewXOR2Chunk() app, err := c.Appender() @@ -268,6 +291,53 @@ func TestXOR2LargeDod(t *testing.T) { require.Equal(t, ValNone, it.Next()) } +func TestXOR2LargeDodWithActiveST(t *testing.T) { + requireXOR2Samples(t, []triple{ + {st: 0, t: 0, v: 1.0}, + {st: 900, t: 1000, v: 2.0}, + {st: 1000, t: 2000, v: 3.0}, + {st: 1047576, t: 1050576, v: 4.0}, + }) +} + +func TestXOR2ActiveSTFastPathBoundaries(t *testing.T) { + requireXOR2Samples(t, []triple{ + {st: 0, t: 1000, v: 1.0}, + {st: 1990, t: 2000, v: 1.0}, + {st: 2986, t: 3000, v: 1.0}, + {st: 3954, t: 4000, v: 1.0}, + {st: 4698, t: 5000, v: 1.0}, + }) +} + +func TestXOR2EncodeJointValueUnchangedThenChanged(t *testing.T) { + requireXOR2Samples(t, []triple{ + {st: 0, t: 1000, v: 1.0}, + {st: 0, t: 2000, v: 2.0}, + {st: 0, t: 7096, v: 2.0}, + {st: 0, t: 12192, v: 3.0}, + }) +} + +func TestXOR2ConstantNonZeroSTFastPath(t *testing.T) { + requireXOR2Samples(t, []triple{ + {st: 500, t: 1000, v: 1.0}, + {st: 500, t: 2000, v: 2.0}, + {st: 500, t: 3000, v: 2.0}, + {st: 500, t: 4050, v: 2.0}, + {st: 500, t: 5100, v: 3.0}, + }) +} + +func TestXOR2ActiveSTDodZeroValueChange(t *testing.T) { + requireXOR2Samples(t, []triple{ + {st: 0, t: 1000, v: 1.0}, + {st: 500, t: 2000, v: 2.0}, + {st: 500, t: 3000, v: 3.0}, // dod=0, value changed. + {st: 500, t: 4000, v: 4.0}, // dod=0, value changed. + }) +} + func TestXOR2ChunkST(t *testing.T) { testChunkSTHandling(t, ValFloat, func() Chunk { return NewXOR2Chunk()