mirror of
https://github.com/prometheus/prometheus.git
synced 2025-12-02 16:11:02 +01:00
xorOptST optimizations
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
09052127b8
commit
ea7c4de920
@ -216,10 +216,11 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
|
||||
|
||||
for _, f := range []fmtCase{
|
||||
// Reference.
|
||||
{name: "XOR (ST ignored)", newChunkFn: func() Chunk { return NewXORChunk() }},
|
||||
//{name: "XOR (ST ignored)", newChunkFn: func() Chunk { return NewXORChunk() }},
|
||||
|
||||
// Most tempting one.
|
||||
{name: "XOROptST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }},
|
||||
{name: "XOROptST(v3)", newChunkFn: func() Chunk { return NewXOROptSTChunk() }},
|
||||
//{name: "XOROptST2", newChunkFn: func() Chunk { return NewXOROptST2Chunk() }},
|
||||
|
||||
// Slow naive ST implementation.
|
||||
//{name: "XORSTNaive", newChunkFn: func() Chunk { return NewXORSTNaiveChunk() }},
|
||||
@ -230,10 +231,14 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
|
||||
|
||||
// Fun, buffered ones! Very fast, but require more mem.
|
||||
//{name: "ALPBuffered", newChunkFn: func() Chunk { return NewALPBufferedChunk() }},
|
||||
{name: "XORBuffered", newChunkFn: func() Chunk { return NewXORBufferedChunk() }},
|
||||
//{name: "XORBuffered", newChunkFn: func() Chunk { return NewXORBufferedChunk() }},
|
||||
//{name: "XORClassicBuffered", newChunkFn: func() Chunk { return NewXORClassicBufferedChunk() }},
|
||||
} {
|
||||
for _, s := range sampleCases {
|
||||
if s.name != "vt=constant/st=0" {
|
||||
continue // PROFILE DEBUG KILL
|
||||
}
|
||||
|
||||
b.Run(fmt.Sprintf("fmt=%s/%s", f.name, s.name), func(b *testing.B) {
|
||||
fn(b, f, s)
|
||||
})
|
||||
@ -284,16 +289,22 @@ type supportsAppenderV2 interface {
|
||||
}
|
||||
|
||||
/*
|
||||
export bench=bw.bench/iter.all && go test \
|
||||
export bench=bw.bench/iter.all3 && go test \
|
||||
-run '^$' -bench '^BenchmarkIterator' \
|
||||
-benchtime 1s -count 6 -cpu 2 -timeout 999m \
|
||||
| tee ${bench}.txt
|
||||
|
||||
export bench=bw.bench/iter.xoroptst && go test \
|
||||
-run '^$' -bench '^BenchmarkIterator' \
|
||||
-benchtime 1s -count 1 -cpu 2 -timeout 999m \
|
||||
-benchtime 1000000x -count 1 -cpu 2 -timeout 999m \
|
||||
-cpuprofile=${bench}.cpu.pprof \
|
||||
| tee ${bench}.txt
|
||||
|
||||
export bench=bw.bench/iter.xoroptst && go test \
|
||||
-run '^$' -bench '^BenchmarkIterator' \
|
||||
-benchtime 1000000x -count 1 -cpu 2 -timeout 999m \
|
||||
-memprofile=${bench}.cpu.pprof \
|
||||
| tee ${bench}.txt
|
||||
*/
|
||||
func BenchmarkIterator(b *testing.B) {
|
||||
foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) {
|
||||
@ -306,7 +317,6 @@ func BenchmarkIterator(b *testing.B) {
|
||||
return math.Abs(a-b) < 1e-6
|
||||
}
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
|
||||
c := f.newChunkFn()
|
||||
|
||||
@ -35,6 +35,7 @@ func TestChunk(t *testing.T) {
|
||||
|
||||
// Most tempting one.
|
||||
EncXOROptST: func() Chunk { return NewXOROptSTChunk() },
|
||||
199: func() Chunk { return NewXOROptST2Chunk() },
|
||||
|
||||
// Slow naive ST implementation.
|
||||
102: func() Chunk { return NewXORSTNaiveChunk() },
|
||||
|
||||
118
tsdb/chunkenc/varbit_st.go
Normal file
118
tsdb/chunkenc/varbit_st.go
Normal file
@ -0,0 +1,118 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import "fmt"
|
||||
|
||||
// putSTVarbitInt writes an int64 using varbit encoding with a bit bucketing
// optimized for ST.
//
// Each value is written as a unary-style size code (a run of 1-bits, ended by a
// 0-bit except for the longest code) followed by the payload:
//
//	0        -> val == 0, no payload
//	10       -> 6-bit payload
//	110      -> 8-bit payload
//	1110     -> 14-bit payload
//	11110    -> 17-bit payload
//	111110   -> 20-bit payload
//	111111   -> full 64-bit payload
//
// readSTVarbitInt is the matching decoder.
func putSTVarbitInt(b *bstream, val int64) {
	switch {
	case val == 0:
		b.writeBit(zero)
	case bitRange(val, 6): // -31 <= val <= 32, needs 6+2 bits (1B).
		b.writeByte(0b10<<6 | (uint8(val) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of val.
		// Below simpler form is somehow 10-20% slower!
		// b.writeBits(0b10, 2)
		// b.writeBits(uint64(val), 6)
	case bitRange(val, 8): // -127 <= val <= 128, needs 8+3 bits (1B 3b).
		b.writeByte(0b110<<5 | (uint8(val>>3) & (1<<5 - 1))) // 0b110 size code combined with the top 5 bits of val.
		b.writeBits(uint64(val), 3)                          // Bottom 3 bits.
		// Below simpler form is somehow 10-20% slower!
		// b.writeBits(0b110, 3)
		// b.writeBits(uint64(val), 8)
	case bitRange(val, 14): // Presumably -8191 <= val <= 8192 by the same bitRange rule; needs 14+4 bits.
		b.writeBits(0b1110, 4)
		b.writeBits(uint64(val), 14)

		// TODO(bwplotka): In previous form it would be as follows, is there any difference?
		// b.writeByte(0b1110<<4 | (uint8(val>>10) & (1<<4 - 1))) // 0b1110 size code combined with 4 bits of dod.
		// b.writeBits(uint64(val), 10) // Bottom 10 bits of dod.
	case bitRange(val, 17): // Needs 17+5 bits.
		b.writeBits(0b11110, 5)
		b.writeBits(uint64(val), 17)
	case bitRange(val, 20): // Needs 20+6 bits.
		b.writeBits(0b111110, 6)
		b.writeBits(uint64(val), 20)
	default:
		// Anything larger is stored verbatim; needs 64+6 bits.
		b.writeBits(0b111111, 6)
		b.writeBits(uint64(val), 64)
	}
}
|
||||
|
||||
// readSTVarbitInt reads an int64 encoded with putSTVarbitInt.
|
||||
func readSTVarbitInt(b *bstreamReader) (int64, error) {
|
||||
var (
|
||||
d byte
|
||||
sz uint8
|
||||
val int64
|
||||
)
|
||||
for range 6 {
|
||||
d <<= 1
|
||||
bit, err := b.readBitFast()
|
||||
if err != nil {
|
||||
bit, err = b.readBit()
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if bit == zero {
|
||||
break
|
||||
}
|
||||
d |= 1
|
||||
}
|
||||
|
||||
switch d {
|
||||
case 0b0:
|
||||
// dod == 0
|
||||
case 0b10:
|
||||
sz = 6
|
||||
case 0b110:
|
||||
sz = 8
|
||||
case 0b1110:
|
||||
sz = 14
|
||||
case 0b11110:
|
||||
sz = 17
|
||||
case 0b111110:
|
||||
sz = 20
|
||||
case 0b111111:
|
||||
// Do not use fast because it's very unlikely it will succeed.
|
||||
bits, err := b.readBits(64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
val = int64(bits)
|
||||
default:
|
||||
return 0, fmt.Errorf("invalid bit pattern %b", d)
|
||||
}
|
||||
|
||||
if sz != 0 {
|
||||
bits, err := b.readBitsFast(sz)
|
||||
if err != nil {
|
||||
bits, err = b.readBits(sz)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Account for negative numbers, which come back as high unsigned numbers.
|
||||
// See docs/bstream.md.
|
||||
if bits > (1 << (sz - 1)) {
|
||||
bits -= 1 << sz
|
||||
}
|
||||
val = int64(bits)
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
57
tsdb/chunkenc/varbit_st_test.go
Normal file
57
tsdb/chunkenc/varbit_st_test.go
Normal file
@ -0,0 +1,57 @@
|
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSTVarbitInt(t *testing.T) {
|
||||
numbers := []int64{
|
||||
math.MinInt64,
|
||||
-36028797018963968, -36028797018963967,
|
||||
-16777216, -16777215,
|
||||
-131072, -131071,
|
||||
-2048, -2047,
|
||||
-256, -255,
|
||||
-32, -31,
|
||||
-4, -3,
|
||||
-1, 0, 1,
|
||||
4, 5,
|
||||
32, 33,
|
||||
256, 257,
|
||||
2048, 2049,
|
||||
131072, 131073,
|
||||
16777216, 16777217,
|
||||
36028797018963968, 36028797018963969,
|
||||
math.MaxInt64,
|
||||
}
|
||||
|
||||
bs := bstream{}
|
||||
|
||||
for _, n := range numbers {
|
||||
putSTVarbitInt(&bs, n)
|
||||
}
|
||||
|
||||
bsr := newBReader(bs.bytes())
|
||||
|
||||
for _, want := range numbers {
|
||||
got, err := readSTVarbitInt(&bsr)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, want, got)
|
||||
}
|
||||
}
|
||||
@ -181,6 +181,16 @@ func (*xorOptSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, in
|
||||
panic("appended a float histogram sample to a float chunk")
|
||||
}
|
||||
|
||||
const (
|
||||
read0State uint8 = iota
|
||||
read1State
|
||||
readDoDMaybeSTState
|
||||
readDoDNoSTState
|
||||
readDoDState
|
||||
|
||||
eofState uint8 = 1<<8 - 1
|
||||
)
|
||||
|
||||
type xorOptSTtIterator struct {
|
||||
br bstreamReader
|
||||
numTotal uint16
|
||||
@ -188,6 +198,7 @@ type xorOptSTtIterator struct {
|
||||
firstSTKnown bool
|
||||
firstSTChangeOn uint16
|
||||
|
||||
state uint8
|
||||
numRead uint16
|
||||
|
||||
st, t int64
|
||||
@ -199,16 +210,14 @@ type xorOptSTtIterator struct {
|
||||
stDelta int64
|
||||
tDelta uint64
|
||||
err error
|
||||
|
||||
nextFn func() ValueType
|
||||
}
|
||||
|
||||
func (it *xorOptSTtIterator) Seek(t int64) ValueType {
|
||||
if it.err != nil {
|
||||
if it.state == eofState {
|
||||
return ValNone
|
||||
}
|
||||
|
||||
for t > it.t || it.numRead == 0 {
|
||||
for t > it.t || it.state == read0State {
|
||||
if it.Next() == ValNone {
|
||||
return ValNone
|
||||
}
|
||||
@ -254,7 +263,10 @@ func (it *xorOptSTtIterator) Reset(b []byte) {
|
||||
it.stDelta = 0
|
||||
it.tDelta = 0
|
||||
it.err = nil
|
||||
it.nextFn = it.initNext
|
||||
it.state = read0State
|
||||
if it.numRead >= it.numTotal {
|
||||
it.state = eofState
|
||||
}
|
||||
}
|
||||
|
||||
func (a *xorOptSTAppender) Append(st, t int64, v float64) {
|
||||
@ -419,102 +431,239 @@ func (a *xorOptSTAppender) BitProfiledAppend(p *bitProfiler[any], st, t int64, v
|
||||
}
|
||||
}
|
||||
|
||||
func (it *xorOptSTtIterator) Next() ValueType {
|
||||
if it.err != nil || it.numRead == it.numTotal {
|
||||
return ValNone
|
||||
}
|
||||
return it.nextFn()
|
||||
func (it *xorOptSTtIterator) retErr(err error) ValueType {
|
||||
it.err = err
|
||||
it.state = eofState
|
||||
return ValNone
|
||||
}
|
||||
|
||||
func (it *xorOptSTtIterator) initNext() ValueType {
|
||||
switch it.numRead {
|
||||
case 0:
|
||||
func (it *xorOptSTtIterator) Next() ValueType {
|
||||
switch it.state {
|
||||
case eofState:
|
||||
return ValNone
|
||||
case read0State:
|
||||
it.state++
|
||||
|
||||
// Optional ST read.
|
||||
if it.firstSTKnown {
|
||||
st, err := binary.ReadVarint(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
return it.retErr(err)
|
||||
}
|
||||
it.st = st
|
||||
}
|
||||
|
||||
// TS.
|
||||
t, err := binary.ReadVarint(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
return it.retErr(err)
|
||||
}
|
||||
// Value.
|
||||
v, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
return it.retErr(err)
|
||||
}
|
||||
|
||||
it.t = t
|
||||
it.val = math.Float64frombits(v)
|
||||
|
||||
// State EOF check.
|
||||
it.numRead++
|
||||
if it.numRead >= it.numTotal {
|
||||
it.state = eofState
|
||||
}
|
||||
return ValFloat
|
||||
case 1:
|
||||
case read1State:
|
||||
it.state++
|
||||
if it.firstSTChangeOn == 0 {
|
||||
// This means we have same (zero or non-zero) ST value for the rest of
|
||||
// chunk. We can set rest of next functions to use ~classic XOR chunk iterations.
|
||||
it.nextFn = it.stAgnosticDoDNext
|
||||
// chunk. We can simply use ~classic XOR chunk iterations.
|
||||
it.state = readDoDNoSTState
|
||||
} else if it.firstSTChangeOn == 1 {
|
||||
// We got early ST change on the second sample, likely delta.
|
||||
// Continue ST rich flow from the next iteration.
|
||||
it.state = readDoDState
|
||||
|
||||
stDelta, err := binary.ReadVarint(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
return it.retErr(err)
|
||||
}
|
||||
it.stDelta = stDelta
|
||||
it.st += it.stDelta
|
||||
|
||||
// We got early ST change on the second sample, likely delta.
|
||||
// Continue ST rich flow from the next iteration.
|
||||
it.nextFn = it.stDoDNext
|
||||
}
|
||||
// TS.
|
||||
tDelta, err := binary.ReadUvarint(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
return it.retErr(err)
|
||||
}
|
||||
it.tDelta = tDelta
|
||||
it.t += int64(it.tDelta)
|
||||
return it.readValue()
|
||||
default:
|
||||
if it.firstSTChangeOn == it.numRead {
|
||||
it.nextFn = it.stDoDNext
|
||||
return it.stDoDNext()
|
||||
|
||||
// Value.
|
||||
if err := xorRead(&it.br, &it.val, &it.leading, &it.trailing); err != nil {
|
||||
return it.retErr(err)
|
||||
}
|
||||
return it.stAgnosticDoDNext()
|
||||
|
||||
// State EOF check.
|
||||
it.numRead++
|
||||
if it.numRead >= it.numTotal {
|
||||
it.state = eofState
|
||||
}
|
||||
return ValFloat
|
||||
case readDoDMaybeSTState:
|
||||
if it.firstSTChangeOn == it.numRead {
|
||||
// ST changes from this iteration, change state for future.
|
||||
it.state = readDoDState
|
||||
return it.dodNext()
|
||||
}
|
||||
return it.dodNoSTNext()
|
||||
case readDoDState:
|
||||
return it.dodNext()
|
||||
case readDoDNoSTState:
|
||||
return it.dodNoSTNext()
|
||||
default:
|
||||
panic("xorOptSTtIterator: broken machine state")
|
||||
}
|
||||
}
|
||||
|
||||
func (it *xorOptSTtIterator) stDoDNext() ValueType {
|
||||
sdod, err := readClassicVarbitInt(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
func (it *xorOptSTtIterator) dodNext() ValueType {
|
||||
// Inlined readClassicVarbitInt(&it.br)
|
||||
var d byte
|
||||
// read delta-of-delta
|
||||
for range 4 {
|
||||
d <<= 1
|
||||
bit, err := it.br.readBitFast()
|
||||
if err != nil {
|
||||
bit, err = it.br.readBit()
|
||||
if err != nil {
|
||||
return it.retErr(err)
|
||||
}
|
||||
}
|
||||
if bit == zero {
|
||||
break
|
||||
}
|
||||
d |= 1
|
||||
}
|
||||
var sz uint8
|
||||
var sdod int64
|
||||
switch d {
|
||||
case 0b0:
|
||||
// dod == 0
|
||||
case 0b10:
|
||||
sz = 14
|
||||
case 0b110:
|
||||
sz = 17
|
||||
case 0b1110:
|
||||
sz = 20
|
||||
case 0b1111:
|
||||
// Do not use fast because it's very unlikely it will succeed.
|
||||
bits, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
return it.retErr(err)
|
||||
}
|
||||
|
||||
sdod = int64(bits)
|
||||
}
|
||||
|
||||
if sz != 0 {
|
||||
bits, err := it.br.readBitsFast(sz)
|
||||
if err != nil {
|
||||
bits, err = it.br.readBits(sz)
|
||||
if err != nil {
|
||||
return it.retErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Account for negative numbers, which come back as high unsigned numbers.
|
||||
// See docs/bstream.md.
|
||||
if bits > (1 << (sz - 1)) {
|
||||
bits -= 1 << sz
|
||||
}
|
||||
sdod = int64(bits)
|
||||
}
|
||||
|
||||
//sdod, err := readClassicVarbitInt(&it.br)
|
||||
//if err != nil {
|
||||
// it.err = err
|
||||
// return ValNone
|
||||
//}
|
||||
it.stDelta = it.stDelta + sdod
|
||||
it.st += it.stDelta
|
||||
return it.stAgnosticDoDNext()
|
||||
return it.dodNoSTNext()
|
||||
}
|
||||
|
||||
func (it *xorOptSTtIterator) stAgnosticDoDNext() ValueType {
|
||||
dod, err := readClassicVarbitInt(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
func (it *xorOptSTtIterator) dodNoSTNext() ValueType {
|
||||
// Inlined readClassicVarbitInt(&it.br)
|
||||
var d byte
|
||||
// read delta-of-delta
|
||||
for range 4 {
|
||||
d <<= 1
|
||||
bit, err := it.br.readBitFast()
|
||||
if err != nil {
|
||||
bit, err = it.br.readBit()
|
||||
if err != nil {
|
||||
return it.retErr(err)
|
||||
}
|
||||
}
|
||||
if bit == zero {
|
||||
break
|
||||
}
|
||||
d |= 1
|
||||
}
|
||||
var sz uint8
|
||||
var dod int64
|
||||
switch d {
|
||||
case 0b0:
|
||||
// dod == 0
|
||||
case 0b10:
|
||||
sz = 14
|
||||
case 0b110:
|
||||
sz = 17
|
||||
case 0b1110:
|
||||
sz = 20
|
||||
case 0b1111:
|
||||
// Do not use fast because it's very unlikely it will succeed.
|
||||
bits, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
return it.retErr(err)
|
||||
}
|
||||
|
||||
dod = int64(bits)
|
||||
}
|
||||
|
||||
if sz != 0 {
|
||||
bits, err := it.br.readBitsFast(sz)
|
||||
if err != nil {
|
||||
bits, err = it.br.readBits(sz)
|
||||
if err != nil {
|
||||
return it.retErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Account for negative numbers, which come back as high unsigned numbers.
|
||||
// See docs/bstream.md.
|
||||
if bits > (1 << (sz - 1)) {
|
||||
bits -= 1 << sz
|
||||
}
|
||||
dod = int64(bits)
|
||||
}
|
||||
|
||||
//dod, err := readClassicVarbitInt(&it.br)
|
||||
//if err != nil {
|
||||
// it.err = err
|
||||
// return ValNone
|
||||
//}
|
||||
it.tDelta = uint64(int64(it.tDelta) + dod)
|
||||
it.t += int64(it.tDelta)
|
||||
return it.readValue()
|
||||
}
|
||||
|
||||
func (it *xorOptSTtIterator) readValue() ValueType {
|
||||
err := xorRead(&it.br, &it.val, &it.leading, &it.trailing)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
// Value.
|
||||
if err := xorRead(&it.br, &it.val, &it.leading, &it.trailing); err != nil {
|
||||
return it.retErr(err)
|
||||
}
|
||||
|
||||
// State EOF check.
|
||||
it.numRead++
|
||||
if it.numRead >= it.numTotal {
|
||||
it.state = eofState
|
||||
}
|
||||
return ValFloat
|
||||
}
|
||||
|
||||
484
tsdb/chunkenc/xxx_xoroptst2.go
Normal file
484
tsdb/chunkenc/xxx_xoroptst2.go
Normal file
@ -0,0 +1,484 @@
|
||||
// Copyright 2025 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
)
|
||||
|
||||
// xorOptST2Chunk holds encoded sample data:
// 2B(numSamples), 1B(stHeader), ?varint(st), varint(t), 64b(v), ?varint(stDelta), varuint(tDelta), xor(v), ?classicvarbitint(stDod), classicvarbitint(tDod), xor(v), ...
// stHeader: 1b(firstSTKnown), 7b(firstSTChangeOn)
//
// Entries prefixed with "?" are optional: st is only written when the first
// sample has a non-zero ST, and stDelta/stDod are only written from the sample
// recorded in firstSTChangeOn onwards.
type xorOptST2Chunk struct {
	b bstream
}
|
||||
|
||||
// NewXOROptST2Chunk returns a new chunk with XORv2 encoding.
|
||||
func NewXOROptST2Chunk() *xorOptST2Chunk {
|
||||
b := make([]byte, chunkHeaderSize+chunkSTHeaderSize, chunkAllocationSize)
|
||||
return &xorOptST2Chunk{b: bstream{stream: b, count: 0}}
|
||||
}
|
||||
|
||||
// Reset re-initializes the chunk's underlying bstream with the given stream
// by delegating to bstream.Reset.
func (c *xorOptST2Chunk) Reset(stream []byte) {
	c.b.Reset(stream)
}
|
||||
|
||||
// Encoding returns the encoding type.
// NOTE(review): 199 is a bare literal rather than a named Encoding constant;
// the TestChunk table registers this chunk under the same literal key.
func (*xorOptST2Chunk) Encoding() Encoding {
	return 199
}
|
||||
|
||||
// Bytes returns the underlying byte slice of the chunk, as exposed by the
// chunk's bstream (headers included).
func (c *xorOptST2Chunk) Bytes() []byte {
	return c.b.bytes()
}
|
||||
|
||||
// NumSamples returns the number of samples in the chunk.
|
||||
func (c *xorOptST2Chunk) NumSamples() int {
|
||||
return int(binary.BigEndian.Uint16(c.Bytes()))
|
||||
}
|
||||
|
||||
// Compact implements the Chunk interface.
|
||||
func (c *xorOptST2Chunk) Compact() {
|
||||
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
|
||||
buf := make([]byte, l)
|
||||
copy(buf, c.b.stream)
|
||||
c.b.stream = buf
|
||||
}
|
||||
}
|
||||
|
||||
func (c *xorOptST2Chunk) Appender() (Appender, error) {
|
||||
a, err := c.AppenderV2()
|
||||
return &compactAppender{AppenderV2: a}, err
|
||||
}
|
||||
|
||||
// AppenderV2 implements the Chunk interface.
|
||||
// It is not valid to call AppenderV2() multiple times concurrently or to use multiple
|
||||
// Appenders on the same chunk.
|
||||
func (c *xorOptST2Chunk) AppenderV2() (AppenderV2, error) {
|
||||
if len(c.b.stream) == chunkHeaderSize+chunkSTHeaderSize { // Avoid allocating an Iterator when chunk is empty.
|
||||
return &xorOptST2Appender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil
|
||||
}
|
||||
it := c.iterator(nil)
|
||||
|
||||
// To get an appender we must know the state it would have if we had
|
||||
// appended all existing data from scratch.
|
||||
// We iterate through the end and populate via the iterator's state.
|
||||
for it.Next() != ValNone {
|
||||
}
|
||||
if err := it.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
a := &xorOptST2Appender{
|
||||
b: &c.b,
|
||||
st: it.st,
|
||||
t: it.t,
|
||||
v: it.val,
|
||||
stDelta: it.stDelta,
|
||||
tDelta: it.tDelta,
|
||||
leading: it.leading,
|
||||
trailing: it.trailing,
|
||||
|
||||
numTotal: it.numTotal,
|
||||
firstSTChangeOn: it.firstSTChangeOn,
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (c *xorOptST2Chunk) iterator(it Iterator) *xorOptST2tIterator {
|
||||
xorIter, ok := it.(*xorOptST2tIterator)
|
||||
if !ok {
|
||||
xorIter = &xorOptST2tIterator{}
|
||||
}
|
||||
|
||||
xorIter.Reset(c.b.bytes())
|
||||
return xorIter
|
||||
}
|
||||
|
||||
// Iterator implements the Chunk interface.
// Iterator() must not be called concurrently with any modifications to the chunk,
// but after it returns you can use an Iterator concurrently with an Appender or
// other Iterators.
// The passed Iterator is reused when it is a *xorOptST2tIterator (see iterator).
func (c *xorOptST2Chunk) Iterator(it Iterator) Iterator {
	return c.iterator(it)
}
|
||||
|
||||
// xorOptST2Appender appends float samples with optional start timestamps (ST)
// to a xorOptST2Chunk's bstream.
type xorOptST2Appender struct {
	b        *bstream // Stream of the chunk being appended to.
	numTotal uint16   // Number of samples appended so far; mirrored into the first 2 bytes of the stream.

	// firstSTChangeOn is the index of the first sample for which ST data
	// (delta or delta-of-delta) was written. 0 means no ST change was seen yet.
	firstSTChangeOn uint16

	st, t   int64   // ST and timestamp of the last appended sample.
	v       float64 // Value of the last appended sample.
	stDelta int64   // Last ST delta.
	tDelta  uint64  // Last timestamp delta.

	// leading and trailing are the XOR value-encoding state passed to xorWrite.
	leading  uint8
	trailing uint8
}
|
||||
|
||||
// writeVDelta writes v to the stream via xorWrite, using the previously
// appended value a.v and the appender's leading/trailing state.
func (a *xorOptST2Appender) writeVDelta(v float64) {
	xorWrite(a.b, v, a.v, &a.leading, &a.trailing)
}
|
||||
|
||||
// AppendHistogram always panics: this appender only accepts float samples.
func (*xorOptST2Appender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
	panic("appended a histogram sample to a float chunk")
}
|
||||
|
||||
// AppendFloatHistogram always panics: this appender only accepts float samples.
func (*xorOptST2Appender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
	panic("appended a float histogram sample to a float chunk")
}
|
||||
|
||||
// xorOptST2tIterator iterates over the samples of a xorOptST2Chunk.
type xorOptST2tIterator struct {
	br       bstreamReader // Reader over the sample data that follows the chunk headers.
	numTotal uint16        // Number of samples in the chunk, read from the first 2 bytes.

	// firstSTKnown and firstSTChangeOn are decoded from the ST header by Reset.
	firstSTKnown    bool
	firstSTChangeOn uint16

	numRead uint16 // Number of samples decoded so far.

	st, t int64   // ST and timestamp of the current sample.
	val   float64 // Value of the current sample.

	// leading and trailing are the XOR value-decoding state passed to xorRead.
	leading  uint8
	trailing uint8

	stDelta int64  // Last ST delta.
	tDelta  uint64 // Last timestamp delta.
	err     error  // First decoding error; once set, Next returns ValNone.

	// nextFn is the decoding step Next invokes; it starts as initNext and is
	// swapped for a specialized step once the ST layout is known.
	nextFn func() ValueType
}
|
||||
|
||||
func (it *xorOptST2tIterator) Seek(t int64) ValueType {
|
||||
if it.err != nil {
|
||||
return ValNone
|
||||
}
|
||||
|
||||
for t > it.t || it.numRead == 0 {
|
||||
if it.Next() == ValNone {
|
||||
return ValNone
|
||||
}
|
||||
}
|
||||
return ValFloat
|
||||
}
|
||||
|
||||
// At returns the timestamp and value of the current sample.
func (it *xorOptST2tIterator) At() (int64, float64) {
	return it.t, it.val
}
|
||||
|
||||
// AtHistogram always panics: this iterator only yields float samples.
func (*xorOptST2tIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
	panic("cannot call xorIterator.AtHistogram")
}
|
||||
|
||||
// AtFloatHistogram always panics: this iterator only yields float samples.
func (*xorOptST2tIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
	panic("cannot call xorIterator.AtFloatHistogram")
}
|
||||
|
||||
// AtT returns the timestamp of the current sample.
func (it *xorOptST2tIterator) AtT() int64 {
	return it.t
}
|
||||
|
||||
// AtST returns the start timestamp (ST) of the current sample.
func (it *xorOptST2tIterator) AtST() int64 {
	return it.st
}
|
||||
|
||||
// Err returns the first error encountered while decoding, or nil.
func (it *xorOptST2tIterator) Err() error {
	return it.err
}
|
||||
|
||||
func (it *xorOptST2tIterator) Reset(b []byte) {
|
||||
// We skip initial headers for actual samples.
|
||||
it.br = newBReader(b[chunkHeaderSize+chunkSTHeaderSize:])
|
||||
it.numTotal = binary.BigEndian.Uint16(b)
|
||||
it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[chunkHeaderSize:])
|
||||
it.numRead = 0
|
||||
it.st = 0
|
||||
it.t = 0
|
||||
it.val = 0
|
||||
it.leading = 0
|
||||
it.trailing = 0
|
||||
it.stDelta = 0
|
||||
it.tDelta = 0
|
||||
it.err = nil
|
||||
it.nextFn = it.initNext
|
||||
}
|
||||
|
||||
// Append adds a sample with start timestamp st, timestamp t and value v to
// the chunk. The encoding depends on the sample's position:
//   - sample 0: optional varint ST (only if non-zero), varint t, raw 64-bit v;
//   - sample 1: optional varint ST delta (only if non-zero), uvarint t delta, XOR v;
//   - later samples: optional classic-varbit ST delta-of-delta, classic-varbit
//     t delta-of-delta, XOR v.
//
// After the write, the sample count in the chunk header is updated, and the
// first sample index carrying ST data is recorded in the ST header.
func (a *xorOptST2Appender) Append(st, t int64, v float64) {
	var (
		stDelta   int64
		tDelta    uint64
		stChanged bool
	)

	switch a.numTotal {
	case 0:
		buf := make([]byte, binary.MaxVarintLen64)
		if st != 0 {
			// Non-zero first ST: store it and flag it as known in the ST header.
			for _, b := range buf[:binary.PutVarint(buf, st)] {
				a.b.writeByte(b)
			}
			writeHeaderFirstSTKnown(a.b.bytes()[chunkHeaderSize:])
		}

		for _, b := range buf[:binary.PutVarint(buf, t)] {
			a.b.writeByte(b)
		}
		a.b.writeBits(math.Float64bits(v), 64)
	case 1:
		buf := make([]byte, binary.MaxVarintLen64)
		stDelta = st - a.st
		if stDelta != 0 {
			// ST delta is only written when ST changed on the second sample.
			stChanged = true
			for _, b := range buf[:binary.PutVarint(buf, stDelta)] {
				a.b.writeByte(b)
			}
		}

		tDelta = uint64(t - a.t)
		for _, b := range buf[:binary.PutUvarint(buf, tDelta)] {
			a.b.writeByte(b)
		}
		a.writeVDelta(v)
	default:
		if a.firstSTChangeOn == 0 && a.numTotal == maxFirstSTChangeOn {
			// We are at the 127th sample. firstSTChangeOn can only fit 7 bits due to a
			// single byte header constrain, which is fine, given typical 120 sample size.
			a.firstSTChangeOn = a.numTotal
			writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.firstSTChangeOn)
		}

		stDelta = st - a.st
		sdod := stDelta - a.stDelta
		// Once ST tracking has started, every sample carries an ST DoD, even a zero one.
		if sdod != 0 || a.firstSTChangeOn != 0 {
			stChanged = true
			putClassicVarbitInt(a.b, sdod)
		}

		tDelta = uint64(t - a.t)
		dod := int64(tDelta - a.tDelta)
		putClassicVarbitInt(a.b, dod)
		a.writeVDelta(v)
	}

	a.st = st
	a.t = t
	a.v = v
	a.tDelta = tDelta
	a.stDelta = stDelta

	a.numTotal++
	binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)

	// firstSTChangeOn == 0 indicates that we have one ST value (zero or not)
	// for all STs in the appends until now. If we see a first "update"
	// we are saving this number in the header and continue tracking all DoD (including zeros).
	if a.firstSTChangeOn == 0 && stChanged {
		// numTotal was already incremented, so numTotal-1 is this sample's index.
		a.firstSTChangeOn = a.numTotal - 1
		writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.firstSTChangeOn)
	}
}
|
||||
|
||||
func (a *xorOptST2Appender) BitProfiledAppend(p *bitProfiler[any], st, t int64, v float64) {
|
||||
var (
|
||||
stDelta int64
|
||||
tDelta uint64
|
||||
stChanged bool
|
||||
)
|
||||
|
||||
switch a.numTotal {
|
||||
case 0:
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
if st != 0 {
|
||||
p.Write(a.b, t, "st", func() {
|
||||
for _, b := range buf[:binary.PutVarint(buf, st)] {
|
||||
a.b.writeByte(b)
|
||||
}
|
||||
writeHeaderFirstSTKnown(a.b.bytes()[chunkHeaderSize:])
|
||||
})
|
||||
}
|
||||
p.Write(a.b, t, "t", func() {
|
||||
for _, b := range buf[:binary.PutVarint(buf, t)] {
|
||||
a.b.writeByte(b)
|
||||
}
|
||||
})
|
||||
p.Write(a.b, v, "v", func() {
|
||||
a.b.writeBits(math.Float64bits(v), 64)
|
||||
})
|
||||
case 1:
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
stDelta = st - a.st
|
||||
if stDelta != 0 {
|
||||
stChanged = true
|
||||
p.Write(a.b, t, "stDelta", func() {
|
||||
for _, b := range buf[:binary.PutVarint(buf, stDelta)] {
|
||||
a.b.writeByte(b)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
tDelta = uint64(t - a.t)
|
||||
p.Write(a.b, t, "tDelta", func() {
|
||||
for _, b := range buf[:binary.PutUvarint(buf, tDelta)] {
|
||||
a.b.writeByte(b)
|
||||
}
|
||||
})
|
||||
p.Write(a.b, v, "v", func() {
|
||||
a.writeVDelta(v)
|
||||
})
|
||||
default:
|
||||
stDelta = st - a.st
|
||||
sdod := stDelta - a.stDelta
|
||||
if sdod != 0 || a.firstSTChangeOn != 0 {
|
||||
p.Write(a.b, dodSample{t: t, tDelta: tDelta, dod: sdod}, "tDod", func() {
|
||||
stChanged = true
|
||||
putClassicVarbitInt(a.b, sdod)
|
||||
})
|
||||
}
|
||||
|
||||
tDelta = uint64(t - a.t)
|
||||
dod := int64(tDelta - a.tDelta)
|
||||
p.Write(a.b, dodSample{t: t, tDelta: tDelta, dod: sdod}, "tDod", func() {
|
||||
putClassicVarbitInt(a.b, dod)
|
||||
})
|
||||
p.Write(a.b, v, "v", func() {
|
||||
a.writeVDelta(v)
|
||||
})
|
||||
}
|
||||
|
||||
a.st = st
|
||||
a.t = t
|
||||
a.v = v
|
||||
a.tDelta = tDelta
|
||||
a.stDelta = stDelta
|
||||
|
||||
a.numTotal++
|
||||
binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)
|
||||
|
||||
// firstSTChangeOn == 0 indicates that we have one ST value (zero or not)
|
||||
// for all STs in the appends until now. If we see a first "update" OR
|
||||
// we are at the 127th sample, we are saving this number in the header
|
||||
// and continue tracking all DoD (including zeros). 0x7F is due to a single byte
|
||||
// header constrain, which is fine, given typical 120 sample size.
|
||||
if a.firstSTChangeOn == 0 && (stChanged || a.numTotal > 0x7F) {
|
||||
a.firstSTChangeOn = a.numTotal - 1
|
||||
writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.firstSTChangeOn)
|
||||
}
|
||||
}
|
||||
|
||||
func (it *xorOptST2tIterator) Next() ValueType {
|
||||
if it.err != nil || it.numRead == it.numTotal {
|
||||
return ValNone
|
||||
}
|
||||
return it.nextFn()
|
||||
}
|
||||
|
||||
func (it *xorOptST2tIterator) initNext() ValueType {
|
||||
switch it.numRead {
|
||||
case 0:
|
||||
if it.firstSTKnown {
|
||||
st, err := binary.ReadVarint(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
}
|
||||
it.st = st
|
||||
}
|
||||
t, err := binary.ReadVarint(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
}
|
||||
v, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
}
|
||||
it.t = t
|
||||
it.val = math.Float64frombits(v)
|
||||
|
||||
it.numRead++
|
||||
return ValFloat
|
||||
case 1:
|
||||
if it.firstSTChangeOn == 0 {
|
||||
// This means we have same (zero or non-zero) ST value for the rest of
|
||||
// chunk. We can set rest of next functions to use ~classic XOR chunk iterations.
|
||||
it.nextFn = it.stAgnosticDoDNext
|
||||
} else if it.firstSTChangeOn == 1 {
|
||||
stDelta, err := binary.ReadVarint(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
}
|
||||
it.stDelta = stDelta
|
||||
it.st += it.stDelta
|
||||
|
||||
// We got early ST change on the second sample, likely delta.
|
||||
// Continue ST rich flow from the next iteration.
|
||||
it.nextFn = it.stDoDNext
|
||||
}
|
||||
tDelta, err := binary.ReadUvarint(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
}
|
||||
it.tDelta = tDelta
|
||||
it.t += int64(it.tDelta)
|
||||
return it.readValue()
|
||||
default:
|
||||
if it.firstSTChangeOn == it.numRead {
|
||||
it.nextFn = it.stDoDNext
|
||||
return it.stDoDNext()
|
||||
}
|
||||
return it.stAgnosticDoDNext()
|
||||
}
|
||||
}
|
||||
|
||||
func (it *xorOptST2tIterator) stDoDNext() ValueType {
|
||||
sdod, err := readClassicVarbitInt(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
}
|
||||
it.stDelta = it.stDelta + sdod
|
||||
it.st += it.stDelta
|
||||
return it.stAgnosticDoDNext()
|
||||
}
|
||||
|
||||
func (it *xorOptST2tIterator) stAgnosticDoDNext() ValueType {
|
||||
dod, err := readClassicVarbitInt(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
}
|
||||
it.tDelta = uint64(int64(it.tDelta) + dod)
|
||||
it.t += int64(it.tDelta)
|
||||
return it.readValue()
|
||||
}
|
||||
|
||||
func (it *xorOptST2tIterator) readValue() ValueType {
|
||||
err := xorRead(&it.br, &it.val, &it.leading, &it.trailing)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return ValNone
|
||||
}
|
||||
it.numRead++
|
||||
return ValFloat
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user