diff --git a/tsdb/chunkenc/benchmark_test.go b/tsdb/chunkenc/benchmark_test.go new file mode 100644 index 0000000000..cc69725858 --- /dev/null +++ b/tsdb/chunkenc/benchmark_test.go @@ -0,0 +1,342 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "errors" + "fmt" + "io" + "math" + "math/rand" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/timestamp" +) + +type sampleCase struct { + name string + samples []triple +} + +type fmtCase struct { + name string + newChunkFn func() Chunk +} + +func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampleCase)) { + const nSamples = 120 // Same as tsdb.DefaultSamplesPerChunk. + + d, err := time.Parse(time.DateTime, "2025-11-04 10:01:05") + require.NoError(b, err) + + var ( + r = rand.New(rand.NewSource(1)) + initST = timestamp.FromTime(d) // Use realistic timestamp. + initT = initST + 15000 // 15s after initST. + initV = 1243535.123 + ) + + sampleCases := []sampleCase{ + { + name: "vt=constant/st=0", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + t += 15000 + ret = append(ret, triple{st: 0, t: t, v: v}) + } + return ret + }(), + }, + + { + // Cumulative with a constant ST through the whole chunk, typical case (e.g. long counting counter). 
+ name: "vt=constant/st=cumulative", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + t += 15000 + ret = append(ret, triple{st: initST, t: t, v: v}) + } + return ret + }(), + }, + { + // Delta simulates delta type or worst case for cumulatives, where ST + // is changing on every sample. + name: "vt=constant/st=delta", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + st := t + 1 // ST is a tight interval after the last t+1ms. + t += 15000 + ret = append(ret, triple{st: st, t: t, v: v}) + } + return ret + }(), + }, + { + name: "vt=random steps/st=0", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += float64(r.Intn(100) - 50) // Varying from -50 to +50 in 100 discrete steps. + ret = append(ret, triple{st: 0, t: t, v: v}) + } + return ret + }(), + }, + { + name: "vt=random steps/st=cumulative", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += float64(r.Intn(100) - 50) // Varying from -50 to +50 in 100 discrete steps. + ret = append(ret, triple{st: initST, t: t, v: v}) + } + return ret + }(), + }, + { + name: "vt=random steps/st=delta", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + st := t + 1 // ST is a tight interval after the last t+1ms. + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += float64(r.Intn(100) - 50) // Varying from -50 to +50 in 100 discrete steps. + ret = append(ret, triple{st: st, t: t, v: v}) + } + return ret + }(), + }, + { + name: "vt=random 0-1/st=0", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += r.Float64() // Random between 0 and 1.0. 
+ ret = append(ret, triple{st: 0, t: t, v: v}) + } + return ret + }(), + }, + { + // Are we impacted by https://victoriametrics.com/blog/go-protobuf/ negative varint issue? (zig-zag needed?) + name: "vt=negrandom 0-1/st=0", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v -= r.Float64() // Random between 0 and 1.0. + ret = append(ret, triple{st: 0, t: t, v: v}) + } + return ret + }(), + }, + { + name: "vt=random 0-1/st=cumulative", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += r.Float64() // Random between 0 and 1.0. + ret = append(ret, triple{st: initST, t: t, v: v}) + } + return ret + }(), + }, + { + name: "vt=random 0-1/st=cumulative-periodic-resets", + samples: func() (ret []triple) { + t, v := initT, initV + for i := range nSamples { + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += r.Float64() // Random between 0 and 1.0. + st := initST + if i%6 == 5 { + st = t - 10000 // Reset of 10s before current t. + } + ret = append(ret, triple{st: st, t: t, v: v}) + } + return ret + }(), + }, + { + name: "vt=random 0-1/st=cumulative-periodic-zeros", + samples: func() (ret []triple) { + t, v := initT, initV + for i := range nSamples { + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += r.Float64() // Random between 0 and 1.0. + st := initST + if i%6 == 5 { + st = 0 + } + ret = append(ret, triple{st: st, t: t, v: v}) + } + return ret + }(), + }, + { + name: "vt=random 0-1/st=delta", + samples: func() (ret []triple) { + t, v := initT, initV + for range nSamples { + st := t + 1 // ST is a tight interval after the last t+1ms. + t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter. + v += r.Float64() // Random between 0 and 1.0. 
+ ret = append(ret, triple{st: st, t: t, v: v}) + } + return ret + }(), + }, + } + + for _, f := range []fmtCase{ + {name: "XOR", newChunkFn: func() Chunk { return NewXORChunk() }}, + {name: "XOR_OPT_ST", newChunkFn: func() Chunk { return NewXOROptSTChunk() }}, + } { + for _, s := range sampleCases { + b.Run(fmt.Sprintf("fmt=%s/%s", f.name, s.name), func(b *testing.B) { + fn(b, f, s) + }) + } + } +} + +/* + export bench=bw.bench/append.v2 && go test \ + -run '^$' -bench '^BenchmarkAppender' \ + -benchtime 1s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt + +For profiles: + + export bench=bw.bench/appendprof && go test \ + -run '^$' -bench '^BenchmarkAppender' \ + -benchtime 1s -count 1 -cpu 2 -timeout 999m \ + -cpuprofile=${bench}.cpu.pprof \ + | tee ${bench}.txt +*/ +func BenchmarkAppender(b *testing.B) { + foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) { + b.ReportAllocs() + + for b.Loop() { + c := f.newChunkFn() + + a, err := c.Appender() + if err != nil { + b.Fatalf("get appender: %s", err) + } + for _, p := range s.samples { + a.Append(p.st, p.t, p.v) + } + // NOTE: Some buffered implementations only encode on Bytes(). 
+ b.ReportMetric(float64(len(c.Bytes())), "B/chunk") + + require.Equal(b, len(s.samples), c.NumSamples()) + } + }) +} + +/* + export bench=bw.bench/iter && go test \ + -run '^$' -bench '^BenchmarkIterator' \ + -benchtime 1s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt + +For profiles: + + export bench=bw.bench/iterprof && go test \ + -run '^$' -bench '^BenchmarkIterator' \ + -benchtime 1000000x -count 1 -cpu 2 -timeout 999m \ + -cpuprofile=${bench}.cpu.pprof \ + | tee ${bench}.txt + export bench=bw.bench/iterprof && go test \ + -run '^$' -bench '^BenchmarkIterator' \ + -benchtime 1000000x -count 1 -cpu 2 -timeout 999m \ + -memprofile=${bench}.mem.pprof \ + | tee ${bench}.txt +*/ +func BenchmarkIterator(b *testing.B) { + foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) { + floatEquals := func(a, b float64) bool { + return a == b + } + if f.name == "ALPBuffered" { + // Hack as ALP loses precision. + floatEquals = func(a, b float64) bool { + return math.Abs(a-b) < 1e-6 + } + } + b.ReportAllocs() + + c := f.newChunkFn() + a, err := c.Appender() + if err != nil { + b.Fatalf("get appender: %s", err) + } + for _, p := range s.samples { + a.Append(p.st, p.t, p.v) + } + + // Some chunk implementations might be buffered. Reset to ensure we don't reuse + // appending buffers. + c.Reset(c.Bytes()) + + // While we are at it, test if encoding/decoding works. + it := c.Iterator(nil) + require.Equal(b, len(s.samples), c.NumSamples()) + var got []triple + for i := 0; it.Next() == ValFloat; i++ { + t, v := it.At() + got = append(got, triple{st: it.AtST(), t: t, v: v}) + } + if err := it.Err(); err != nil && !errors.Is(err, io.EOF) { + require.NoError(b, err) + } + if diff := cmp.Diff(s.samples, got, cmp.AllowUnexported(triple{}), cmp.Comparer(floatEquals)); diff != "" { + b.Fatalf("mismatch (-want +got):\n%s", diff) + } + + var sink float64 + // Measure decoding efficiency. + for i := 0; b.Loop(); { + // Some chunk implementations might be buffered. 
Reset to ensure we don't reuse + // previous decoded data. + c.Reset(c.Bytes()) + b.ReportMetric(float64(len(c.Bytes())), "B/chunk") + + it := c.Iterator(it) + for it.Next() == ValFloat { + _, v := it.At() + sink = v + i++ + } + if err := it.Err(); err != nil && !errors.Is(err, io.EOF) { + require.NoError(b, err) + } + _ = sink + } + }) +} diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 711966ec39..71f38e7a7e 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -30,6 +30,7 @@ const ( EncXOR EncHistogram EncFloatHistogram + EncXOROptST ) func (e Encoding) String() string { @@ -42,13 +43,15 @@ func (e Encoding) String() string { return "histogram" case EncFloatHistogram: return "floathistogram" + case EncXOROptST: + return "XOR-start-timestamp" } return "" } // IsValidEncoding returns true for supported encodings. func IsValidEncoding(e Encoding) bool { - return e == EncXOR || e == EncHistogram || e == EncFloatHistogram + return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOROptST } const ( @@ -299,6 +302,7 @@ type pool struct { xor sync.Pool histogram sync.Pool floatHistogram sync.Pool + xoroptst sync.Pool } // NewPool returns a new pool. 
@@ -319,6 +323,11 @@ func NewPool() Pool { return &FloatHistogramChunk{b: bstream{}} }, }, + xoroptst: sync.Pool{ + New: func() any { + return &XorOptSTChunk{b: bstream{}} + }, + }, } } @@ -331,6 +340,8 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { c = p.histogram.Get().(*HistogramChunk) case EncFloatHistogram: c = p.floatHistogram.Get().(*FloatHistogramChunk) + case EncXOROptST: + c = p.xoroptst.Get().(*XorOptSTChunk) default: return nil, fmt.Errorf("invalid chunk encoding %q", e) } @@ -352,6 +363,9 @@ func (p *pool) Put(c Chunk) error { case EncFloatHistogram: _, ok = c.(*FloatHistogramChunk) sp = &p.floatHistogram + case EncXOROptST: + _, ok = c.(*XorOptSTChunk) + sp = &p.xoroptst default: return fmt.Errorf("invalid chunk encoding %q", c.Encoding()) } diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index 41bb23ddd1..92fa3cab38 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -16,36 +16,41 @@ package chunkenc import ( "errors" "fmt" - "io" "math/rand" "testing" "github.com/stretchr/testify/require" ) -type pair struct { - t int64 - v float64 +type triple struct { + st, t int64 + v float64 } func TestChunk(t *testing.T) { - for enc, nc := range map[Encoding]func() Chunk{ - EncXOR: func() Chunk { return NewXORChunk() }, - } { - t.Run(fmt.Sprintf("%v", enc), func(t *testing.T) { + testcases := []struct { + encoding Encoding + supportsST bool + factory func() Chunk + }{ + {encoding: EncXOR, supportsST: false, factory: func() Chunk { return NewXORChunk() }}, + {encoding: EncXOROptST, supportsST: true, factory: func() Chunk { return NewXOROptSTChunk() }}, + } + for _, tc := range testcases { + t.Run(fmt.Sprintf("%v", tc.encoding), func(t *testing.T) { for range make([]struct{}, 1) { - c := nc() - testChunk(t, c) + c := tc.factory() + testChunk(t, c, tc.supportsST) } }) } } -func testChunk(t *testing.T, c Chunk) { +func testChunk(t *testing.T, c Chunk, supportsST bool) { app, err := c.Appender() 
require.NoError(t, err) - var exp []pair + var exp []triple var ( ts = int64(1234123324) v = 1243535.123 @@ -65,26 +70,30 @@ func testChunk(t *testing.T, c Chunk) { require.NoError(t, err) } - app.Append(0, ts, v) - exp = append(exp, pair{t: ts, v: v}) + app.Append(ts-100, ts, v) + expST := int64(0) + if supportsST { + expST = ts - 100 + } + exp = append(exp, triple{st: expST, t: ts, v: v}) } // 1. Expand iterator in simple case. it1 := c.Iterator(nil) - var res1 []pair + var res1 []triple for it1.Next() == ValFloat { ts, v := it1.At() - res1 = append(res1, pair{t: ts, v: v}) + res1 = append(res1, triple{st: it1.AtST(), t: ts, v: v}) } require.NoError(t, it1.Err()) require.Equal(t, exp, res1) // 2. Expand second iterator while reusing first one. it2 := c.Iterator(it1) - var res2 []pair + var res2 []triple for it2.Next() == ValFloat { ts, v := it2.At() - res2 = append(res2, pair{t: ts, v: v}) + res2 = append(res2, triple{st: it2.AtST(), t: ts, v: v}) } require.NoError(t, it2.Err()) require.Equal(t, exp, res2) @@ -93,17 +102,17 @@ func testChunk(t *testing.T, c Chunk) { mid := len(exp) / 2 it3 := c.Iterator(nil) - var res3 []pair + var res3 []triple require.Equal(t, ValFloat, it3.Seek(exp[mid].t)) // Below ones should not matter. 
require.Equal(t, ValFloat, it3.Seek(exp[mid].t)) require.Equal(t, ValFloat, it3.Seek(exp[mid].t)) ts, v = it3.At() - res3 = append(res3, pair{t: ts, v: v}) + res3 = append(res3, triple{st: it3.AtST(), t: ts, v: v}) for it3.Next() == ValFloat { ts, v := it3.At() - res3 = append(res3, pair{t: ts, v: v}) + res3 = append(res3, triple{st: it3.AtST(), t: ts, v: v}) } require.NoError(t, it3.Err()) require.Equal(t, exp[mid:], res3) @@ -129,6 +138,10 @@ func TestPool(t *testing.T) { name: "float histogram", encoding: EncFloatHistogram, }, + { + name: "xor opt st", + encoding: EncXOROptST, + }, { name: "invalid encoding", encoding: EncNone, @@ -150,6 +163,8 @@ func TestPool(t *testing.T) { b = &c.(*HistogramChunk).b case EncFloatHistogram: b = &c.(*FloatHistogramChunk).b + case EncXOROptST: + b = &c.(*XorOptSTChunk).b default: b = &c.(*XORChunk).b } @@ -199,111 +214,3 @@ func (c fakeChunk) Encoding() Encoding { func (c fakeChunk) Reset([]byte) { c.t.Fatal("Reset should not be called") } - -func benchmarkIterator(b *testing.B, newChunk func() Chunk) { - const samplesPerChunk = 250 - var ( - t = int64(1234123324) - v = 1243535.123 - exp []pair - ) - for range samplesPerChunk { - // t += int64(rand.Intn(10000) + 1) - t += int64(1000) - // v = rand.Float64() - v += float64(100) - exp = append(exp, pair{t: t, v: v}) - } - - chunk := newChunk() - { - a, err := chunk.Appender() - if err != nil { - b.Fatalf("get appender: %s", err) - } - j := 0 - for _, p := range exp { - if j > 250 { - break - } - a.Append(0, p.t, p.v) - j++ - } - } - - b.ReportAllocs() - - var res float64 - var it Iterator - for i := 0; b.Loop(); { - it := chunk.Iterator(it) - - for it.Next() == ValFloat { - _, v := it.At() - res = v - i++ - } - if err := it.Err(); err != nil && !errors.Is(err, io.EOF) { - require.NoError(b, err) - } - _ = res - } -} - -func newXORChunk() Chunk { - return NewXORChunk() -} - -func BenchmarkXORIterator(b *testing.B) { - benchmarkIterator(b, newXORChunk) -} - -func 
BenchmarkXORAppender(b *testing.B) { - r := rand.New(rand.NewSource(1)) - b.Run("constant", func(b *testing.B) { - benchmarkAppender(b, func() (int64, float64) { - return 1000, 0 - }, newXORChunk) - }) - b.Run("random steps", func(b *testing.B) { - benchmarkAppender(b, func() (int64, float64) { - return int64(r.Intn(100) - 50 + 15000), // 15 seconds +- up to 100ms of jitter. - float64(r.Intn(100) - 50) // Varying from -50 to +50 in 100 discrete steps. - }, newXORChunk) - }) - b.Run("random 0-1", func(b *testing.B) { - benchmarkAppender(b, func() (int64, float64) { - return int64(r.Intn(100) - 50 + 15000), // 15 seconds +- up to 100ms of jitter. - r.Float64() // Random between 0 and 1.0. - }, newXORChunk) - }) -} - -func benchmarkAppender(b *testing.B, deltas func() (int64, float64), newChunk func() Chunk) { - var ( - t = int64(1234123324) - v = 1243535.123 - ) - const nSamples = 120 // Same as tsdb.DefaultSamplesPerChunk. - var exp []pair - for range nSamples { - dt, dv := deltas() - t += dt - v += dv - exp = append(exp, pair{t: t, v: v}) - } - - b.ReportAllocs() - - for b.Loop() { - c := newChunk() - - a, err := c.Appender() - if err != nil { - b.Fatalf("get appender: %s", err) - } - for _, p := range exp { - a.Append(0, p.t, p.v) - } - } -} diff --git a/tsdb/chunkenc/st_helper_test.go b/tsdb/chunkenc/st_helper_test.go new file mode 100644 index 0000000000..7f657a4293 --- /dev/null +++ b/tsdb/chunkenc/st_helper_test.go @@ -0,0 +1,124 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" +) + +// testChunkSTHandling tests handling of start times in chunks. +// It uses 0-4 samples with timestamp 1000,2000,3000,4000 and monotonically +// increasing start times that are chosen from 0-(ts-500) for each sample. +// All combinations of start times are tested for each number of samples. +func testChunkSTHandling(t *testing.T, vt ValueType, chunkFactory func() Chunk) { + sampleAppend := func(app Appender, vt ValueType, st, ts int64, v float64) { + switch vt { + case ValFloat: + app.Append(st, ts, v) + case ValHistogram: + _, recoded, _, err := app.AppendHistogram(nil, st, ts, &histogram.Histogram{Sum: v, Count: uint64(v * 10)}, false) + require.NoError(t, err) + require.False(t, recoded) + case ValFloatHistogram: + _, recoded, _, err := app.AppendFloatHistogram(nil, st, ts, &histogram.FloatHistogram{Sum: v, Count: v * 10}, false) + require.NoError(t, err) + require.False(t, recoded) + default: + t.Fatalf("unsupported value type %v", vt) + } + } + + get := func(it Iterator, vt ValueType) (int64, int64, float64) { + switch vt { + case ValFloat: + ts, v := it.At() + return it.AtST(), ts, v + case ValHistogram: + ts, h := it.AtHistogram(nil) + return it.AtST(), ts, float64(h.Sum) + case ValFloatHistogram: + ts, fh := it.AtFloatHistogram(nil) + return it.AtST(), ts, fh.Sum + default: + t.Fatalf("unsupported value type %v", vt) + return 0, 0, 0 + } + } + + runCase := func(t *testing.T, samples []triple) { + chunk := chunkFactory() + app, err := chunk.Appender() + require.NoError(t, err) + for _, s := range samples { + sampleAppend(app, vt, s.st, s.t, s.v) + } + it := chunk.Iterator(nil) + for i, s := range samples { + require.Equal(t, vt, it.Next()) + st, ts, f := get(it, vt) + require.Equal(t, s.t, ts, "%d: 
timestamp mismatch", i) + require.Equal(t, s.st, st, "%d: start time mismatch", i) + require.InDelta(t, s.v, f, 1e-9, "%d: value mismatch", i) + } + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + } + + t.Run("manual for debugging", func(t *testing.T) { + samples := []triple{ + {st: 0, t: 1000, v: 1.5}, + {st: 0, t: 2000, v: 2.5}, + {st: 0, t: 3000, v: 3.5}, + {st: 0, t: 4000, v: 4.5}, + } + runCase(t, samples) + }) + + stTimes := []int64{0, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000} + for numberOfSamples := range 5 { + samples := make([]triple, numberOfSamples) + sampleSTidx := make([]int, numberOfSamples) + for { + for j := range numberOfSamples { + samples[j] = triple{ + st: stTimes[sampleSTidx[j]], + t: int64(1000 * (j + 1)), + v: float64(j) + 0.5, + } + } + + t.Run(fmt.Sprintf("%v", samples), func(t *testing.T) { + runCase(t, samples) + }) + + exhausted := true + for j := numberOfSamples - 1; j >= 0; j-- { + if sampleSTidx[j] < j+2 { + sampleSTidx[j]++ + exhausted = false + break + } + sampleSTidx[j] = 0 + } + if exhausted { + break + } + } + } +} diff --git a/tsdb/chunkenc/varbit_classic.go b/tsdb/chunkenc/varbit_classic.go new file mode 100644 index 0000000000..b8f293bc3f --- /dev/null +++ b/tsdb/chunkenc/varbit_classic.go @@ -0,0 +1,103 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package chunkenc + +// putClassicVarbitInt writes an int64 using varbit encoding with a bit bucketing +// as it was done for a long time in the initial XOR chunk format. +func putClassicVarbitInt(b *bstream, val int64) { + // Gorilla has a max resolution of seconds, Prometheus milliseconds. + // Thus we use higher value range steps with larger bit size. + // + // TODO(beorn7): This seems to needlessly jump to large bit + // sizes even for very small deviations from zero. Timestamp + // compression can probably benefit from some smaller bit + // buckets. See also what was done for histogram encoding in + // varbit.go. + switch { + case val == 0: + b.writeBit(zero) + case bitRange(val, 14): + b.writeByte(0b10<<6 | (uint8(val>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod. + b.writeByte(uint8(val)) // Bottom 8 bits of dod. + case bitRange(val, 17): + b.writeBits(0b110, 3) + b.writeBits(uint64(val), 17) + case bitRange(val, 20): + b.writeBits(0b1110, 4) + b.writeBits(uint64(val), 20) + default: + b.writeBits(0b1111, 4) + b.writeBits(uint64(val), 64) + } +} + +// readClassicVarbitInt reads an int64 encoded with putClassicVarbitInt. +// This is copied into production code to make it inline. +func readClassicVarbitInt(b *bstreamReader) (int64, error) { + var d byte + // read delta-of-delta + for range 4 { + d <<= 1 + bit, err := b.readBitFast() + if err != nil { + bit, err = b.readBit() + if err != nil { + return 0, err + } + } + if bit == zero { + break + } + d |= 1 + } + var sz uint8 + var val int64 + switch d { + case 0b0: + // dod == 0 + case 0b10: + sz = 14 + case 0b110: + sz = 17 + case 0b1110: + sz = 20 + case 0b1111: + // Do not use fast because it's very unlikely it will succeed. 
+ bits, err := b.readBits(64) + if err != nil { + return 0, err + } + + val = int64(bits) + } + + if sz != 0 { + bits, err := b.readBitsFast(sz) + if err != nil { + bits, err = b.readBits(sz) + if err != nil { + return 0, err + } + } + + // Account for negative numbers, which come back as high unsigned numbers. + // See docs/bstream.md. + if bits > (1 << (sz - 1)) { + bits -= 1 << sz + } + val = int64(bits) + } + + return val, nil +} diff --git a/tsdb/chunkenc/varbit_classic_test.go b/tsdb/chunkenc/varbit_classic_test.go new file mode 100644 index 0000000000..f64d2ca9a9 --- /dev/null +++ b/tsdb/chunkenc/varbit_classic_test.go @@ -0,0 +1,57 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package chunkenc + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestClassicVarbitInt(t *testing.T) { + numbers := []int64{ + math.MinInt64, + -36028797018963968, -36028797018963967, + -16777216, -16777215, + -131072, -131071, + -2048, -2047, + -256, -255, + -32, -31, + -4, -3, + -1, 0, 1, + 4, 5, + 32, 33, + 256, 257, + 2048, 2049, + 131072, 131073, + 16777216, 16777217, + 36028797018963968, 36028797018963969, + math.MaxInt64, + } + + bs := bstream{} + + for _, n := range numbers { + putClassicVarbitInt(&bs, n) + } + + bsr := newBReader(bs.bytes()) + + for _, want := range numbers { + got, err := readClassicVarbitInt(&bsr) + require.NoError(t, err) + require.Equal(t, want, got) + } +} diff --git a/tsdb/chunkenc/xoroptst.go b/tsdb/chunkenc/xoroptst.go new file mode 100644 index 0000000000..6ac4122d73 --- /dev/null +++ b/tsdb/chunkenc/xoroptst.go @@ -0,0 +1,681 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "encoding/binary" + "math" + + "github.com/prometheus/prometheus/model/histogram" +) + +const ( + chunkSTHeaderSize = 1 + maxFirstSTChangeOn = 0x7F +) + +func writeHeaderFirstSTKnown(b []byte) { + b[0] = 0x80 +} + +func writeHeaderFirstSTChangeOn(b []byte, firstSTChangeOn uint16) { + // First bit indicates the initial ST value. 
+ // Here we save the sample number from where the first change occurs in the + // rest of the byte (7 bits) + + if firstSTChangeOn > maxFirstSTChangeOn { + // This should never happen, would cause corruption (ST already skipped but shouldn't). + return + } + b[0] |= uint8(firstSTChangeOn) +} + +func readSTHeader(b []byte) (firstSTKnown bool, firstSTChangeOn uint16) { + if b[0] == 0x00 { + return false, 0 + } + if b[0] == 0x80 { + return true, 0 + } + mask := byte(0x80) + if b[0]&mask != 0 { + firstSTKnown = true + } + mask = 0x7F + return firstSTKnown, uint16(b[0] & mask) +} + +// XorOptSTChunk holds encoded sample data: +// 2B(numSamples), 1B(stHeader), ?varint(st), varint(t), xor(v), ?varuint(stDelta), varuint(tDelta), xor(v), ?classicvarbitint(stDod), classicvarbitint(tDod), xor(v), ... +// stHeader: 1b(firstSTKnown), 7b(firstSTChangeOn). +type XorOptSTChunk struct { + b bstream +} + +// NewXOROptSTChunk returns a new chunk with XORv2 encoding. +func NewXOROptSTChunk() *XorOptSTChunk { + b := make([]byte, chunkHeaderSize+chunkSTHeaderSize, chunkAllocationSize) + return &XorOptSTChunk{b: bstream{stream: b, count: 0}} +} + +func (c *XorOptSTChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + +// Encoding returns the encoding type. +func (*XorOptSTChunk) Encoding() Encoding { + return EncXOROptST +} + +// Bytes returns the underlying byte slice of the chunk. +func (c *XorOptSTChunk) Bytes() []byte { + return c.b.bytes() +} + +// NumSamples returns the number of samples in the chunk. +func (c *XorOptSTChunk) NumSamples() int { + return int(binary.BigEndian.Uint16(c.Bytes())) +} + +// Compact implements the Chunk interface. +func (c *XorOptSTChunk) Compact() { + if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { + buf := make([]byte, l) + copy(buf, c.b.stream) + c.b.stream = buf + } +} + +// Appender implements the Chunk interface. 
// It is not valid to call Appender() multiple times concurrently or to use
// multiple Appenders on the same chunk.
func (c *XorOptSTChunk) Appender() (Appender, error) {
	if len(c.b.stream) == chunkHeaderSize+chunkSTHeaderSize { // Avoid allocating an Iterator when chunk is empty.
		return &xorOptSTAppender{b: &c.b, t: math.MinInt64, leading: 0xff}, nil
	}
	it := c.iterator(nil)

	// To get an appender we must know the state it would have if we had
	// appended all existing data from scratch.
	// We iterate through the end and populate via the iterator's state.
	for it.Next() != ValNone {
	}
	if err := it.Err(); err != nil {
		return nil, err
	}

	a := &xorOptSTAppender{
		b:        &c.b,
		st:       it.st,
		t:        it.t,
		v:        it.val,
		stDelta:  it.stDelta,
		tDelta:   it.tDelta,
		leading:  it.leading,
		trailing: it.trailing,

		numTotal:        it.numTotal,
		firstSTKnown:    it.firstSTKnown,
		firstSTChangeOn: it.firstSTChangeOn,
	}
	return a, nil
}

// iterator returns a *xorOptSTtIterator positioned at the start of the
// chunk's current bytes, reusing it when it already is one.
func (c *XorOptSTChunk) iterator(it Iterator) *xorOptSTtIterator {
	xorIter, ok := it.(*xorOptSTtIterator)
	if !ok {
		xorIter = &xorOptSTtIterator{}
	}

	xorIter.Reset(c.b.bytes())
	return xorIter
}

// Iterator implements the Chunk interface.
// Iterator() must not be called concurrently with any modifications to the chunk,
// but after it returns you can use an Iterator concurrently with an Appender or
// other Iterators.
func (c *XorOptSTChunk) Iterator(it Iterator) Iterator {
	return c.iterator(it)
}

// xorOptSTAppender appends float samples (with optional start timestamps)
// to an XorOptSTChunk. Its fields mirror the iterator state so appending
// can resume after the last encoded sample.
type xorOptSTAppender struct {
	b        *bstream
	numTotal uint16 // Running sample count, mirrored into the chunk header.

	firstSTKnown    bool   // Whether the first sample carried an explicitly encoded ST.
	firstSTChangeOn uint16 // Sample index of the first ST change; 0 while unchanged.

	st, t   int64   // Last appended start timestamp and timestamp.
	v       float64 // Last appended value.
	stDelta int64   // Last ST delta, basis for ST delta-of-delta encoding.
	tDelta  uint64  // Last timestamp delta, basis for t delta-of-delta encoding.

	leading  uint8 // Leading zero bits of the last value XOR.
	trailing uint8 // Trailing zero bits of the last value XOR.
}

// writeVDelta XOR-encodes v against the previously appended value.
func (a *xorOptSTAppender) writeVDelta(v float64) {
	xorWrite(a.b, v, a.v, &a.leading, &a.trailing)
}

// AppendHistogram panics: this is a float chunk.
func (*xorOptSTAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
	panic("appended a histogram sample to a float chunk")
}

// AppendFloatHistogram panics: this is a float chunk.
func (*xorOptSTAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
	panic("appended a float histogram sample to a float chunk")
}

// Iterator decode-state values. read0State is the initial state set by
// Reset; eofState marks an exhausted or empty chunk.
// NOTE(review): readDoDMaybeSTState/readDoDNoSTState/readDoDState are not
// referenced in the visible code — presumably consumed by Next; confirm.
const (
	read0State uint8 = iota
	read1State
	readDoDMaybeSTState
	readDoDNoSTState
	readDoDState

	eofState uint8 = 1<<8 - 1
)

// xorOptSTtIterator iterates an XorOptSTChunk.
// NOTE(review): the doubled "t" in the name looks like a typo for
// xorOptSTIterator — consider renaming.
type xorOptSTtIterator struct {
	br       bstreamReader
	numTotal uint16 // Total samples in the chunk (read from the header).

	firstSTKnown    bool
	firstSTChangeOn uint16

	state   uint8  // Current decode state (read0State ... eofState).
	numRead uint16 // Samples decoded so far.

	st, t int64
	val   float64

	leading  uint8
	trailing uint8

	stDelta int64
	tDelta  uint64
	err     error
}

// Seek advances the iterator to the first sample with timestamp >= t,
// returning ValNone when the chunk is exhausted first. The read0State
// check forces at least one Next() before a position can be reported.
func (it *xorOptSTtIterator) Seek(t int64) ValueType {
	if it.state == eofState {
		return ValNone
	}

	for t > it.t || it.state == read0State {
		if it.Next() == ValNone {
			return ValNone
		}
	}
	return ValFloat
}

// At returns the current timestamp/value pair.
func (it *xorOptSTtIterator) At() (int64, float64) {
	return it.t, it.val
}

// AtHistogram panics: this is a float iterator.
func (*xorOptSTtIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
	panic("cannot call xorIterator.AtHistogram")
}

// AtFloatHistogram panics: this is a float iterator.
func (*xorOptSTtIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
	panic("cannot call xorIterator.AtFloatHistogram")
}

// AtT returns the current timestamp.
func (it *xorOptSTtIterator) AtT() int64 {
	return it.t
}

// AtST returns the current sample's start timestamp.
func (it *xorOptSTtIterator) AtST() int64 {
	return it.st
}

func (it 
*xorOptSTtIterator) Err() error { + return it.err +} + +func (it *xorOptSTtIterator) Reset(b []byte) { + // We skip initial headers for actual samples. + it.br = newBReader(b[chunkHeaderSize+chunkSTHeaderSize:]) + it.numTotal = binary.BigEndian.Uint16(b) + it.firstSTKnown, it.firstSTChangeOn = readSTHeader(b[chunkHeaderSize:]) + it.numRead = 0 + it.st = 0 + it.t = 0 + it.val = 0 + it.leading = 0 + it.trailing = 0 + it.stDelta = 0 + it.tDelta = 0 + it.err = nil + it.state = read0State + if it.numRead >= it.numTotal { + it.state = eofState + } +} + +func (a *xorOptSTAppender) Append(st, t int64, v float64) { + if st == 0 && a.numTotal != maxFirstSTChangeOn && a.firstSTChangeOn == 0 && !a.firstSTKnown { + // Fast path for no ST usage at all. + // Same as classic XOR chunk appender. + + var tDelta uint64 + + switch a.numTotal { + case 0: + buf := make([]byte, binary.MaxVarintLen64) + for _, b := range buf[:binary.PutVarint(buf, t)] { + a.b.writeByte(b) + } + a.b.writeBits(math.Float64bits(v), 64) + case 1: + buf := make([]byte, binary.MaxVarintLen64) + tDelta = uint64(t - a.t) + for _, b := range buf[:binary.PutUvarint(buf, tDelta)] { + a.b.writeByte(b) + } + a.writeVDelta(v) + default: + tDelta = uint64(t - a.t) + dod := int64(tDelta - a.tDelta) + + // Gorilla has a max resolution of seconds, Prometheus milliseconds. + // Thus we use higher value range steps with larger bit size. + // + // TODO(beorn7): This seems to needlessly jump to large bit + // sizes even for very small deviations from zero. Timestamp + // compression can probably benefit from some smaller bit + // buckets. See also what was done for histogram encoding in + // varbit.go. + switch { + case dod == 0: + a.b.writeBit(zero) + case bitRange(dod, 14): + a.b.writeByte(0b10<<6 | (uint8(dod>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod. + a.b.writeByte(uint8(dod)) // Bottom 8 bits of dod. 
+ case bitRange(dod, 17): + a.b.writeBits(0b110, 3) + a.b.writeBits(uint64(dod), 17) + case bitRange(dod, 20): + a.b.writeBits(0b1110, 4) + a.b.writeBits(uint64(dod), 20) + default: + a.b.writeBits(0b1111, 4) + a.b.writeBits(uint64(dod), 64) + } + + a.writeVDelta(v) + } + + a.t = t + a.v = v + a.tDelta = tDelta + a.numTotal++ + binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal) + + return + } + + var ( + stDelta int64 + tDelta uint64 + stChanged bool + ) + + // Slow path for ST usage. + switch a.numTotal { + case 0: + buf := make([]byte, binary.MaxVarintLen64) + + for _, b := range buf[:binary.PutVarint(buf, st)] { + a.b.writeByte(b) + } + writeHeaderFirstSTKnown(a.b.bytes()[chunkHeaderSize:]) + a.firstSTKnown = true + + for _, b := range buf[:binary.PutVarint(buf, t)] { + a.b.writeByte(b) + } + a.b.writeBits(math.Float64bits(v), 64) + case 1: + buf := make([]byte, binary.MaxVarintLen64) + stDelta = st - a.st + if stDelta != 0 { + stChanged = true + for _, b := range buf[:binary.PutVarint(buf, stDelta)] { + a.b.writeByte(b) + } + } + + tDelta = uint64(t - a.t) + for _, b := range buf[:binary.PutUvarint(buf, tDelta)] { + a.b.writeByte(b) + } + a.writeVDelta(v) + default: + if a.firstSTChangeOn == 0 && a.numTotal == maxFirstSTChangeOn { + // We are at the 127th sample. firstSTChangeOn can only fit 7 bits due to a + // single byte header constrain, which is fine, given typical 120 sample size. + a.firstSTChangeOn = a.numTotal + writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.firstSTChangeOn) + } + + stDelta = st - a.st + sdod := stDelta - a.stDelta + if sdod != 0 || a.firstSTChangeOn != 0 { + stChanged = true + // Gorilla has a max resolution of seconds, Prometheus milliseconds. + // Thus we use higher value range steps with larger bit size. + // + // TODO(beorn7): This seems to needlessly jump to large bit + // sizes even for very small deviations from zero. Timestamp + // compression can probably benefit from some smaller bit + // buckets. 
See also what was done for histogram encoding in + // varbit.go. + switch { + case sdod == 0: + a.b.writeBit(zero) + case bitRange(sdod, 14): + a.b.writeByte(0b10<<6 | (uint8(sdod>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod. + a.b.writeByte(uint8(sdod)) // Bottom 8 bits of dod. + case bitRange(sdod, 17): + a.b.writeBits(0b110, 3) + a.b.writeBits(uint64(sdod), 17) + case bitRange(sdod, 20): + a.b.writeBits(0b1110, 4) + a.b.writeBits(uint64(sdod), 20) + default: + a.b.writeBits(0b1111, 4) + a.b.writeBits(uint64(sdod), 64) + } + // putClassicVarbitInt(a.b, sdod) + } + + tDelta = uint64(t - a.t) + dod := int64(tDelta - a.tDelta) + + // Gorilla has a max resolution of seconds, Prometheus milliseconds. + // Thus we use higher value range steps with larger bit size. + // + // TODO(beorn7): This seems to needlessly jump to large bit + // sizes even for very small deviations from zero. Timestamp + // compression can probably benefit from some smaller bit + // buckets. See also what was done for histogram encoding in + // varbit.go. + switch { + case dod == 0: + a.b.writeBit(zero) + case bitRange(dod, 14): + a.b.writeByte(0b10<<6 | (uint8(dod>>8) & (1<<6 - 1))) // 0b10 size code combined with 6 bits of dod. + a.b.writeByte(uint8(dod)) // Bottom 8 bits of dod. + case bitRange(dod, 17): + a.b.writeBits(0b110, 3) + a.b.writeBits(uint64(dod), 17) + case bitRange(dod, 20): + a.b.writeBits(0b1110, 4) + a.b.writeBits(uint64(dod), 20) + default: + a.b.writeBits(0b1111, 4) + a.b.writeBits(uint64(dod), 64) + } + + a.writeVDelta(v) + } + + a.st = st + a.t = t + a.v = v + a.tDelta = tDelta + a.stDelta = stDelta + + a.numTotal++ + binary.BigEndian.PutUint16(a.b.bytes(), a.numTotal) + + // firstSTChangeOn == 0 indicates that we have one ST value (zero or not) + // for all STs in the appends until now. If we see a first "update" + // we are saving this number in the header and continue tracking all DoD (including zeros). 
+ if a.firstSTChangeOn == 0 && stChanged { + a.firstSTChangeOn = a.numTotal - 1 + writeHeaderFirstSTChangeOn(a.b.bytes()[chunkHeaderSize:], a.firstSTChangeOn) + } +} + +func (it *xorOptSTtIterator) retErr(err error) ValueType { + it.err = err + it.state = eofState + return ValNone +} + +func (it *xorOptSTtIterator) Next() ValueType { + switch it.state { + case eofState: + return ValNone + case read0State: + it.state++ + + // Optional ST read. + if it.firstSTKnown { + st, err := binary.ReadVarint(&it.br) + if err != nil { + return it.retErr(err) + } + it.st = st + } + + // TS. + t, err := binary.ReadVarint(&it.br) + if err != nil { + return it.retErr(err) + } + // Value. + v, err := it.br.readBits(64) + if err != nil { + return it.retErr(err) + } + + it.t = t + it.val = math.Float64frombits(v) + + // State EOF check. + it.numRead++ + if it.numRead >= it.numTotal { + it.state = eofState + } + return ValFloat + case read1State: + it.state++ + switch it.firstSTChangeOn { + case 0: + // This means we have same (zero or non-zero) ST value for the rest of + // chunk. We can simply use ~classic XOR chunk iterations. + it.state = readDoDNoSTState + case 1: + // We got early ST change on the second sample, likely delta. + // Continue ST rich flow from the next iteration. + it.state = readDoDState + + stDelta, err := binary.ReadVarint(&it.br) + if err != nil { + return it.retErr(err) + } + it.stDelta = stDelta + it.st += it.stDelta + } + // TS. + tDelta, err := binary.ReadUvarint(&it.br) + if err != nil { + return it.retErr(err) + } + it.tDelta = tDelta + it.t += int64(it.tDelta) + + // Value. + if err := xorRead(&it.br, &it.val, &it.leading, &it.trailing); err != nil { + return it.retErr(err) + } + + // State EOF check. + it.numRead++ + if it.numRead >= it.numTotal { + it.state = eofState + } + return ValFloat + case readDoDMaybeSTState: + if it.firstSTChangeOn == it.numRead { + // ST changes from this iteration, change state for future. 
+ it.state = readDoDState + return it.dodNext() + } + return it.dodNoSTNext() + case readDoDState: + return it.dodNext() + case readDoDNoSTState: + return it.dodNoSTNext() + default: + panic("xorOptSTtIterator: broken machine state") + } +} + +func (it *xorOptSTtIterator) dodNext() ValueType { + // Inlined readClassicVarbitInt(&it.br) + var d byte + // read delta-of-delta + for range 4 { + d <<= 1 + bit, err := it.br.readBitFast() + if err != nil { + bit, err = it.br.readBit() + if err != nil { + return it.retErr(err) + } + } + if bit == zero { + break + } + d |= 1 + } + var sz uint8 + var sdod int64 + switch d { + case 0b0: + // dod == 0 + case 0b10: + sz = 14 + case 0b110: + sz = 17 + case 0b1110: + sz = 20 + case 0b1111: + // Do not use fast because it's very unlikely it will succeed. + bits, err := it.br.readBits(64) + if err != nil { + return it.retErr(err) + } + + sdod = int64(bits) + } + + if sz != 0 { + bits, err := it.br.readBitsFast(sz) + if err != nil { + bits, err = it.br.readBits(sz) + if err != nil { + return it.retErr(err) + } + } + + // Account for negative numbers, which come back as high unsigned numbers. + // See docs/bstream.md. + if bits > (1 << (sz - 1)) { + bits -= 1 << sz + } + sdod = int64(bits) + } + + it.stDelta += sdod + it.st += it.stDelta + return it.dodNoSTNext() +} + +func (it *xorOptSTtIterator) dodNoSTNext() ValueType { + // Inlined readClassicVarbitInt(&it.br) + var d byte + // read delta-of-delta + for range 4 { + d <<= 1 + bit, err := it.br.readBitFast() + if err != nil { + bit, err = it.br.readBit() + if err != nil { + return it.retErr(err) + } + } + if bit == zero { + break + } + d |= 1 + } + var sz uint8 + var dod int64 + switch d { + case 0b0: + // dod == 0 + case 0b10: + sz = 14 + case 0b110: + sz = 17 + case 0b1110: + sz = 20 + case 0b1111: + // Do not use fast because it's very unlikely it will succeed. 
+ bits, err := it.br.readBits(64) + if err != nil { + return it.retErr(err) + } + + dod = int64(bits) + } + + if sz != 0 { + bits, err := it.br.readBitsFast(sz) + if err != nil { + bits, err = it.br.readBits(sz) + if err != nil { + return it.retErr(err) + } + } + + // Account for negative numbers, which come back as high unsigned numbers. + // See docs/bstream.md. + if bits > (1 << (sz - 1)) { + bits -= 1 << sz + } + dod = int64(bits) + } + + it.tDelta = uint64(int64(it.tDelta) + dod) + it.t += int64(it.tDelta) + // Value. + if err := xorRead(&it.br, &it.val, &it.leading, &it.trailing); err != nil { + return it.retErr(err) + } + + // State EOF check. + it.numRead++ + if it.numRead >= it.numTotal { + it.state = eofState + } + return ValFloat +} diff --git a/tsdb/chunkenc/xoroptst_test.go b/tsdb/chunkenc/xoroptst_test.go new file mode 100644 index 0000000000..fe41b751fc --- /dev/null +++ b/tsdb/chunkenc/xoroptst_test.go @@ -0,0 +1,108 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package chunkenc + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestXorOptSTChunk(t *testing.T) { + testChunkSTHandling(t, ValFloat, func() Chunk { + return NewXOROptSTChunk() + }, + ) +} + +func TestXorOptSTChunk_MoreThan127Samples(t *testing.T) { + const afterMax = maxFirstSTChangeOn + 3 + t.Run("zero ST", func(t *testing.T) { + chunk := NewXOROptSTChunk() + app, err := chunk.Appender() + require.NoError(t, err) + for i := range afterMax { + app.Append(0, int64(i*10+1), float64(i)*1.5) + } + + it := chunk.Iterator(nil) + for i := range afterMax { + require.Equal(t, ValFloat, it.Next()) + st := it.AtST() + ts, v := it.At() + require.Equal(t, int64(0), st) + require.Equal(t, int64(i*10+1), ts) + require.Equal(t, float64(i)*1.5, v) + } + + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + }) + + t.Run("non-zero ST after 127", func(t *testing.T) { + chunk := NewXOROptSTChunk() + app, err := chunk.Appender() + require.NoError(t, err) + for i := range afterMax { + st := int64(0) + if i == afterMax-1 { + st = int64((afterMax - 1) * 10) + } + app.Append(st, int64(i*10+1), float64(i)*1.5) + } + + it := chunk.Iterator(nil) + for i := range afterMax { + require.Equal(t, ValFloat, it.Next()) + st := it.AtST() + ts, v := it.At() + if i == afterMax-1 { + require.Equal(t, int64((afterMax-1)*10), st) + } else { + require.Equal(t, int64(0), st) + } + require.Equal(t, int64(i*10+1), ts) + require.Equal(t, float64(i)*1.5, v) + } + + require.Equal(t, ValNone, it.Next()) + require.NoError(t, it.Err()) + }) +} + +func TestXorOptSTChunk_STHeader(t *testing.T) { + b := make([]byte, 1) + writeHeaderFirstSTKnown(b) + firstSTKnown, firstSTChangeOn := readSTHeader(b) + require.True(t, firstSTKnown) + require.Equal(t, uint16(0), firstSTChangeOn) + + b = make([]byte, 1) + firstSTKnown, firstSTChangeOn = readSTHeader(b) + require.False(t, firstSTKnown) + require.Equal(t, uint16(0), firstSTChangeOn) + + b = make([]byte, 1) + 
writeHeaderFirstSTChangeOn(b, 1) + firstSTKnown, firstSTChangeOn = readSTHeader(b) + require.False(t, firstSTKnown) + require.Equal(t, uint16(1), firstSTChangeOn) + + b = make([]byte, 1) + writeHeaderFirstSTKnown(b) + writeHeaderFirstSTChangeOn(b, 119) + firstSTKnown, firstSTChangeOn = readSTHeader(b) + require.True(t, firstSTKnown) + require.Equal(t, uint16(119), firstSTChangeOn) +}