xorBuffered is now using SIMD enabled dod.Enc/Dec

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-11-21 09:15:35 +00:00
parent 7f3f8fc546
commit 09052127b8
5 changed files with 409 additions and 177 deletions

View File

@ -231,6 +231,7 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
// Fun, buffered ones! Very fast, but require more mem.
//{name: "ALPBuffered", newChunkFn: func() Chunk { return NewALPBufferedChunk() }},
{name: "XORBuffered", newChunkFn: func() Chunk { return NewXORBufferedChunk() }},
//{name: "XORClassicBuffered", newChunkFn: func() Chunk { return NewXORClassicBufferedChunk() }},
} {
for _, s := range sampleCases {
b.Run(fmt.Sprintf("fmt=%s/%s", f.name, s.name), func(b *testing.B) {

View File

@ -41,7 +41,8 @@ func TestChunk(t *testing.T) {
// Fun, buffered ones! Very fast, but require more mem.
103: func() Chunk { return NewALPBufferedChunk() },
133: func() Chunk { return NewXORBufferedChunk() },
134: func() Chunk { return NewXORBufferedChunk() },
133: func() Chunk { return NewXORClassicBufferedChunk() },
// Non-ST varbit fun attempts (less space, but a bit slower).
101: func() Chunk { return NewXORVarbitChunk() },

View File

@ -15,50 +15,30 @@ package chunkenc
import (
"encoding/binary"
"math"
"github.com/fpetkovski/tscodec-go/delta"
"github.com/fpetkovski/tscodec-go/dod"
"github.com/prometheus/prometheus/model/histogram"
)
const (
maxConstSampleLimit = 0x7FFF
)
func writeMixedHeaderSTNotConst(b []byte) {
b[0] = (b[0] & 0x7F) | 0x80
}
func writeMixedHeaderSampleNum(b []byte, numSamples uint16) {
_ = b[1]
b[0] = (b[0] & 0x80) | byte(numSamples&0x7f)
b[1] = byte(numSamples >> 7)
}
func readMixedHeader(b []byte) (isSTConst bool, numSamples uint16) {
_ = b[1]
mask := byte(0x80)
if b[0]&mask == 0 {
isSTConst = true
}
return isSTConst, uint16(b[0]&0x7f) | uint16(b[1])<<7
}
// xorBufferedChunk holds encoded sample data:
// 2B(mixedHeader), DOD(sts), DOD(ts), XOR(values)
// mixedHeader: 1b(isSTConstant), 15b(numSamples)
// 2B(tsOffset), 2B(valOffset), ?DOD(sts), DOD(ts), XOR(values)
// DOD: 8B(minVal), 2B(numSamples), 1B(bitWidth), dodValues...
type xorBufferedChunk struct {
b bstream
sts []int64
ts []int64
values []float64
buf []byte
stSet bool
}
// NewXORBufferedChunk returns a new chunk with XOR encoding.
func NewXORBufferedChunk() *xorBufferedChunk {
b := make([]byte, chunkHeaderSize, chunkAllocationSize)
b := make([]byte, 2*chunkHeaderSize, chunkAllocationSize)
return &xorBufferedChunk{
b: bstream{stream: b, count: 0},
sts: make([]int64, 0, 120),
@ -72,95 +52,46 @@ func (c *xorBufferedChunk) Reset(stream []byte) {
c.sts = c.sts[:0]
c.ts = c.ts[:0]
c.values = c.values[:0]
c.buf = c.buf[:0]
c.stSet = false
}
// Encoding returns the encoding type.
func (*xorBufferedChunk) Encoding() Encoding {
return 133
return 134
}
func numSamplesFromBytes(b []byte) int {
if len(b) == 2*chunkHeaderSize {
return 0
}
return int(binary.LittleEndian.Uint16(b[2*chunkHeaderSize+delta.Int64SizeBytes:]))
}
// Bytes returns the underlying byte slice of the chunk.
func (c *xorBufferedChunk) Bytes() []byte {
_, numSamples := readMixedHeader(c.b.bytes())
// TODO: Can we assume Bytes is only called once chunk is done?
if len(c.values) > 0 && int(numSamples) != len(c.values) {
// Encode.
writeMixedHeaderSampleNum(c.b.bytes(), uint16(len(c.values)))
// TOOD: We could prealloc much better here!
var (
stNotConst bool
prev = c.sts[0]
)
// TODO: This takes extra time, we can move it to append.
for _, st := range c.sts[1:] {
if prev != st {
stNotConst = true
break
}
prev = st
}
// TODO: Space explodes for random v, there is some bug?
if stNotConst {
writeMixedHeaderSTNotConst(c.b.bytes())
encodeDoD(&c.b, c.sts)
} else {
// Write only one value.
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, c.sts[0])] {
if len(c.values) > 0 && numSamplesFromBytes(c.b.bytes()) != len(c.values) {
c.buf = c.buf[:0]
if c.stSet {
c.buf = dod.EncodeInt64(c.buf, c.sts[:])
for _, b := range c.buf {
c.b.writeByte(b)
}
}
encodeDoD(&c.b, c.ts)
// tsOffset (from 2*chunkHeaderSize).
binary.LittleEndian.PutUint16(c.b.bytes(), uint16(len(c.buf)))
c.buf = dod.EncodeInt64(c.buf[:0], c.ts[:])
for _, b := range c.buf {
c.b.writeByte(b)
}
// valOffset (from 2*chunkHeaderSize+tsOffset).
binary.LittleEndian.PutUint16(c.b.bytes()[chunkHeaderSize:], uint16(len(c.buf)))
encodeXOR(&c.b, c.values)
}
return c.b.bytes()
}
func encodeDoD(bs *bstream, ts []int64) {
// 0
prev := ts[0]
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, prev)] {
bs.writeByte(b)
}
// 1
if len(ts) == 1 {
return
}
// TODO: For timestamp (this for both) this could be uvarint, optimize later if needed..
prevDelta := ts[1] - prev
prev = ts[1]
for _, b := range buf[:binary.PutVarint(buf, prevDelta)] {
bs.writeByte(b)
}
if len(ts) == 2 {
return
}
// 2
// TODO: So much more could be optimized here for STs (e.g. new DoD, taking TS as diff..)
for _, ts := range ts[2:] {
delta := ts - prev
dod := delta - prevDelta
putClassicVarbitInt(bs, dod)
prev = ts
prevDelta = delta
}
}
func encodeXOR(bs *bstream, values []float64) {
// 0
prev := values[0]
bs.writeBits(math.Float64bits(prev), 64)
// 1
var leading, trailing uint8
for _, v := range values[1:] {
xorWrite(bs, v, prev, &leading, &trailing)
prev = v
}
}
// NumSamples returns the number of samples in the chunk.
func (c *xorBufferedChunk) NumSamples() int {
return len(c.values)
@ -195,91 +126,40 @@ func (c *xorBufferedChunk) Iterator(_ Iterator) Iterator {
// TODO: This is yolo, ideally iterator is reused, slices are shared across iterations only etc.
// Lazy decode.
isSTConst, numSamples := readMixedHeader(c.b.bytes())
if len(c.values) == 0 && int(numSamples) != len(c.values) {
if cap(c.values) < int(numSamples) {
c.sts = make([]int64, 0, int(numSamples))
c.ts = make([]int64, 0, int(numSamples))
c.values = make([]float64, 0, int(numSamples))
}
br := newBReader(c.b.bytes()[chunkHeaderSize:])
if isSTConst {
st, err := binary.ReadVarint(&br)
if err != nil {
return &xorBufferedtIterator{err: err}
numSamples := numSamplesFromBytes(c.b.bytes())
if len(c.values) == 0 && numSamples != len(c.values) {
tsOffset := binary.LittleEndian.Uint16(c.b.bytes())
valOffset := binary.LittleEndian.Uint16(c.b.bytes()[chunkHeaderSize:])
if cap(c.values) < numSamples {
if tsOffset > 0 {
// tsOffset != 0 means st no zero.
c.sts = make([]int64, 0, numSamples)
}
c.sts = append(c.sts, st)
c.ts = make([]int64, 0, numSamples)
c.values = make([]float64, 0, numSamples)
}
if tsOffset == 0 {
c.sts = c.sts[:1]
c.sts[0] = 0
} else {
if err := decodeDoD(&br, int(numSamples), &c.sts); err != nil {
return &xorBufferedtIterator{err: err}
}
c.sts = c.sts[:numSamples]
// TODO: Optimize decode to give one ST for const.
dod.DecodeInt64(c.sts, c.b.bytes()[2*chunkHeaderSize:])
}
if err := decodeDoD(&br, int(numSamples), &c.ts); err != nil {
return &xorBufferedtIterator{err: err}
}
if err := decodeXOR(&br, int(numSamples), &c.values); err != nil {
c.ts = c.ts[:numSamples]
dod.DecodeInt64(c.ts, c.b.bytes()[2*chunkHeaderSize+tsOffset:])
br := newBReader(c.b.bytes()[2*chunkHeaderSize+tsOffset+valOffset:])
if err := decodeXOR(&br, numSamples, &c.values); err != nil {
return &xorBufferedtIterator{err: err}
}
}
return &xorBufferedtIterator{c: c, curr: -1}
}
func decodeDoD(br *bstreamReader, numSamples int, ts *[]int64) error {
// 0
curr, err := binary.ReadVarint(br)
if err != nil {
return err
}
*ts = append(*ts, curr)
if len(*ts) == numSamples {
return nil
}
// 1
currDelta, err := binary.ReadVarint(br)
if err != nil {
return err
}
curr += currDelta
*ts = append(*ts, curr)
// 2
for len(*ts) < numSamples {
dod, err := readClassicVarbitInt(br)
if err != nil {
return err
}
currDelta += dod
curr += currDelta
*ts = append(*ts, curr)
}
return nil
}
func decodeXOR(br *bstreamReader, numSamples int, values *[]float64) error {
// 0
v, err := br.readBits(64)
if err != nil {
return err
}
curr := math.Float64frombits(v)
*values = append(*values, curr)
if len(*values) == numSamples {
return nil
}
// 1
var leading, trailing uint8
for len(*values) < numSamples {
err := xorRead(br, &curr, &leading, &trailing)
if err != nil {
return err
}
*values = append(*values, curr)
}
return nil
}
func (c *xorBufferedChunk) Append(st, t int64, v float64) {
if !c.stSet {
c.stSet = st != 0
}
c.sts = append(c.sts, st)
c.ts = append(c.ts, t)
c.values = append(c.values, v)

View File

@ -0,0 +1,350 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"encoding/binary"
"math"
"github.com/prometheus/prometheus/model/histogram"
)
const (
maxConstSampleLimit = 0x7FFF
)
func writeMixedHeaderSTNotConst(b []byte) {
b[0] = (b[0] & 0x7F) | 0x80
}
func writeMixedHeaderSampleNum(b []byte, numSamples uint16) {
_ = b[1]
b[0] = (b[0] & 0x80) | byte(numSamples&0x7f)
b[1] = byte(numSamples >> 7)
}
func readMixedHeader(b []byte) (isSTConst bool, numSamples uint16) {
_ = b[1]
mask := byte(0x80)
if b[0]&mask == 0 {
isSTConst = true
}
return isSTConst, uint16(b[0]&0x7f) | uint16(b[1])<<7
}
// xorClassicBufferedChunk holds encoded sample data:
// 2B(mixedHeader), DOD(sts), DOD(ts), XOR(values)
// mixedHeader: 1b(isSTConstant), 15b(numSamples)
type xorClassicBufferedChunk struct {
b bstream
sts []int64
ts []int64
values []float64
}
// NewXORClassicBufferedChunk returns a new chunk with XOR encoding.
func NewXORClassicBufferedChunk() *xorClassicBufferedChunk {
b := make([]byte, chunkHeaderSize, chunkAllocationSize)
return &xorClassicBufferedChunk{
b: bstream{stream: b, count: 0},
sts: make([]int64, 0, 120),
ts: make([]int64, 0, 120),
values: make([]float64, 0, 120),
}
}
func (c *xorClassicBufferedChunk) Reset(stream []byte) {
c.b.Reset(stream)
c.sts = c.sts[:0]
c.ts = c.ts[:0]
c.values = c.values[:0]
}
// Encoding returns the encoding type.
func (*xorClassicBufferedChunk) Encoding() Encoding {
return 133
}
// Bytes returns the underlying byte slice of the chunk.
func (c *xorClassicBufferedChunk) Bytes() []byte {
_, numSamples := readMixedHeader(c.b.bytes())
// TODO: Can we assume Bytes is only called once chunk is done?
if len(c.values) > 0 && int(numSamples) != len(c.values) {
// Encode.
writeMixedHeaderSampleNum(c.b.bytes(), uint16(len(c.values)))
// TOOD: We could prealloc much better here!
var (
stNotConst bool
prev = c.sts[0]
)
// TODO: This takes extra time, we can move it to append.
for _, st := range c.sts[1:] {
if prev != st {
stNotConst = true
break
}
prev = st
}
// TODO: Space explodes for random v, there is some bug?
if stNotConst {
writeMixedHeaderSTNotConst(c.b.bytes())
encodeDoD(&c.b, c.sts)
} else {
// Write only one value.
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, c.sts[0])] {
c.b.writeByte(b)
}
}
encodeDoD(&c.b, c.ts)
encodeXOR(&c.b, c.values)
}
return c.b.bytes()
}
func encodeDoD(bs *bstream, ts []int64) {
// 0
prev := ts[0]
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, prev)] {
bs.writeByte(b)
}
// 1
if len(ts) == 1 {
return
}
// TODO: For timestamp (this for both) this could be uvarint, optimize later if needed..
prevDelta := ts[1] - prev
prev = ts[1]
for _, b := range buf[:binary.PutVarint(buf, prevDelta)] {
bs.writeByte(b)
}
if len(ts) == 2 {
return
}
// 2
// TODO: So much more could be optimized here for STs (e.g. new DoD, taking TS as diff..)
for _, ts := range ts[2:] {
delta := ts - prev
dod := delta - prevDelta
putClassicVarbitInt(bs, dod)
prev = ts
prevDelta = delta
}
}
func encodeXOR(bs *bstream, values []float64) {
// 0
prev := values[0]
bs.writeBits(math.Float64bits(prev), 64)
// 1
var leading, trailing uint8
for _, v := range values[1:] {
xorWrite(bs, v, prev, &leading, &trailing)
prev = v
}
}
// NumSamples returns the number of samples in the chunk.
func (c *xorClassicBufferedChunk) NumSamples() int {
return len(c.values)
}
// Compact implements the Chunk interface.
func (c *xorClassicBufferedChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
buf := make([]byte, l)
copy(buf, c.b.stream)
c.b.stream = buf
}
}
func (c *xorClassicBufferedChunk) Appender() (Appender, error) {
a, err := c.AppenderV2()
return &compactAppender{AppenderV2: a}, err
}
// AppenderV2 implements the Chunk interface.
// It is not valid to call AppenderV2() multiple times concurrently or to use multiple
// Appenders on the same chunk.
func (c *xorClassicBufferedChunk) AppenderV2() (AppenderV2, error) {
return c, nil
}
// Iterator implements the Chunk interface.
// Iterator() must not be called concurrently with any modifications to the chunk,
// but after it returns you can use an Iterator concurrently with an Appender or
// other Iterators.
func (c *xorClassicBufferedChunk) Iterator(_ Iterator) Iterator {
// TODO: This is yolo, ideally iterator is reused, slices are shared across iterations only etc.
// Lazy decode.
isSTConst, numSamples := readMixedHeader(c.b.bytes())
if len(c.values) == 0 && int(numSamples) != len(c.values) {
if cap(c.values) < int(numSamples) {
c.sts = make([]int64, 0, int(numSamples))
c.ts = make([]int64, 0, int(numSamples))
c.values = make([]float64, 0, int(numSamples))
}
br := newBReader(c.b.bytes()[chunkHeaderSize:])
if isSTConst {
st, err := binary.ReadVarint(&br)
if err != nil {
return &xorClassicBufferedtIterator{err: err}
}
c.sts = append(c.sts, st)
} else {
if err := decodeDoD(&br, int(numSamples), &c.sts); err != nil {
return &xorClassicBufferedtIterator{err: err}
}
}
if err := decodeDoD(&br, int(numSamples), &c.ts); err != nil {
return &xorClassicBufferedtIterator{err: err}
}
if err := decodeXOR(&br, int(numSamples), &c.values); err != nil {
return &xorClassicBufferedtIterator{err: err}
}
}
return &xorClassicBufferedtIterator{c: c, curr: -1}
}
func decodeDoD(br *bstreamReader, numSamples int, ts *[]int64) error {
// 0
curr, err := binary.ReadVarint(br)
if err != nil {
return err
}
*ts = append(*ts, curr)
if len(*ts) == numSamples {
return nil
}
// 1
currDelta, err := binary.ReadVarint(br)
if err != nil {
return err
}
curr += currDelta
*ts = append(*ts, curr)
// 2
for len(*ts) < numSamples {
dod, err := readClassicVarbitInt(br)
if err != nil {
return err
}
currDelta += dod
curr += currDelta
*ts = append(*ts, curr)
}
return nil
}
func decodeXOR(br *bstreamReader, numSamples int, values *[]float64) error {
// 0
v, err := br.readBits(64)
if err != nil {
return err
}
curr := math.Float64frombits(v)
*values = append(*values, curr)
if len(*values) == numSamples {
return nil
}
// 1
var leading, trailing uint8
for len(*values) < numSamples {
err := xorRead(br, &curr, &leading, &trailing)
if err != nil {
return err
}
*values = append(*values, curr)
}
return nil
}
func (c *xorClassicBufferedChunk) Append(st, t int64, v float64) {
c.sts = append(c.sts, st)
c.ts = append(c.ts, t)
c.values = append(c.values, v)
}
func (*xorClassicBufferedChunk) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
panic("appended a histogram sample to a float chunk")
}
func (*xorClassicBufferedChunk) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
panic("appended a float histogram sample to a float chunk")
}
type xorClassicBufferedtIterator struct {
c *xorClassicBufferedChunk
curr int
err error
}
func (it *xorClassicBufferedtIterator) Seek(t int64) ValueType {
if it.curr >= len(it.c.values) {
return ValNone
}
for it.curr == -1 || t > it.c.ts[it.curr] {
if it.Next() == ValNone {
return ValNone
}
}
return ValFloat
}
func (it *xorClassicBufferedtIterator) At() (int64, float64) {
return it.c.ts[it.curr], it.c.values[it.curr]
}
func (*xorClassicBufferedtIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
panic("cannot call xorIterator.AtHistogram")
}
func (*xorClassicBufferedtIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
panic("cannot call xorIterator.AtFloatHistogram")
}
func (it *xorClassicBufferedtIterator) AtT() int64 {
return it.c.ts[it.curr]
}
func (it *xorClassicBufferedtIterator) AtST() int64 {
if it.curr >= len(it.c.sts) {
// Const case.
return it.c.sts[0]
}
return it.c.sts[it.curr]
}
func (it *xorClassicBufferedtIterator) Err() error {
return it.err
}
func (it *xorClassicBufferedtIterator) Next() ValueType {
if it.curr+1 >= len(it.c.values) {
return ValNone
}
it.curr++
return ValFloat
}