mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-05 12:26:14 +02:00
feat(tsdb/chunkenc): add float chunk format with start timestamp support
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
04a3ef75f2
commit
4b8fb76d95
342
tsdb/chunkenc/benchmark_test.go
Normal file
342
tsdb/chunkenc/benchmark_test.go
Normal file
@ -0,0 +1,342 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/timestamp"
|
||||
)
|
||||
|
||||
// sampleCase names one synthetic (st, t, v) sample pattern exercised by the
// benchmarks in this file.
type sampleCase struct {
	name    string
	samples []triple
}

// fmtCase names one chunk encoding under benchmark together with the
// constructor used to create a fresh chunk of that format.
type fmtCase struct {
	name       string
	newChunkFn func() Chunk
}
|
||||
|
||||
// foreachFmtSampleCase invokes fn as a named sub-benchmark for every
// combination of chunk format and sample pattern. The sample patterns cover
// constant and jittered timestamps, constant/stepped/random values, and the
// main start-timestamp (ST) regimes: absent (0), constant cumulative, per-sample
// delta, and cumulative with periodic resets/zeros.
func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampleCase)) {
	const nSamples = 120 // Same as tsdb.DefaultSamplesPerChunk.

	d, err := time.Parse(time.DateTime, "2025-11-04 10:01:05")
	require.NoError(b, err)

	var (
		// Fixed seed so every benchmark run sees identical "random" data.
		r      = rand.New(rand.NewSource(1))
		initST = timestamp.FromTime(d) // Use realistic timestamp.
		initT  = initST + 15000        // 15s after initST.
		initV  = 1243535.123
	)

	sampleCases := []sampleCase{
		{
			name: "vt=constant/st=0",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					t += 15000
					ret = append(ret, triple{st: 0, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			// Cumulative with a constant ST through the whole chunk, typical case (e.g. long counting counter).
			name: "vt=constant/st=cumulative",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					t += 15000
					ret = append(ret, triple{st: initST, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			// Delta simulates delta type or worst case for cumulatives, where ST
			// is changing on every sample.
			name: "vt=constant/st=delta",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					st := t + 1 // ST is a tight interval after the last t+1ms.
					t += 15000
					ret = append(ret, triple{st: st, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			name: "vt=random steps/st=0",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v += float64(r.Intn(100) - 50)       // Varying from -50 to +50 in 100 discrete steps.
					ret = append(ret, triple{st: 0, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			name: "vt=random steps/st=cumulative",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v += float64(r.Intn(100) - 50)       // Varying from -50 to +50 in 100 discrete steps.
					ret = append(ret, triple{st: initST, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			name: "vt=random steps/st=delta",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					st := t + 1                          // ST is a tight interval after the last t+1ms.
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v += float64(r.Intn(100) - 50)       // Varying from -50 to +50 in 100 discrete steps.
					ret = append(ret, triple{st: st, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			name: "vt=random 0-1/st=0",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v += r.Float64()                     // Random between 0 and 1.0.
					ret = append(ret, triple{st: 0, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			// Are we impacted by https://victoriametrics.com/blog/go-protobuf/ negative varint issue? (zig-zag needed?)
			name: "vt=negrandom 0-1/st=0",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v -= r.Float64()                     // Random between 0 and 1.0.
					ret = append(ret, triple{st: 0, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			name: "vt=random 0-1/st=cumulative",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v += r.Float64()                     // Random between 0 and 1.0.
					ret = append(ret, triple{st: initST, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			name: "vt=random 0-1/st=cumulative-periodic-resets",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for i := range nSamples {
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v += r.Float64()                     // Random between 0 and 1.0.
					st := initST
					if i%6 == 5 {
						st = t - 10000 // Reset of 10s before current t.
					}
					ret = append(ret, triple{st: st, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			name: "vt=random 0-1/st=cumulative-periodic-zeros",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for i := range nSamples {
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v += r.Float64()                     // Random between 0 and 1.0.
					st := initST
					if i%6 == 5 {
						// Every 6th sample drops its ST entirely (unknown ST).
						st = 0
					}
					ret = append(ret, triple{st: st, t: t, v: v})
				}
				return ret
			}(),
		},
		{
			name: "vt=random 0-1/st=delta",
			samples: func() (ret []triple) {
				t, v := initT, initV
				for range nSamples {
					st := t + 1                          // ST is a tight interval after the last t+1ms.
					t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
					v += r.Float64()                     // Random between 0 and 1.0.
					ret = append(ret, triple{st: st, t: t, v: v})
				}
				return ret
			}(),
		},
	}

	// Cross product: every format is benchmarked against every sample pattern.
	for _, f := range []fmtCase{
		{name: "XOR", newChunkFn: func() Chunk { return NewXORChunk() }},
		{name: "XOR_OPT_ST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }},
	} {
		for _, s := range sampleCases {
			b.Run(fmt.Sprintf("fmt=%s/%s", f.name, s.name), func(b *testing.B) {
				fn(b, f, s)
			})
		}
	}
}
|
||||
|
||||
/*
|
||||
export bench=bw.bench/append.v2 && go test \
|
||||
-run '^$' -bench '^BenchmarkAppender' \
|
||||
-benchtime 1s -count 6 -cpu 2 -timeout 999m \
|
||||
| tee ${bench}.txt
|
||||
|
||||
For profiles:
|
||||
|
||||
export bench=bw.bench/appendprof && go test \
|
||||
-run '^$' -bench '^BenchmarkAppender' \
|
||||
-benchtime 1s -count 1 -cpu 2 -timeout 999m \
|
||||
-cpuprofile=${bench}.cpu.pprof \
|
||||
| tee ${bench}.txt
|
||||
*/
|
||||
func BenchmarkAppender(b *testing.B) {
|
||||
foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) {
|
||||
b.ReportAllocs()
|
||||
|
||||
for b.Loop() {
|
||||
c := f.newChunkFn()
|
||||
|
||||
a, err := c.Appender()
|
||||
if err != nil {
|
||||
b.Fatalf("get appender: %s", err)
|
||||
}
|
||||
for _, p := range s.samples {
|
||||
a.Append(p.st, p.t, p.v)
|
||||
}
|
||||
// NOTE: Some buffered implementations only encode on Bytes().
|
||||
b.ReportMetric(float64(len(c.Bytes())), "B/chunk")
|
||||
|
||||
require.Equal(b, len(s.samples), c.NumSamples())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/*
|
||||
export bench=bw.bench/iter && go test \
|
||||
-run '^$' -bench '^BenchmarkIterator' \
|
||||
-benchtime 1s -count 6 -cpu 2 -timeout 999m \
|
||||
| tee ${bench}.txt
|
||||
|
||||
For profiles:
|
||||
|
||||
export bench=bw.bench/iterprof && go test \
|
||||
-run '^$' -bench '^BenchmarkIterator' \
|
||||
-benchtime 1000000x -count 1 -cpu 2 -timeout 999m \
|
||||
-cpuprofile=${bench}.cpu.pprof \
|
||||
| tee ${bench}.txt
|
||||
export bench=bw.bench/iterprof && go test \
|
||||
-run '^$' -bench '^BenchmarkIterator' \
|
||||
-benchtime 1000000x -count 1 -cpu 2 -timeout 999m \
|
||||
-memprofile=${bench}.mem.pprof \
|
||||
| tee ${bench}.txt
|
||||
*/
|
||||
// BenchmarkIterator measures decoding throughput for every format/sample-pattern
// combination. Before timing it appends all samples once and verifies a full
// encode/decode round trip; the timed loop then repeatedly resets and
// re-iterates the same chunk.
func BenchmarkIterator(b *testing.B) {
	foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) {
		// Exact float comparison by default; relaxed only for lossy formats.
		floatEquals := func(a, b float64) bool {
			return a == b
		}
		if f.name == "ALPBuffered" {
			// Hack as ALP loses precision.
			// NOTE(review): no "ALPBuffered" fmtCase exists in this file's
			// format list — this branch looks like a leftover from another
			// format set; confirm whether it can be removed.
			floatEquals = func(a, b float64) bool {
				return math.Abs(a-b) < 1e-6
			}
		}
		b.ReportAllocs()

		c := f.newChunkFn()
		a, err := c.Appender()
		if err != nil {
			b.Fatalf("get appender: %s", err)
		}
		for _, p := range s.samples {
			a.Append(p.st, p.t, p.v)
		}

		// Some chunk implementations might be buffered. Reset to ensure we don't reuse
		// appending buffers.
		c.Reset(c.Bytes())

		// While we are at it, test if encoding/decoding works.
		it := c.Iterator(nil)
		require.Equal(b, len(s.samples), c.NumSamples())
		var got []triple
		for i := 0; it.Next() == ValFloat; i++ {
			t, v := it.At()
			got = append(got, triple{st: it.AtST(), t: t, v: v})
		}
		if err := it.Err(); err != nil && !errors.Is(err, io.EOF) {
			require.NoError(b, err)
		}
		if diff := cmp.Diff(s.samples, got, cmp.AllowUnexported(triple{}), cmp.Comparer(floatEquals)); diff != "" {
			b.Fatalf("mismatch (-want +got):\n%s", diff)
		}

		// sink prevents the compiler from eliminating the decode loop below.
		var sink float64
		// Measure decoding efficiency.
		for i := 0; b.Loop(); {
			// Some chunk implementations might be buffered. Reset to ensure we don't reuse
			// previous decoded data.
			c.Reset(c.Bytes())
			b.ReportMetric(float64(len(c.Bytes())), "B/chunk")

			it := c.Iterator(it)
			for it.Next() == ValFloat {
				_, v := it.At()
				sink = v
				// NOTE(review): i counts decoded samples but is never read;
				// presumably vestigial — confirm before removing.
				i++
			}
			if err := it.Err(); err != nil && !errors.Is(err, io.EOF) {
				require.NoError(b, err)
			}
			_ = sink
		}
	})
}
|
||||
@ -30,6 +30,7 @@ const (
|
||||
EncXOR
|
||||
EncHistogram
|
||||
EncFloatHistogram
|
||||
EncXOROptST
|
||||
)
|
||||
|
||||
func (e Encoding) String() string {
|
||||
@ -42,13 +43,15 @@ func (e Encoding) String() string {
|
||||
return "histogram"
|
||||
case EncFloatHistogram:
|
||||
return "floathistogram"
|
||||
case EncXOROptST:
|
||||
return "XOR-start-timestamp"
|
||||
}
|
||||
return "<unknown>"
|
||||
}
|
||||
|
||||
// IsValidEncoding returns true for supported encodings.
|
||||
func IsValidEncoding(e Encoding) bool {
|
||||
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram
|
||||
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOROptST
|
||||
}
|
||||
|
||||
const (
|
||||
@ -299,6 +302,7 @@ type pool struct {
|
||||
xor sync.Pool
|
||||
histogram sync.Pool
|
||||
floatHistogram sync.Pool
|
||||
xoroptst sync.Pool
|
||||
}
|
||||
|
||||
// NewPool returns a new pool.
|
||||
@ -319,6 +323,11 @@ func NewPool() Pool {
|
||||
return &FloatHistogramChunk{b: bstream{}}
|
||||
},
|
||||
},
|
||||
xoroptst: sync.Pool{
|
||||
New: func() any {
|
||||
return &XorOptSTChunk{b: bstream{}}
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -331,6 +340,8 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
|
||||
c = p.histogram.Get().(*HistogramChunk)
|
||||
case EncFloatHistogram:
|
||||
c = p.floatHistogram.Get().(*FloatHistogramChunk)
|
||||
case EncXOROptST:
|
||||
c = p.xoroptst.Get().(*XorOptSTChunk)
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid chunk encoding %q", e)
|
||||
}
|
||||
@ -352,6 +363,9 @@ func (p *pool) Put(c Chunk) error {
|
||||
case EncFloatHistogram:
|
||||
_, ok = c.(*FloatHistogramChunk)
|
||||
sp = &p.floatHistogram
|
||||
case EncXOROptST:
|
||||
_, ok = c.(*XorOptSTChunk)
|
||||
sp = &p.xoroptst
|
||||
default:
|
||||
return fmt.Errorf("invalid chunk encoding %q", c.Encoding())
|
||||
}
|
||||
|
||||
@ -16,36 +16,41 @@ package chunkenc
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type pair struct {
|
||||
t int64
|
||||
v float64
|
||||
type triple struct {
|
||||
st, t int64
|
||||
v float64
|
||||
}
|
||||
|
||||
func TestChunk(t *testing.T) {
|
||||
for enc, nc := range map[Encoding]func() Chunk{
|
||||
EncXOR: func() Chunk { return NewXORChunk() },
|
||||
} {
|
||||
t.Run(fmt.Sprintf("%v", enc), func(t *testing.T) {
|
||||
testcases := []struct {
|
||||
encoding Encoding
|
||||
supportsST bool
|
||||
factory func() Chunk
|
||||
}{
|
||||
{encoding: EncXOR, supportsST: false, factory: func() Chunk { return NewXORChunk() }},
|
||||
{encoding: EncXOROptST, supportsST: true, factory: func() Chunk { return NewXOROptSTChunk() }},
|
||||
}
|
||||
for _, tc := range testcases {
|
||||
t.Run(fmt.Sprintf("%v", tc.encoding), func(t *testing.T) {
|
||||
for range make([]struct{}, 1) {
|
||||
c := nc()
|
||||
testChunk(t, c)
|
||||
c := tc.factory()
|
||||
testChunk(t, c, tc.supportsST)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testChunk(t *testing.T, c Chunk) {
|
||||
func testChunk(t *testing.T, c Chunk, supportsST bool) {
|
||||
app, err := c.Appender()
|
||||
require.NoError(t, err)
|
||||
|
||||
var exp []pair
|
||||
var exp []triple
|
||||
var (
|
||||
ts = int64(1234123324)
|
||||
v = 1243535.123
|
||||
@ -65,26 +70,30 @@ func testChunk(t *testing.T, c Chunk) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
app.Append(0, ts, v)
|
||||
exp = append(exp, pair{t: ts, v: v})
|
||||
app.Append(ts-100, ts, v)
|
||||
expST := int64(0)
|
||||
if supportsST {
|
||||
expST = ts - 100
|
||||
}
|
||||
exp = append(exp, triple{st: expST, t: ts, v: v})
|
||||
}
|
||||
|
||||
// 1. Expand iterator in simple case.
|
||||
it1 := c.Iterator(nil)
|
||||
var res1 []pair
|
||||
var res1 []triple
|
||||
for it1.Next() == ValFloat {
|
||||
ts, v := it1.At()
|
||||
res1 = append(res1, pair{t: ts, v: v})
|
||||
res1 = append(res1, triple{st: it1.AtST(), t: ts, v: v})
|
||||
}
|
||||
require.NoError(t, it1.Err())
|
||||
require.Equal(t, exp, res1)
|
||||
|
||||
// 2. Expand second iterator while reusing first one.
|
||||
it2 := c.Iterator(it1)
|
||||
var res2 []pair
|
||||
var res2 []triple
|
||||
for it2.Next() == ValFloat {
|
||||
ts, v := it2.At()
|
||||
res2 = append(res2, pair{t: ts, v: v})
|
||||
res2 = append(res2, triple{st: it2.AtST(), t: ts, v: v})
|
||||
}
|
||||
require.NoError(t, it2.Err())
|
||||
require.Equal(t, exp, res2)
|
||||
@ -93,17 +102,17 @@ func testChunk(t *testing.T, c Chunk) {
|
||||
mid := len(exp) / 2
|
||||
|
||||
it3 := c.Iterator(nil)
|
||||
var res3 []pair
|
||||
var res3 []triple
|
||||
require.Equal(t, ValFloat, it3.Seek(exp[mid].t))
|
||||
// Below ones should not matter.
|
||||
require.Equal(t, ValFloat, it3.Seek(exp[mid].t))
|
||||
require.Equal(t, ValFloat, it3.Seek(exp[mid].t))
|
||||
ts, v = it3.At()
|
||||
res3 = append(res3, pair{t: ts, v: v})
|
||||
res3 = append(res3, triple{st: it3.AtST(), t: ts, v: v})
|
||||
|
||||
for it3.Next() == ValFloat {
|
||||
ts, v := it3.At()
|
||||
res3 = append(res3, pair{t: ts, v: v})
|
||||
res3 = append(res3, triple{st: it3.AtST(), t: ts, v: v})
|
||||
}
|
||||
require.NoError(t, it3.Err())
|
||||
require.Equal(t, exp[mid:], res3)
|
||||
@ -129,6 +138,10 @@ func TestPool(t *testing.T) {
|
||||
name: "float histogram",
|
||||
encoding: EncFloatHistogram,
|
||||
},
|
||||
{
|
||||
name: "xor opt st",
|
||||
encoding: EncXOROptST,
|
||||
},
|
||||
{
|
||||
name: "invalid encoding",
|
||||
encoding: EncNone,
|
||||
@ -150,6 +163,8 @@ func TestPool(t *testing.T) {
|
||||
b = &c.(*HistogramChunk).b
|
||||
case EncFloatHistogram:
|
||||
b = &c.(*FloatHistogramChunk).b
|
||||
case EncXOROptST:
|
||||
b = &c.(*XorOptSTChunk).b
|
||||
default:
|
||||
b = &c.(*XORChunk).b
|
||||
}
|
||||
@ -199,111 +214,3 @@ func (c fakeChunk) Encoding() Encoding {
|
||||
func (c fakeChunk) Reset([]byte) {
|
||||
c.t.Fatal("Reset should not be called")
|
||||
}
|
||||
|
||||
func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
|
||||
const samplesPerChunk = 250
|
||||
var (
|
||||
t = int64(1234123324)
|
||||
v = 1243535.123
|
||||
exp []pair
|
||||
)
|
||||
for range samplesPerChunk {
|
||||
// t += int64(rand.Intn(10000) + 1)
|
||||
t += int64(1000)
|
||||
// v = rand.Float64()
|
||||
v += float64(100)
|
||||
exp = append(exp, pair{t: t, v: v})
|
||||
}
|
||||
|
||||
chunk := newChunk()
|
||||
{
|
||||
a, err := chunk.Appender()
|
||||
if err != nil {
|
||||
b.Fatalf("get appender: %s", err)
|
||||
}
|
||||
j := 0
|
||||
for _, p := range exp {
|
||||
if j > 250 {
|
||||
break
|
||||
}
|
||||
a.Append(0, p.t, p.v)
|
||||
j++
|
||||
}
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
|
||||
var res float64
|
||||
var it Iterator
|
||||
for i := 0; b.Loop(); {
|
||||
it := chunk.Iterator(it)
|
||||
|
||||
for it.Next() == ValFloat {
|
||||
_, v := it.At()
|
||||
res = v
|
||||
i++
|
||||
}
|
||||
if err := it.Err(); err != nil && !errors.Is(err, io.EOF) {
|
||||
require.NoError(b, err)
|
||||
}
|
||||
_ = res
|
||||
}
|
||||
}
|
||||
|
||||
func newXORChunk() Chunk {
|
||||
return NewXORChunk()
|
||||
}
|
||||
|
||||
func BenchmarkXORIterator(b *testing.B) {
|
||||
benchmarkIterator(b, newXORChunk)
|
||||
}
|
||||
|
||||
func BenchmarkXORAppender(b *testing.B) {
|
||||
r := rand.New(rand.NewSource(1))
|
||||
b.Run("constant", func(b *testing.B) {
|
||||
benchmarkAppender(b, func() (int64, float64) {
|
||||
return 1000, 0
|
||||
}, newXORChunk)
|
||||
})
|
||||
b.Run("random steps", func(b *testing.B) {
|
||||
benchmarkAppender(b, func() (int64, float64) {
|
||||
return int64(r.Intn(100) - 50 + 15000), // 15 seconds +- up to 100ms of jitter.
|
||||
float64(r.Intn(100) - 50) // Varying from -50 to +50 in 100 discrete steps.
|
||||
}, newXORChunk)
|
||||
})
|
||||
b.Run("random 0-1", func(b *testing.B) {
|
||||
benchmarkAppender(b, func() (int64, float64) {
|
||||
return int64(r.Intn(100) - 50 + 15000), // 15 seconds +- up to 100ms of jitter.
|
||||
r.Float64() // Random between 0 and 1.0.
|
||||
}, newXORChunk)
|
||||
})
|
||||
}
|
||||
|
||||
func benchmarkAppender(b *testing.B, deltas func() (int64, float64), newChunk func() Chunk) {
|
||||
var (
|
||||
t = int64(1234123324)
|
||||
v = 1243535.123
|
||||
)
|
||||
const nSamples = 120 // Same as tsdb.DefaultSamplesPerChunk.
|
||||
var exp []pair
|
||||
for range nSamples {
|
||||
dt, dv := deltas()
|
||||
t += dt
|
||||
v += dv
|
||||
exp = append(exp, pair{t: t, v: v})
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
|
||||
for b.Loop() {
|
||||
c := newChunk()
|
||||
|
||||
a, err := c.Appender()
|
||||
if err != nil {
|
||||
b.Fatalf("get appender: %s", err)
|
||||
}
|
||||
for _, p := range exp {
|
||||
a.Append(0, p.t, p.v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
124
tsdb/chunkenc/st_helper_test.go
Normal file
124
tsdb/chunkenc/st_helper_test.go
Normal file
@ -0,0 +1,124 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
)
|
||||
|
||||
// testChunkSTHandling tests handling of start times in chunks.
// It uses 0-4 samples with timestamp 1000,2000,3000,4000 and monotonically
// increasing start times that are chosen from 0-(ts-500) for each sample.
// All combinations of start times are tested for each number of samples.
//
// NOTE(review): the enumeration below bounds sample j's ST index by j+2,
// which allows st up to stTimes[j+2] = 500*(j+2) (i.e. up to st == t for
// j == 0) and does not enforce monotonicity across samples — the comment
// above appears stricter than the actual coverage; confirm intent.
func testChunkSTHandling(t *testing.T, vt ValueType, chunkFactory func() Chunk) {
	// sampleAppend appends one (st, ts, v) sample using the appender call
	// appropriate for the value type under test.
	sampleAppend := func(app Appender, vt ValueType, st, ts int64, v float64) {
		switch vt {
		case ValFloat:
			app.Append(st, ts, v)
		case ValHistogram:
			_, recoded, _, err := app.AppendHistogram(nil, st, ts, &histogram.Histogram{Sum: v, Count: uint64(v * 10)}, false)
			require.NoError(t, err)
			require.False(t, recoded)
		case ValFloatHistogram:
			_, recoded, _, err := app.AppendFloatHistogram(nil, st, ts, &histogram.FloatHistogram{Sum: v, Count: v * 10}, false)
			require.NoError(t, err)
			require.False(t, recoded)
		default:
			t.Fatalf("unsupported value type %v", vt)
		}
	}

	// get reads (st, ts, value) from the iterator's current position using the
	// accessor appropriate for the value type; for histograms the Sum field
	// stands in for the value.
	get := func(it Iterator, vt ValueType) (int64, int64, float64) {
		switch vt {
		case ValFloat:
			ts, v := it.At()
			return it.AtST(), ts, v
		case ValHistogram:
			ts, h := it.AtHistogram(nil)
			return it.AtST(), ts, float64(h.Sum)
		case ValFloatHistogram:
			ts, fh := it.AtFloatHistogram(nil)
			return it.AtST(), ts, fh.Sum
		default:
			t.Fatalf("unsupported value type %v", vt)
			return 0, 0, 0
		}
	}

	// runCase appends all samples to a fresh chunk, then iterates and asserts
	// every (st, t, v) triple round-trips unchanged.
	runCase := func(t *testing.T, samples []triple) {
		chunk := chunkFactory()
		app, err := chunk.Appender()
		require.NoError(t, err)
		for _, s := range samples {
			sampleAppend(app, vt, s.st, s.t, s.v)
		}
		it := chunk.Iterator(nil)
		for i, s := range samples {
			require.Equal(t, vt, it.Next())
			st, ts, f := get(it, vt)
			require.Equal(t, s.t, ts, "%d: timestamp mismatch", i)
			require.Equal(t, s.st, st, "%d: start time mismatch", i)
			require.InDelta(t, s.v, f, 1e-9, "%d: value mismatch", i)
		}
		require.Equal(t, ValNone, it.Next())
		require.NoError(t, it.Err())
	}

	t.Run("manual for debugging", func(t *testing.T) {
		samples := []triple{
			{st: 0, t: 1000, v: 1.5},
			{st: 0, t: 2000, v: 2.5},
			{st: 0, t: 3000, v: 3.5},
			{st: 0, t: 4000, v: 4.5},
		}
		runCase(t, samples)
	})

	stTimes := []int64{0, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000}
	for numberOfSamples := range 5 {
		samples := make([]triple, numberOfSamples)
		// sampleSTidx is an odometer over indices into stTimes, one digit per
		// sample; each full inner loop emits one combination as a subtest.
		sampleSTidx := make([]int, numberOfSamples)
		for {
			for j := range numberOfSamples {
				samples[j] = triple{
					st: stTimes[sampleSTidx[j]],
					t:  int64(1000 * (j + 1)),
					v:  float64(j) + 0.5,
				}
			}

			t.Run(fmt.Sprintf("%v", samples), func(t *testing.T) {
				runCase(t, samples)
			})

			// Advance the odometer: increment the rightmost digit that has not
			// reached its per-position cap (j+2), resetting lower digits.
			exhausted := true
			for j := numberOfSamples - 1; j >= 0; j-- {
				if sampleSTidx[j] < j+2 {
					sampleSTidx[j]++
					exhausted = false
					break
				}
				sampleSTidx[j] = 0
			}
			if exhausted {
				break
			}
		}
	}
}
|
||||
103
tsdb/chunkenc/varbit_classic.go
Normal file
103
tsdb/chunkenc/varbit_classic.go
Normal file
@ -0,0 +1,103 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
// putClassicVarbitInt writes an int64 using varbit encoding with a bit bucketing
// as it was done for a long time in the initial XOR chunk format.
//
// Layout: a unary-style prefix selects the bit bucket, followed by the value
// in that many bits (two's-complement within the bucket; the sign is
// reconstructed by readClassicVarbitInt):
//
//	0        -> value is 0, nothing follows
//	0b10     -> 14 bits
//	0b110    -> 17 bits
//	0b1110   -> 20 bits
//	0b1111   -> full 64 bits
func putClassicVarbitInt(b *bstream, val int64) {
	// Gorilla has a max resolution of seconds, Prometheus milliseconds.
	// Thus we use higher value range steps with larger bit size.
	//
	// TODO(beorn7): This seems to needlessly jump to large bit
	// sizes even for very small deviations from zero. Timestamp
	// compression can probably benefit from some smaller bit
	// buckets. See also what was done for histogram encoding in
	// varbit.go.
	switch {
	case val == 0:
		b.writeBit(zero)
	case bitRange(val, 14):
		// The 14-bit bucket is written as two whole bytes for speed.
		b.writeByte(0b10<<6 | (uint8(val>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod.
		b.writeByte(uint8(val))                             // Bottom 8 bits of dod.
	case bitRange(val, 17):
		b.writeBits(0b110, 3)
		b.writeBits(uint64(val), 17)
	case bitRange(val, 20):
		b.writeBits(0b1110, 4)
		b.writeBits(uint64(val), 20)
	default:
		b.writeBits(0b1111, 4)
		b.writeBits(uint64(val), 64)
	}
}
|
||||
|
||||
// readClassicVarbitInt reads an int64 encoded with putClassicVarbitInt.
// It first consumes up to four prefix bits to determine the bit bucket, then
// reads the value in that many bits and restores its sign.
// This is copied into production code to make it inline.
func readClassicVarbitInt(b *bstreamReader) (int64, error) {
	var d byte
	// read delta-of-delta
	// Consume prefix bits until a zero bit or 4 one-bits (max prefix length).
	for range 4 {
		d <<= 1
		bit, err := b.readBitFast()
		if err != nil {
			// Fast path failed (buffer boundary); fall back to the slow read.
			bit, err = b.readBit()
			if err != nil {
				return 0, err
			}
		}
		if bit == zero {
			break
		}
		d |= 1
	}
	var sz uint8
	var val int64
	switch d {
	case 0b0:
		// dod == 0
	case 0b10:
		sz = 14
	case 0b110:
		sz = 17
	case 0b1110:
		sz = 20
	case 0b1111:
		// Do not use fast because it's very unlikely it will succeed.
		bits, err := b.readBits(64)
		if err != nil {
			return 0, err
		}

		val = int64(bits)
	}

	if sz != 0 {
		bits, err := b.readBitsFast(sz)
		if err != nil {
			bits, err = b.readBits(sz)
			if err != nil {
				return 0, err
			}
		}

		// Account for negative numbers, which come back as high unsigned numbers.
		// See docs/bstream.md.
		if bits > (1 << (sz - 1)) {
			bits -= 1 << sz
		}
		val = int64(bits)
	}

	return val, nil
}
|
||||
57
tsdb/chunkenc/varbit_classic_test.go
Normal file
57
tsdb/chunkenc/varbit_classic_test.go
Normal file
@ -0,0 +1,57 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestClassicVarbitInt(t *testing.T) {
|
||||
numbers := []int64{
|
||||
math.MinInt64,
|
||||
-36028797018963968, -36028797018963967,
|
||||
-16777216, -16777215,
|
||||
-131072, -131071,
|
||||
-2048, -2047,
|
||||
-256, -255,
|
||||
-32, -31,
|
||||
-4, -3,
|
||||
-1, 0, 1,
|
||||
4, 5,
|
||||
32, 33,
|
||||
256, 257,
|
||||
2048, 2049,
|
||||
131072, 131073,
|
||||
16777216, 16777217,
|
||||
36028797018963968, 36028797018963969,
|
||||
math.MaxInt64,
|
||||
}
|
||||
|
||||
bs := bstream{}
|
||||
|
||||
for _, n := range numbers {
|
||||
putClassicVarbitInt(&bs, n)
|
||||
}
|
||||
|
||||
bsr := newBReader(bs.bytes())
|
||||
|
||||
for _, want := range numbers {
|
||||
got, err := readClassicVarbitInt(&bsr)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, want, got)
|
||||
}
|
||||
}
|
||||
681
tsdb/chunkenc/xoroptst.go
Normal file
681
tsdb/chunkenc/xoroptst.go
Normal file
@ -0,0 +1,681 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
)
|
||||
|
||||
const (
	// chunkSTHeaderSize is the size in bytes of the start-timestamp header
	// that follows the common chunk header in an XorOptSTChunk.
	chunkSTHeaderSize = 1
	// maxFirstSTChangeOn is the largest sample index representable in the
	// 7-bit firstSTChangeOn field of the ST header byte.
	maxFirstSTChangeOn = 0x7F
)
|
||||
|
||||
// writeHeaderFirstSTKnown initializes the ST header byte to the "first start
// timestamp is known" state: high bit set, change-offset bits cleared.
func writeHeaderFirstSTKnown(b []byte) {
	const firstSTKnownBit = 0x80
	b[0] = firstSTKnownBit
}
|
||||
|
||||
func writeHeaderFirstSTChangeOn(b []byte, firstSTChangeOn uint16) {
|
||||
// First bit indicates the initial ST value.
|
||||
// Here we save the sample number from where the first change occurs in the
|
||||
// rest of the byte (7 bits)
|
||||
|
||||
if firstSTChangeOn > maxFirstSTChangeOn {
|
||||
// This should never happen, would cause corruption (ST already skipped but shouldn't).
|
||||
return
|
||||
}
|
||||
b[0] |= uint8(firstSTChangeOn)
|
||||
}
|
||||
|
||||
// readSTHeader decodes the 1-byte ST header written by writeHeaderFirstSTKnown
// and writeHeaderFirstSTChangeOn: the high bit reports whether the first start
// timestamp is known; the low 7 bits hold the sample index at which the ST
// first changes (0 when no change is recorded).
func readSTHeader(b []byte) (firstSTKnown bool, firstSTChangeOn uint16) {
	hdr := b[0]
	return hdr&0x80 != 0, uint16(hdr & 0x7F)
}
|
||||
|
||||
// XorOptSTChunk holds encoded sample data:
// 2B(numSamples), 1B(stHeader), ?varint(st), varint(t), xor(v), ?varuint(stDelta), varuint(tDelta), xor(v), ?classicvarbitint(stDod), classicvarbitint(tDod), xor(v), ...
// stHeader: 1b(firstSTKnown), 7b(firstSTChangeOn).
//
// NOTE(review): the ?-prefixed fields appear to be present only when start
// timestamps are actually stored (per the stHeader bits) — confirm against
// the appender implementation.
type XorOptSTChunk struct {
	b bstream // Raw bit stream backing the encoded chunk.
}
|
||||
|
||||
// NewXOROptSTChunk returns a new chunk with XORv2 encoding.
|
||||
func NewXOROptSTChunk() *XorOptSTChunk {
|
||||
b := make([]byte, chunkHeaderSize+chunkSTHeaderSize, chunkAllocationSize)
|
||||
return &XorOptSTChunk{b: bstream{stream: b, count: 0}}
|
||||
}
|
||||
|
||||
// Reset implements the Chunk interface, resetting the chunk to wrap the
// given encoded stream.
func (c *XorOptSTChunk) Reset(stream []byte) {
	c.b.Reset(stream)
}
|
||||
|
||||
// Encoding returns the encoding type, always EncXOROptST.
func (*XorOptSTChunk) Encoding() Encoding {
	return EncXOROptST
}
|
||||
|
||||
// Bytes returns the underlying byte slice of the chunk.
// The returned slice aliases the chunk's internal buffer; it is not a copy.
func (c *XorOptSTChunk) Bytes() []byte {
	return c.b.bytes()
}
|
||||
|
||||
// NumSamples returns the number of samples in the chunk, read from the
// big-endian uint16 at the start of the stream.
func (c *XorOptSTChunk) NumSamples() int {
	return int(binary.BigEndian.Uint16(c.Bytes()))
}
|
||||
|
||||
// Compact implements the Chunk interface.
|
||||
func (c *XorOptSTChunk) Compact() {
|
||||
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
|
||||
buf := make([]byte, l)
|
||||
copy(buf, c.b.stream)
|
||||
c.b.stream = buf
|
||||
}
|
||||
}
|
||||
|
||||
// Appender implements the Chunk interface.
// It is not valid to call Appender() multiple times concurrently or to use multiple
// Appenders on the same chunk.
func (c *XorOptSTChunk) Appender() (Appender, error) {
	if len(c.b.stream) == chunkHeaderSize+chunkSTHeaderSize { // Avoid allocating an Iterator when chunk is empty.
		// leading=0xff is a sentinel meaning no XOR state exists yet.
		return &xorOptSTAppender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil
	}
	it := c.iterator(nil)

	// To get an appender we must know the state it would have if we had
	// appended all existing data from scratch.
	// We iterate through the end and populate via the iterator's state.
	for it.Next() != ValNone {
	}
	if err := it.Err(); err != nil {
		return nil, err
	}

	// Seed the appender with the iterator's final decode state so the next
	// Append continues the delta/XOR streams seamlessly.
	a := &xorOptSTAppender{
		b:        &c.b,
		st:       it.st,
		t:        it.t,
		v:        it.val,
		stDelta:  it.stDelta,
		tDelta:   it.tDelta,
		leading:  it.leading,
		trailing: it.trailing,

		numTotal:        it.numTotal,
		firstSTKnown:    it.firstSTKnown,
		firstSTChangeOn: it.firstSTChangeOn,
	}
	return a, nil
}
|
||||
|
||||
func (c *XorOptSTChunk) iterator(it Iterator) *xorOptSTtIterator {
|
||||
xorIter, ok := it.(*xorOptSTtIterator)
|
||||
if !ok {
|
||||
xorIter = &xorOptSTtIterator{}
|
||||
}
|
||||
|
||||
xorIter.Reset(c.b.bytes())
|
||||
return xorIter
|
||||
}
|
||||
|
||||
// Iterator implements the Chunk interface.
// Iterator() must not be called concurrently with any modifications to the chunk,
// but after it returns you can use an Iterator concurrently with an Appender or
// other Iterators.
func (c *XorOptSTChunk) Iterator(it Iterator) Iterator {
	return c.iterator(it)
}
|
||||
|
||||
// xorOptSTAppender appends float samples with an optional start timestamp
// (ST) to an XorOptSTChunk, tracking the delta/XOR state needed to continue
// the encoded streams.
type xorOptSTAppender struct {
	b        *bstream
	numTotal uint16 // Samples appended so far; mirrored in the 2-byte chunk header.

	firstSTKnown    bool   // True once the first sample was written with an explicit ST.
	firstSTChangeOn uint16 // Sample index of the first ST change; 0 while none seen.

	st, t int64   // ST and timestamp of the last appended sample.
	v     float64 // Value of the last appended sample.
	stDelta int64  // Last ST delta (st - previous st).
	tDelta  uint64 // Last timestamp delta (t - previous t).

	leading  uint8 // XOR encoding state: leading zero bits (0xff = unset).
	trailing uint8 // XOR encoding state: trailing zero bits.
}
|
||||
|
||||
// writeVDelta XOR-encodes v against the previously appended value, updating
// the running leading/trailing zero-bit state.
func (a *xorOptSTAppender) writeVDelta(v float64) {
	xorWrite(a.b, v, a.v, &a.leading, &a.trailing)
}
|
||||
|
||||
// AppendHistogram panics: this appender accepts float samples only.
func (*xorOptSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
	panic("appended a histogram sample to a float chunk")
}
|
||||
|
||||
// AppendFloatHistogram panics: this appender accepts float samples only.
func (*xorOptSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
	panic("appended a float histogram sample to a float chunk")
}
|
||||
|
||||
// States of the xorOptSTtIterator state machine. The first two samples are
// decoded explicitly (verbatim, then deltas); later samples use one of the
// delta-of-delta states depending on whether ST data is present.
const (
	read0State uint8 = iota // Next read is the first sample (varint t, raw v).
	read1State              // Next read is the second sample (deltas).
	readDoDMaybeSTState     // DoD reads; ST DoDs may start at firstSTChangeOn.
	readDoDNoSTState        // DoD reads; no ST data for the rest of the chunk.
	readDoDState            // DoD reads including ST DoDs.

	eofState uint8 = 1<<8 - 1 // All samples read, or an error occurred.
)
|
||||
|
||||
// xorOptSTtIterator decodes samples of an XorOptSTChunk, reproducing the
// optional start timestamp (ST) stream alongside timestamps and values.
type xorOptSTtIterator struct {
	br       bstreamReader
	numTotal uint16 // Total samples in the chunk, from the 2-byte header.

	firstSTKnown    bool   // From the ST header: first sample has an explicit ST.
	firstSTChangeOn uint16 // From the ST header: index of the first ST change (0 = none).

	state   uint8  // Current state machine state (see read*State consts).
	numRead uint16 // Samples decoded so far.

	st, t int64   // Current sample's ST and timestamp.
	val   float64 // Current sample's value.

	leading  uint8 // XOR decoding state: leading zero bits.
	trailing uint8 // XOR decoding state: trailing zero bits.

	stDelta int64  // Last decoded ST delta.
	tDelta  uint64 // Last decoded timestamp delta.
	err     error  // First error encountered; sticky until Reset.
}
|
||||
|
||||
func (it *xorOptSTtIterator) Seek(t int64) ValueType {
|
||||
if it.state == eofState {
|
||||
return ValNone
|
||||
}
|
||||
|
||||
for t > it.t || it.state == read0State {
|
||||
if it.Next() == ValNone {
|
||||
return ValNone
|
||||
}
|
||||
}
|
||||
return ValFloat
|
||||
}
|
||||
|
||||
// At returns the current sample's timestamp and value.
func (it *xorOptSTtIterator) At() (int64, float64) {
	return it.t, it.val
}
|
||||
|
||||
func (*xorOptSTtIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
|
||||
panic("cannot call xorIterator.AtHistogram")
|
||||
}
|
||||
|
||||
func (*xorOptSTtIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||
panic("cannot call xorIterator.AtFloatHistogram")
|
||||
}
|
||||
|
||||
// AtT returns the current sample's timestamp.
func (it *xorOptSTtIterator) AtT() int64 {
	return it.t
}
|
||||
|
||||
// AtST returns the current sample's start timestamp.
func (it *xorOptSTtIterator) AtST() int64 {
	return it.st
}
|
||||
|
||||
// Err returns the first error encountered while iterating, if any.
func (it *xorOptSTtIterator) Err() error {
	return it.err
}
|
||||
|
||||
// Reset rewinds the iterator to the beginning of the encoded stream b,
// re-reading the sample-count and ST headers and clearing all decode state.
func (it *xorOptSTtIterator) Reset(b []byte) {
	// We skip initial headers for actual samples.
	it.br = newBReader(b[chunkHeaderSize+chunkSTHeaderSize:])
	it.numTotal = binary.BigEndian.Uint16(b)
	it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[chunkHeaderSize:])
	it.numRead = 0
	it.st = 0
	it.t = 0
	it.val = 0
	it.leading = 0
	it.trailing = 0
	it.stDelta = 0
	it.tDelta = 0
	it.err = nil
	it.state = read0State
	// An empty chunk (numTotal == 0) goes straight to EOF.
	if it.numRead >= it.numTotal {
		it.state = eofState
	}
}
|
||||
|
||||
// Append adds a sample with start timestamp st, timestamp t and value v.
// The first sample is stored verbatim (varint t, raw 64-bit v), the second
// as deltas, and subsequent samples as delta-of-deltas, mirroring the
// classic XOR chunk. ST data is only emitted once a non-zero or changing ST
// is observed; until then the layout matches a plain XOR chunk plus the
// 1-byte ST header.
func (a *xorOptSTAppender) Append(st, t int64, v float64) {
	// Fast path applies while no ST information has been recorded at all.
	// The numTotal != maxFirstSTChangeOn exception forces sample index 127
	// through the slow path so firstSTChangeOn can still be recorded in the
	// 7-bit header before it overflows.
	if st == 0 && a.numTotal != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown {
		// Fast path for no ST usage at all.
		// Same as classic XOR chunk appender.

		var tDelta uint64

		switch a.numTotal {
		case 0:
			// First sample: verbatim varint timestamp, raw 64-bit value.
			buf := make([]byte, binary.MaxVarintLen64)
			for _, b := range buf[:binary.PutVarint(buf, t)] {
				a.b.writeByte(b)
			}
			a.b.writeBits(math.Float64bits(v), 64)
		case 1:
			// Second sample: uvarint timestamp delta, XOR-ed value.
			buf := make([]byte, binary.MaxVarintLen64)
			tDelta = uint64(t - a.t)
			for _, b := range buf[:binary.PutUvarint(buf, tDelta)] {
				a.b.writeByte(b)
			}
			a.writeVDelta(v)
		default:
			// Later samples: varbit delta-of-delta timestamp, XOR-ed value.
			tDelta = uint64(t - a.t)
			dod := int64(tDelta - a.tDelta)

			// Gorilla has a max resolution of seconds, Prometheus milliseconds.
			// Thus we use higher value range steps with larger bit size.
			//
			// TODO(beorn7): This seems to needlessly jump to large bit
			// sizes even for very small deviations from zero. Timestamp
			// compression can probably benefit from some smaller bit
			// buckets. See also what was done for histogram encoding in
			// varbit.go.
			switch {
			case dod == 0:
				a.b.writeBit(zero)
			case bitRange(dod, 14):
				a.b.writeByte(0b10<<6 | (uint8(dod>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod.
				a.b.writeByte(uint8(dod))                             // Bottom 8 bits of dod.
			case bitRange(dod, 17):
				a.b.writeBits(0b110, 3)
				a.b.writeBits(uint64(dod), 17)
			case bitRange(dod, 20):
				a.b.writeBits(0b1110, 4)
				a.b.writeBits(uint64(dod), 20)
			default:
				a.b.writeBits(0b1111, 4)
				a.b.writeBits(uint64(dod), 64)
			}

			a.writeVDelta(v)
		}

		a.t = t
		a.v = v
		a.tDelta = tDelta
		a.numTotal++
		binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)

		return
	}

	var (
		stDelta   int64
		tDelta    uint64
		stChanged bool
	)

	// Slow path for ST usage.
	switch a.numTotal {
	case 0:
		// First sample with a non-zero ST: write the ST verbatim and flag
		// it in the header so the iterator knows to read it.
		buf := make([]byte, binary.MaxVarintLen64)

		for _, b := range buf[:binary.PutVarint(buf, st)] {
			a.b.writeByte(b)
		}
		writeHeaderFirstSTKnown(a.b.bytes()[chunkHeaderSize:])
		a.firstSTKnown = true

		for _, b := range buf[:binary.PutVarint(buf, t)] {
			a.b.writeByte(b)
		}
		a.b.writeBits(math.Float64bits(v), 64)
	case 1:
		// Second sample: the ST delta is only written when non-zero; the
		// header update at the bottom of this function records where ST
		// tracking began.
		buf := make([]byte, binary.MaxVarintLen64)
		stDelta = st - a.st
		if stDelta != 0 {
			stChanged = true
			for _, b := range buf[:binary.PutVarint(buf, stDelta)] {
				a.b.writeByte(b)
			}
		}

		tDelta = uint64(t - a.t)
		for _, b := range buf[:binary.PutUvarint(buf, tDelta)] {
			a.b.writeByte(b)
		}
		a.writeVDelta(v)
	default:
		if a.firstSTChangeOn == 0 && a.numTotal == maxFirstSTChangeOn {
			// We are at the 127th sample. firstSTChangeOn can only fit 7 bits due to a
			// single byte header constrain, which is fine, given typical 120 sample size.
			a.firstSTChangeOn = a.numTotal
			writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.firstSTChangeOn)
		}

		stDelta = st - a.st
		sdod := stDelta - a.stDelta
		// Once firstSTChangeOn is set, every sample carries an ST DoD
		// (even a zero one) so the iterator's read cadence stays aligned.
		if sdod != 0 || a.firstSTChangeOn != 0 {
			stChanged = true
			// Gorilla has a max resolution of seconds, Prometheus milliseconds.
			// Thus we use higher value range steps with larger bit size.
			//
			// TODO(beorn7): This seems to needlessly jump to large bit
			// sizes even for very small deviations from zero. Timestamp
			// compression can probably benefit from some smaller bit
			// buckets. See also what was done for histogram encoding in
			// varbit.go.
			switch {
			case sdod == 0:
				a.b.writeBit(zero)
			case bitRange(sdod, 14):
				a.b.writeByte(0b10<<6 | (uint8(sdod>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod.
				a.b.writeByte(uint8(sdod))                             // Bottom 8 bits of dod.
			case bitRange(sdod, 17):
				a.b.writeBits(0b110, 3)
				a.b.writeBits(uint64(sdod), 17)
			case bitRange(sdod, 20):
				a.b.writeBits(0b1110, 4)
				a.b.writeBits(uint64(sdod), 20)
			default:
				a.b.writeBits(0b1111, 4)
				a.b.writeBits(uint64(sdod), 64)
			}
			// putClassicVarbitInt(a.b, sdod)
		}

		tDelta = uint64(t - a.t)
		dod := int64(tDelta - a.tDelta)

		// Gorilla has a max resolution of seconds, Prometheus milliseconds.
		// Thus we use higher value range steps with larger bit size.
		//
		// TODO(beorn7): This seems to needlessly jump to large bit
		// sizes even for very small deviations from zero. Timestamp
		// compression can probably benefit from some smaller bit
		// buckets. See also what was done for histogram encoding in
		// varbit.go.
		switch {
		case dod == 0:
			a.b.writeBit(zero)
		case bitRange(dod, 14):
			a.b.writeByte(0b10<<6 | (uint8(dod>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod.
			a.b.writeByte(uint8(dod))                             // Bottom 8 bits of dod.
		case bitRange(dod, 17):
			a.b.writeBits(0b110, 3)
			a.b.writeBits(uint64(dod), 17)
		case bitRange(dod, 20):
			a.b.writeBits(0b1110, 4)
			a.b.writeBits(uint64(dod), 20)
		default:
			a.b.writeBits(0b1111, 4)
			a.b.writeBits(uint64(dod), 64)
		}

		a.writeVDelta(v)
	}

	a.st = st
	a.t = t
	a.v = v
	a.tDelta = tDelta
	a.stDelta = stDelta

	a.numTotal++
	binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal)

	// firstSTChangeOn == 0 indicates that we have one ST value (zero or not)
	// for all STs in the appends until now. If we see a first "update"
	// we are saving this number in the header and continue tracking all DoD (including zeros).
	if a.firstSTChangeOn == 0 && stChanged {
		a.firstSTChangeOn = a.numTotal - 1
		writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.firstSTChangeOn)
	}
}
|
||||
|
||||
func (it *xorOptSTtIterator) retErr(err error) ValueType {
|
||||
it.err = err
|
||||
it.state = eofState
|
||||
return ValNone
|
||||
}
|
||||
|
||||
// Next advances the iterator by one sample, driving the state machine: the
// first sample is read verbatim, the second as deltas, and later samples as
// delta-of-deltas. ST reads are only performed where the ST header says
// they were written.
func (it *xorOptSTtIterator) Next() ValueType {
	switch it.state {
	case eofState:
		return ValNone
	case read0State:
		it.state++

		// Optional ST read.
		if it.firstSTKnown {
			st, err := binary.ReadVarint(&it.br)
			if err != nil {
				return it.retErr(err)
			}
			it.st = st
		}

		// TS.
		t, err := binary.ReadVarint(&it.br)
		if err != nil {
			return it.retErr(err)
		}
		// Value.
		v, err := it.br.readBits(64)
		if err != nil {
			return it.retErr(err)
		}

		it.t = t
		it.val = math.Float64frombits(v)

		// State EOF check.
		it.numRead++
		if it.numRead >= it.numTotal {
			it.state = eofState
		}
		return ValFloat
	case read1State:
		// Advances to readDoDMaybeSTState unless overridden below.
		it.state++
		switch it.firstSTChangeOn {
		case 0:
			// This means we have same (zero or non-zero) ST value for the rest of
			// chunk. We can simply use ~classic XOR chunk iterations.
			it.state = readDoDNoSTState
		case 1:
			// We got early ST change on the second sample, likely delta.
			// Continue ST rich flow from the next iteration.
			it.state = readDoDState

			stDelta, err := binary.ReadVarint(&it.br)
			if err != nil {
				return it.retErr(err)
			}
			it.stDelta = stDelta
			it.st += it.stDelta
		}
		// TS.
		tDelta, err := binary.ReadUvarint(&it.br)
		if err != nil {
			return it.retErr(err)
		}
		it.tDelta = tDelta
		it.t += int64(it.tDelta)

		// Value.
		if err := xorRead(&it.br, &it.val, &it.leading, &it.trailing); err != nil {
			return it.retErr(err)
		}

		// State EOF check.
		it.numRead++
		if it.numRead >= it.numTotal {
			it.state = eofState
		}
		return ValFloat
	case readDoDMaybeSTState:
		if it.firstSTChangeOn == it.numRead {
			// ST changes from this iteration, change state for future.
			it.state = readDoDState
			return it.dodNext()
		}
		return it.dodNoSTNext()
	case readDoDState:
		return it.dodNext()
	case readDoDNoSTState:
		return it.dodNoSTNext()
	default:
		panic("xorOptSTtIterator: broken machine state")
	}
}
|
||||
|
||||
// dodNext decodes the ST delta-of-delta for the current sample (the classic
// varbit int format, inlined for speed), applies it to the running ST
// state, then delegates to dodNoSTNext for timestamp and value.
func (it *xorOptSTtIterator) dodNext() ValueType {
	// Inlined readClassicVarbitInt(&it.br)
	var d byte
	// Read the unary size prefix: up to four 1-bits terminated by a 0-bit.
	for range 4 {
		d <<= 1
		bit, err := it.br.readBitFast()
		if err != nil {
			bit, err = it.br.readBit()
			if err != nil {
				return it.retErr(err)
			}
		}
		if bit == zero {
			break
		}
		d |= 1
	}
	var sz uint8
	var sdod int64
	switch d {
	case 0b0:
		// dod == 0
	case 0b10:
		sz = 14
	case 0b110:
		sz = 17
	case 0b1110:
		sz = 20
	case 0b1111:
		// Do not use fast because it's very unlikely it will succeed.
		bits, err := it.br.readBits(64)
		if err != nil {
			return it.retErr(err)
		}

		sdod = int64(bits)
	}

	if sz != 0 {
		bits, err := it.br.readBitsFast(sz)
		if err != nil {
			bits, err = it.br.readBits(sz)
			if err != nil {
				return it.retErr(err)
			}
		}

		// Account for negative numbers, which come back as high unsigned numbers.
		// See docs/bstream.md.
		if bits > (1 << (sz - 1)) {
			bits -= 1 << sz
		}
		sdod = int64(bits)
	}

	it.stDelta += sdod
	it.st += it.stDelta
	return it.dodNoSTNext()
}
|
||||
|
||||
// dodNoSTNext decodes the timestamp delta-of-delta (classic varbit int
// format, inlined for speed) and the XOR-ed value for the current sample.
// No ST data is read; the current ST carries over unchanged unless the
// caller (dodNext) already updated it.
func (it *xorOptSTtIterator) dodNoSTNext() ValueType {
	// Inlined readClassicVarbitInt(&it.br)
	var d byte
	// Read the unary size prefix: up to four 1-bits terminated by a 0-bit.
	for range 4 {
		d <<= 1
		bit, err := it.br.readBitFast()
		if err != nil {
			bit, err = it.br.readBit()
			if err != nil {
				return it.retErr(err)
			}
		}
		if bit == zero {
			break
		}
		d |= 1
	}
	var sz uint8
	var dod int64
	switch d {
	case 0b0:
		// dod == 0
	case 0b10:
		sz = 14
	case 0b110:
		sz = 17
	case 0b1110:
		sz = 20
	case 0b1111:
		// Do not use fast because it's very unlikely it will succeed.
		bits, err := it.br.readBits(64)
		if err != nil {
			return it.retErr(err)
		}

		dod = int64(bits)
	}

	if sz != 0 {
		bits, err := it.br.readBitsFast(sz)
		if err != nil {
			bits, err = it.br.readBits(sz)
			if err != nil {
				return it.retErr(err)
			}
		}

		// Account for negative numbers, which come back as high unsigned numbers.
		// See docs/bstream.md.
		if bits > (1 << (sz - 1)) {
			bits -= 1 << sz
		}
		dod = int64(bits)
	}

	it.tDelta = uint64(int64(it.tDelta) + dod)
	it.t += int64(it.tDelta)
	// Value.
	if err := xorRead(&it.br, &it.val, &it.leading, &it.trailing); err != nil {
		return it.retErr(err)
	}

	// State EOF check.
	it.numRead++
	if it.numRead >= it.numTotal {
		it.state = eofState
	}
	return ValFloat
}
|
||||
108
tsdb/chunkenc/xoroptst_test.go
Normal file
108
tsdb/chunkenc/xoroptst_test.go
Normal file
@ -0,0 +1,108 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestXorOptSTChunk runs the shared start-timestamp handling test suite
// against the XOR-with-optional-ST chunk implementation.
func TestXorOptSTChunk(t *testing.T) {
	testChunkSTHandling(t, ValFloat, func() Chunk {
		return NewXOROptSTChunk()
	},
	)
}
|
||||
|
||||
// TestXorOptSTChunk_MoreThan127Samples exercises chunks that grow past the
// 7-bit firstSTChangeOn header limit (sample index 127), both with STs that
// stay zero throughout and with a non-zero ST appearing after the limit.
func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) {
	const afterMax = maxFirstSTChangeOn + 3
	t.Run("zero ST", func(t *testing.T) {
		chunk := NewXOROptSTChunk()
		app, err := chunk.Appender()
		require.NoError(t, err)
		for i := range afterMax {
			app.Append(0, int64(i*10+1), float64(i)*1.5)
		}

		// All samples must round-trip with a zero ST.
		it := chunk.Iterator(nil)
		for i := range afterMax {
			require.Equal(t, ValFloat, it.Next())
			st := it.AtST()
			ts, v := it.At()
			require.Equal(t, int64(0), st)
			require.Equal(t, int64(i*10+1), ts)
			require.Equal(t, float64(i)*1.5, v)
		}

		require.Equal(t, ValNone, it.Next())
		require.NoError(t, it.Err())
	})

	t.Run("non-zero ST after 127", func(t *testing.T) {
		chunk := NewXOROptSTChunk()
		app, err := chunk.Appender()
		require.NoError(t, err)
		// Only the last sample carries a non-zero ST, past the header limit.
		for i := range afterMax {
			st := int64(0)
			if i == afterMax-1 {
				st = int64((afterMax - 1) * 10)
			}
			app.Append(st, int64(i*10+1), float64(i)*1.5)
		}

		it := chunk.Iterator(nil)
		for i := range afterMax {
			require.Equal(t, ValFloat, it.Next())
			st := it.AtST()
			ts, v := it.At()
			if i == afterMax-1 {
				require.Equal(t, int64((afterMax-1)*10), st)
			} else {
				require.Equal(t, int64(0), st)
			}
			require.Equal(t, int64(i*10+1), ts)
			require.Equal(t, float64(i)*1.5, v)
		}

		require.Equal(t, ValNone, it.Next())
		require.NoError(t, it.Err())
	})
}
|
||||
|
||||
// TestXorOptSTChunk_STHeader checks that the 1-byte ST header round-trips
// the firstSTKnown flag and firstSTChangeOn index, individually and
// combined.
func TestXorOptSTChunk_STHeader(t *testing.T) {
	b := make([]byte, 1)
	writeHeaderFirstSTKnown(b)
	firstSTKnown, firstSTChangeOn := readSTHeader(b)
	require.True(t, firstSTKnown)
	require.Equal(t, uint16(0), firstSTChangeOn)

	// A zeroed header decodes as no flag, no change index.
	b = make([]byte, 1)
	firstSTKnown, firstSTChangeOn = readSTHeader(b)
	require.False(t, firstSTKnown)
	require.Equal(t, uint16(0), firstSTChangeOn)

	b = make([]byte, 1)
	writeHeaderFirstSTChangeOn(b, 1)
	firstSTKnown, firstSTChangeOn = readSTHeader(b)
	require.False(t, firstSTKnown)
	require.Equal(t, uint16(1), firstSTChangeOn)

	// Both fields written together must decode independently.
	b = make([]byte, 1)
	writeHeaderFirstSTKnown(b)
	writeHeaderFirstSTChangeOn(b, 119)
	firstSTKnown, firstSTChangeOn = readSTHeader(b)
	require.True(t, firstSTKnown)
	require.Equal(t, uint16(119), firstSTChangeOn)
}
|
||||
Loading…
x
Reference in New Issue
Block a user