fix(chunkenc): get rid of Iterator.Encoding()

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2026-02-18 12:57:32 +01:00
parent 7991bcbff9
commit cf929d6460
No known key found for this signature in database
GPG Key ID: 47A8F9CE80FD7C7F
18 changed files with 114 additions and 172 deletions

View File

@ -213,10 +213,6 @@ type histogramIterator struct {
histograms []*histogram.Histogram
}
// Encoding always reports chunkenc.EncFloatHistogram for this iterator,
// independent of the iterator's position.
func (*histogramIterator) Encoding() chunkenc.Encoding {
return chunkenc.EncFloatHistogram
}
func (h *histogramIterator) Next() chunkenc.ValueType {
h.i++
if h.i < len(h.histograms) {

View File

@ -442,13 +442,6 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator {
}
}
// Encoding reports chunkenc.EncFloatHistogram while a current histogram
// (ssi.currH) is set, and chunkenc.EncXOR otherwise.
func (ssi *storageSeriesIterator) Encoding() chunkenc.Encoding {
	if ssi.currH == nil {
		return chunkenc.EncXOR
	}
	return chunkenc.EncFloatHistogram
}
func (ssi *storageSeriesIterator) reset(series Series) {
ssi.floats = series.Floats
ssi.histograms = series.Histograms

View File

@ -416,7 +416,6 @@ type mockSeriesIterator struct {
err func() error
}
// Encoding always reports chunkenc.EncXOR for the mock.
func (*mockSeriesIterator) Encoding() chunkenc.Encoding { return chunkenc.EncXOR }
// Seek forwards to the configured seek callback.
func (m *mockSeriesIterator) Seek(t int64) chunkenc.ValueType { return m.seek(t) }
// At forwards to the configured at callback.
func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
// Next forwards to the configured next callback.
func (m *mockSeriesIterator) Next() chunkenc.ValueType { return m.next() }
@ -468,10 +467,6 @@ func (*fakeSeriesIterator) AtST() int64 {
return 0 // No start timestamps in this fake iterator.
}
// Encoding always reports chunkenc.EncXOR for this fake iterator.
func (*fakeSeriesIterator) Encoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (it *fakeSeriesIterator) Next() chunkenc.ValueType {
it.idx++
if it.idx >= it.nsamples {

View File

@ -551,13 +551,6 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
return chunkenc.ValNone
}
// Encoding returns the encoding reported by the currently selected underlying
// iterator (c.curr).
//
// It panics when c.curr is nil, i.e. when it is called before the first Next
// or after Next has returned false.
func (c *chainSampleIterator) Encoding() chunkenc.Encoding {
	if c.curr == nil {
		// Name the method that was actually called so the panic points at the
		// right call site (the message used to say ".At").
		panic("chainSampleIterator.Encoding called before first .Next or after .Next returned false.")
	}
	return c.curr.Encoding()
}
func (c *chainSampleIterator) At() (t int64, v float64) {
if c.curr == nil {
panic("chainSampleIterator.At called before first .Next or after .Next returned false.")

View File

@ -1692,10 +1692,6 @@ type errIterator struct {
err error
}
// Encoding always reports chunkenc.EncNone.
func (errIterator) Encoding() chunkenc.Encoding {
return chunkenc.EncNone
}
// Next always returns chunkenc.ValNone, so the iterator never yields a sample.
func (errIterator) Next() chunkenc.ValueType {
return chunkenc.ValNone
}

View File

@ -564,22 +564,6 @@ func (c *concreteSeriesIterator) AtT() int64 {
return c.series.floats[c.floatsCur].Timestamp
}
// Encoding maps the value type of the current sample (c.curValType) to a chunk
// encoding: ValFloat to EncXOR, ValHistogram to EncHistogram,
// ValFloatHistogram to EncFloatHistogram, and anything else to EncNone.
//
// TODO(krajorama): implement Encoding() with ST. Maybe. concreteSeriesIterator
// is used for turning query results into an iterable, but query results do not
// have ST.
func (c *concreteSeriesIterator) Encoding() chunkenc.Encoding {
switch c.curValType {
case chunkenc.ValFloat:
return chunkenc.EncXOR
case chunkenc.ValHistogram:
return chunkenc.EncHistogram
case chunkenc.ValFloatHistogram:
return chunkenc.EncFloatHistogram
default:
return chunkenc.EncNone
}
}
// TODO(krajorama): implement AtST. Maybe. concreteSeriesIterator is used
// for turning query results into an iterable, but query results do not have ST.
func (*concreteSeriesIterator) AtST() int64 {
@ -838,10 +822,6 @@ func (it *chunkedSeriesIterator) reset(chunks []prompb.Chunk, mint, maxt int64)
}
}
// Encoding returns the encoding reported by the current chunk iterator
// (it.cur).
func (it *chunkedSeriesIterator) Encoding() chunkenc.Encoding {
return it.cur.Encoding()
}
// At returns the timestamp and float value reported by the current chunk
// iterator (it.cur).
func (it *chunkedSeriesIterator) At() (ts int64, v float64) {
return it.cur.At()
}

View File

@ -113,15 +113,6 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator {
return &listSeriesIterator{samples: samples, idx: -1}
}
// Encoding derives the chunk encoding for the sample at the current index from
// the sample's type and from whether its ST is non-zero. It panics if the
// sample type maps to chunkenc.EncNone.
func (it *listSeriesIterator) Encoding() chunkenc.Encoding {
	cur := it.samples.Get(it.idx)
	if enc := cur.Type().ChunkEncoding(cur.ST() != 0); enc != chunkenc.EncNone {
		return enc
	}
	panic(fmt.Sprintf("unknown sample type %s", cur.Type().String()))
}
func (it *listSeriesIterator) Reset(samples Samples) {
it.samples = samples
it.idx = -1

View File

@ -160,11 +160,6 @@ type Iterator interface {
// Returns 0 if the start timestamp is not implemented or not set.
// Before the iterator has advanced, the behaviour is unspecified.
AtST() int64
// Encoding returns what encoding to use for storing the current sample.
// Only call this as last resort if the encoding is really needed, and
// the current chunk isn't accessible otherwise.
// Before the iterator has advanced, the behaviour is unspecified.
Encoding() Encoding
// Err returns the current error. It should be used only after the
// iterator is exhausted, i.e. `Next` or `Seek` have returned ValNone.
Err() error
@ -236,13 +231,6 @@ type mockSeriesIterator struct {
currIndex int
}
// Encoding reports EncXOR when the current start timestamp is zero and
// EncXOROptST otherwise.
func (it *mockSeriesIterator) Encoding() Encoding {
	if it.AtST() == 0 {
		return EncXOR
	}
	return EncXOROptST
}
func (*mockSeriesIterator) Seek(int64) ValueType { return ValNone }
func (it *mockSeriesIterator) At() (int64, float64) {
@ -285,7 +273,6 @@ func NewNopIterator() Iterator {
type nopIterator struct{}
// Encoding always reports EncNone.
func (nopIterator) Encoding() Encoding { return EncNone }
// Next always returns ValNone.
func (nopIterator) Next() ValueType { return ValNone }
// Seek ignores the requested timestamp and always returns ValNone.
func (nopIterator) Seek(int64) ValueType { return ValNone }
// At returns the fixed placeholder pair (math.MinInt64, 0).
func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 }

View File

@ -839,10 +839,6 @@ type floatHistogramIterator struct {
atFloatHistogramCalled bool
}
// Encoding always reports EncFloatHistogram.
func (*floatHistogramIterator) Encoding() Encoding {
return EncFloatHistogram
}
func (it *floatHistogramIterator) Seek(t int64) ValueType {
if it.err != nil {
return ValNone

View File

@ -898,10 +898,6 @@ type histogramIterator struct {
err error
}
// Encoding always reports EncHistogram.
func (*histogramIterator) Encoding() Encoding {
return EncHistogram
}
func (it *histogramIterator) Seek(t int64) ValueType {
if it.err != nil {
return ValNone

View File

@ -248,10 +248,6 @@ type xorIterator struct {
err error
}
// Encoding always reports EncXOR.
func (*xorIterator) Encoding() Encoding {
return EncXOR
}
func (it *xorIterator) Seek(t int64) ValueType {
if it.err != nil {
return ValNone

View File

@ -197,10 +197,6 @@ type xorOptSTtIterator struct {
err error
}
// Encoding always reports EncXOROptST.
func (*xorOptSTtIterator) Encoding() Encoding {
return EncXOROptST
}
func (it *xorOptSTtIterator) Seek(t int64) ValueType {
if it.err != nil {
return ValNone

View File

@ -7529,18 +7529,17 @@ func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) {
ctx := context.Background()
app := db.AppenderV2(ctx)
maxt := 100
for i := range maxt {
mint := 100
maxt := 200
for i := mint; i < maxt; i++ {
// AppendV2 signature: (ref, labels, st, t, v, h, fh, opts)
// st=0 (start timestamp), t=i (sample timestamp)
// TODO(krajorama): verify with non zero st once the API supports it.
_, err := app.Append(0, labels.FromStrings("a", "b"), 0, int64(i), float64(i), nil, nil, storage.AOptions{})
_, err := app.Append(0, labels.FromStrings("a", "b"), 50, int64(i), float64(i), nil, nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Compact the Head to create a new block.
require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1)))
require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), int64(mint), int64(maxt)-1)))
// Check that we have exactly one block.
require.Len(t, db.Blocks(), 1)
b := db.Blocks()[0]
@ -7568,7 +7567,7 @@ func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) {
c, _, err := chunkr.ChunkOrIterable(chk)
require.NoError(t, err)
require.Equal(t, chunkenc.EncXOROptST, c.Encoding(),
"expected EncXOROptST encoding when EnableSTStorage=true, got %s", c.Encoding())
"unexpected chunk encoding, got %s", c.Encoding())
chunkCount++
}
}

View File

@ -9627,9 +9627,10 @@ func TestStaleSeriesCompactionWithZeroSeries(t *testing.T) {
require.Empty(t, db.Blocks())
}
// TestCompactHeadWithSTStorage ensures that when EnableSTStorage is true,
// compacted blocks contain chunks with EncXOR encoding for float samples
// when using the original Appender (which does not support start timestamps).
// TestCompactHeadWithSTStorage demonstrates that when no ST is stored in the
// head and a truncated chunk is compacted, re-encoding via
// populateWithDelChunkSeriesIterator.populateCurrForSingleChunk
// results in a chunk that is not ST-capable.
func TestCompactHeadWithSTStorage(t *testing.T) {
t.Parallel()
@ -9681,7 +9682,7 @@ func TestCompactHeadWithSTStorage(t *testing.T) {
for _, chk := range chks {
c, _, err := chunkr.ChunkOrIterable(chk)
require.NoError(t, err)
require.Equal(t, chunkenc.EncXOROptST, c.Encoding(),
require.Equal(t, chunkenc.EncXOR, c.Encoding(),
"expected EncXOR encoding when using original Appender, got %s", c.Encoding())
chunkCount++
}

View File

@ -5823,10 +5823,6 @@ func newUnsupportedChunk() *unsupportedChunk {
return &unsupportedChunk{chunkenc.NewXORChunk()}
}
// Encoding reports EncUnsupportedXOR, overriding whatever encoding the wrapped
// XOR chunk would report.
func (*unsupportedChunk) Encoding() chunkenc.Encoding {
return EncUnsupportedXOR
}
// Tests https://github.com/prometheus/prometheus/issues/10277.
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
dir := t.TempDir()

View File

@ -771,10 +771,6 @@ func (p *populateWithDelSeriesIterator) Seek(t int64) chunkenc.ValueType {
return chunkenc.ValNone
}
// Encoding returns the encoding reported by the current iterator (p.curr).
func (p *populateWithDelSeriesIterator) Encoding() chunkenc.Encoding {
return p.curr.Encoding()
}
// At returns the timestamp and float value reported by the current iterator
// (p.curr).
func (p *populateWithDelSeriesIterator) At() (int64, float64) {
return p.curr.At()
}
@ -870,7 +866,6 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
// populateCurrForSingleChunk sets the fields within p.currMetaWithChunk. This
// should be called if the samples in p.currDelIter only form one chunk.
// TODO(krajorama): test ST when chunks support it.
func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
valueType := p.currDelIter.Next()
if valueType == chunkenc.ValNone {
@ -880,6 +875,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
return false
}
p.currMetaWithChunk.MinTime = p.currDelIter.AtT()
needTS := p.currDelIter.AtST() != 0
// Re-encode the chunk if iterator is provided. This means that it has
// some samples to be deleted or chunk is opened.
@ -889,66 +885,56 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
st, t int64
err error
)
switch valueType {
case chunkenc.ValHistogram:
// TODO(krajorama): handle ST capable histogram chunks when they are supported.
newChunk = chunkenc.NewHistogramChunk()
if app, err = newChunk.Appender(); err != nil {
newChunk, err = valueType.NewChunk(needTS)
if err != nil {
p.err = fmt.Errorf("create new chunk while re-encoding: %w", err)
return false
}
app, err = newChunk.Appender()
if err != nil {
p.err = fmt.Errorf("create appender while re-encoding: %w", err)
return false
}
loop:
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != valueType {
err = fmt.Errorf("found value type %v in chunk with %v", vt, valueType)
break
}
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
}
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram(nil)
st = p.currDelIter.AtST()
_, _, app, err = app.AppendHistogram(nil, st, t, h, true)
st = p.currDelIter.AtST()
if !needTS && st != 0 {
// A sample with non-zero ST was found after creating a non-ST chunk.
// Recode all previously written samples into a new ST-capable chunk.
needTS = true
newChunk, app, err = recodeChunkToST(valueType, newChunk)
if err != nil {
break
break loop
}
}
case chunkenc.ValFloat:
if p.currMeta.Chunk.Encoding() == chunkenc.EncXOROptST {
newChunk = chunkenc.NewXOROptSTChunk()
} else {
newChunk = chunkenc.NewXORChunk()
}
if app, err = newChunk.Appender(); err != nil {
break
}
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloat {
err = fmt.Errorf("found value type %v in float chunk", vt)
break
}
switch vt {
case chunkenc.ValFloat:
var v float64
t, v = p.currDelIter.At()
st = p.currDelIter.AtST()
app.Append(st, t, v)
}
case chunkenc.ValFloatHistogram:
// TODO(krajorama): handle ST capable float histogram chunks when they are supported.
newChunk = chunkenc.NewFloatHistogramChunk()
if app, err = newChunk.Appender(); err != nil {
break
}
for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloatHistogram {
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
case chunkenc.ValHistogram:
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram(nil)
_, _, app, err = app.AppendHistogram(nil, st, t, h, true)
if err != nil {
break loop
}
case chunkenc.ValFloatHistogram:
var h *histogram.FloatHistogram
t, h = p.currDelIter.AtFloatHistogram(nil)
st = p.currDelIter.AtST()
_, _, app, err = app.AppendFloatHistogram(nil, st, t, h, true)
if err != nil {
break
break loop
}
default:
err = fmt.Errorf("populateCurrForSingleChunk: value type %v unsupported", valueType)
break loop
}
default:
err = fmt.Errorf("populateCurrForSingleChunk: value type %v unsupported", valueType)
}
if err != nil {
@ -965,6 +951,44 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
return true
}
// recodeChunkToST creates a new ST-capable chunk of the given value type and
// copies all samples from oldChunk into it, preserving their ST values.
//
// On success it returns the new chunk together with the appender to keep
// writing to it. The appender may differ from the one first obtained from the
// chunk, because the histogram append calls can hand back a replacement. On
// any failure the returned chunk and appender are nil.
//
// A sample whose value type is not float, histogram or float histogram is
// reported as an error rather than silently dropped, in line with
// populateCurrForSingleChunk.
func recodeChunkToST(vt chunkenc.ValueType, oldChunk chunkenc.Chunk) (chunkenc.Chunk, chunkenc.Appender, error) {
	newChunk, err := vt.NewChunk(true)
	if err != nil {
		return nil, nil, fmt.Errorf("create ST chunk for recoding: %w", err)
	}
	app, err := newChunk.Appender()
	if err != nil {
		return nil, nil, fmt.Errorf("create appender for ST chunk: %w", err)
	}
	it := oldChunk.Iterator(nil)
	for {
		switch sampleType := it.Next(); sampleType {
		case chunkenc.ValNone:
			// ValNone means either exhaustion or failure; Err tells them apart.
			if iterErr := it.Err(); iterErr != nil {
				return nil, nil, fmt.Errorf("iterate old chunk for recoding: %w", iterErr)
			}
			return newChunk, app, nil
		case chunkenc.ValFloat:
			t, v := it.At()
			app.Append(it.AtST(), t, v)
		case chunkenc.ValHistogram:
			t, h := it.AtHistogram(nil)
			_, _, app, err = app.AppendHistogram(nil, it.AtST(), t, h, true)
			if err != nil {
				return nil, nil, fmt.Errorf("recode histogram sample: %w", err)
			}
		case chunkenc.ValFloatHistogram:
			t, h := it.AtFloatHistogram(nil)
			_, _, app, err = app.AppendFloatHistogram(nil, it.AtST(), t, h, true)
			if err != nil {
				return nil, nil, fmt.Errorf("recode float histogram sample: %w", err)
			}
		default:
			// Skipping the sample here would lose data without any signal.
			return nil, nil, fmt.Errorf("recode chunk to ST: value type %v unsupported", sampleType)
		}
	}
}
// populateChunksFromIterable reads the samples from currDelIter to create
// chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first
// chunk.
@ -995,7 +1019,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
)
prevValueType := chunkenc.ValNone
prevEncoding := chunkenc.EncNone
hasTS := false
for currentValueType := firstValueType; currentValueType != chunkenc.ValNone; currentValueType = p.currDelIter.Next() {
var (
@ -1006,21 +1030,24 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
// chunk as chunks can't have multiple encoding types).
// For the first sample, the following condition will always be true as
// ValNone != ValFloat | ValHistogram | ValFloatHistogram.
encoding := p.currDelIter.Encoding()
if currentValueType != prevValueType || (encoding != prevEncoding) {
// Also if we need to store start time (ST), but the current chunk is
// not capable.
st = p.currDelIter.AtST()
needTS := st != 0
if currentValueType != prevValueType || !hasTS && needTS {
if prevValueType != chunkenc.ValNone {
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
}
cmint = p.currDelIter.AtT()
if currentChunk, err = chunkenc.NewEmptyChunk(encoding); err != nil {
if currentChunk, err = currentValueType.NewChunk(needTS); err != nil {
break
}
if app, err = currentChunk.Appender(); err != nil {
break
}
hasTS = needTS
}
st = p.currDelIter.AtST()
switch currentValueType {
case chunkenc.ValFloat:
{
@ -1060,7 +1087,6 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
cmaxt = t
prevValueType = currentValueType
prevEncoding = encoding
}
if err != nil {
@ -1207,10 +1233,6 @@ type DeletedIterator struct {
Intervals tombstones.Intervals
}
// Encoding returns the encoding reported by the wrapped iterator (it.Iter).
func (it *DeletedIterator) Encoding() chunkenc.Encoding {
return it.Iter.Encoding()
}
// At returns the timestamp and float value reported by the wrapped iterator
// (it.Iter).
func (it *DeletedIterator) At() (int64, float64) {
return it.Iter.At()
}

View File

@ -764,10 +764,6 @@ type mockSampleIterator struct {
idx int
}
// Encoding derives the chunk encoding of the sample at it.idx from the
// sample's type and from whether its ST is non-zero.
// NOTE(review): indexes it.s[it.idx] without a bounds check; presumably only
// valid after the iterator has advanced. Confirm against callers.
func (it *mockSampleIterator) Encoding() chunkenc.Encoding {
return it.s[it.idx].Type().ChunkEncoding(it.s[it.idx].ST() != 0)
}
func (it *mockSampleIterator) Seek(t int64) chunkenc.ValueType {
for ; it.idx < len(it.s); it.idx++ {
if it.idx != -1 && it.s[it.idx].T() >= t {
@ -2125,6 +2121,13 @@ func TestPopulateWithDelChunkSeriesIterator_WithST(t *testing.T) {
sample{st: 400, t: 4000, f: 4.0},
sample{st: 500, t: 5000, f: 5.0},
}
samplesWithNoLeadingST := []chunks.Sample{
sample{st: 0, t: 1000, f: 1.0},
sample{st: 0, t: 2000, f: 2.0},
sample{st: 0, t: 3000, f: 3.0},
sample{st: 400, t: 4000, f: 4.0},
sample{st: 500, t: 5000, f: 5.0},
}
cases := []struct {
name string
@ -2161,6 +2164,20 @@ func TestPopulateWithDelChunkSeriesIterator_WithST(t *testing.T) {
sample{st: 500, t: 5000, f: 5.0},
},
},
{
// This tests that populateCurrForSingleChunk can handle
// chunks that don't start with ST, but introduce ST later.
name: "delete first sample - ST late preserved",
samples: [][]chunks.Sample{samplesWithNoLeadingST},
// Delete first sample.
intervals: tombstones.Intervals{{Mint: 1000, Maxt: 1000}},
expected: []chunks.Sample{
sample{st: 0, t: 2000, f: 2.0},
sample{st: 0, t: 3000, f: 3.0},
sample{st: 400, t: 4000, f: 4.0},
sample{st: 500, t: 5000, f: 5.0},
},
},
}
for _, tc := range cases {

View File

@ -245,10 +245,6 @@ type FakeSeriesIterator struct {
idx int
}
// Encoding always reports chunkenc.EncXOR.
func (*FakeSeriesIterator) Encoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (f *FakeSeriesIterator) Next() chunkenc.ValueType {
f.idx++
if f.idx < len(f.samples) {
@ -312,10 +308,6 @@ type FakeHistogramSeriesIterator struct {
idx int
}
// Encoding always reports chunkenc.EncFloatHistogram.
func (*FakeHistogramSeriesIterator) Encoding() chunkenc.Encoding {
return chunkenc.EncFloatHistogram
}
func (f *FakeHistogramSeriesIterator) Next() chunkenc.ValueType {
f.idx++
if f.idx < len(f.histograms) {