diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go index ddaf250c1b..d3a76820da 100644 --- a/promql/histogram_stats_iterator_test.go +++ b/promql/histogram_stats_iterator_test.go @@ -213,10 +213,6 @@ type histogramIterator struct { histograms []*histogram.Histogram } -func (*histogramIterator) Encoding() chunkenc.Encoding { - return chunkenc.EncFloatHistogram -} - func (h *histogramIterator) Next() chunkenc.ValueType { h.i++ if h.i < len(h.histograms) { diff --git a/promql/value.go b/promql/value.go index 499a2ca32b..17afdfc410 100644 --- a/promql/value.go +++ b/promql/value.go @@ -442,13 +442,6 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator { } } -func (ssi *storageSeriesIterator) Encoding() chunkenc.Encoding { - if ssi.currH != nil { - return chunkenc.EncFloatHistogram - } - return chunkenc.EncXOR -} - func (ssi *storageSeriesIterator) reset(series Series) { ssi.floats = series.Floats ssi.histograms = series.Histograms diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 9118211d59..61d1601bc0 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -416,7 +416,6 @@ type mockSeriesIterator struct { err func() error } -func (*mockSeriesIterator) Encoding() chunkenc.Encoding { return chunkenc.EncXOR } func (m *mockSeriesIterator) Seek(t int64) chunkenc.ValueType { return m.seek(t) } func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } func (m *mockSeriesIterator) Next() chunkenc.ValueType { return m.next() } @@ -468,10 +467,6 @@ func (*fakeSeriesIterator) AtST() int64 { return 0 // No start timestamps in this fake iterator. 
} -func (*fakeSeriesIterator) Encoding() chunkenc.Encoding { - return chunkenc.EncXOR -} - func (it *fakeSeriesIterator) Next() chunkenc.ValueType { it.idx++ if it.idx >= it.nsamples { diff --git a/storage/merge.go b/storage/merge.go index 4666338910..76bf0994e0 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -551,13 +551,6 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { return chunkenc.ValNone } -func (c *chainSampleIterator) Encoding() chunkenc.Encoding { - if c.curr == nil { - panic("chainSampleIterator.At called before first .Next or after .Next returned false.") - } - return c.curr.Encoding() -} - func (c *chainSampleIterator) At() (t int64, v float64) { if c.curr == nil { panic("chainSampleIterator.At called before first .Next or after .Next returned false.") diff --git a/storage/merge_test.go b/storage/merge_test.go index 04c6580ba4..e42a6a4ce1 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1692,10 +1692,6 @@ type errIterator struct { err error } -func (errIterator) Encoding() chunkenc.Encoding { - return chunkenc.EncNone -} - func (errIterator) Next() chunkenc.ValueType { return chunkenc.ValNone } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index f8c759fa6a..c689a51164 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -564,22 +564,6 @@ func (c *concreteSeriesIterator) AtT() int64 { return c.series.floats[c.floatsCur].Timestamp } -// TODO(krajorama): implement Encoding() with ST. Maybe. concreteSeriesIterator -// is used for turning query results into an iterable, but query results do not -// have ST. -func (c *concreteSeriesIterator) Encoding() chunkenc.Encoding { - switch c.curValType { - case chunkenc.ValFloat: - return chunkenc.EncXOR - case chunkenc.ValHistogram: - return chunkenc.EncHistogram - case chunkenc.ValFloatHistogram: - return chunkenc.EncFloatHistogram - default: - return chunkenc.EncNone - } -} - // TODO(krajorama): implement AtST. Maybe. 
concreteSeriesIterator is used // for turning query results into an iterable, but query results do not have ST. func (*concreteSeriesIterator) AtST() int64 { @@ -838,10 +822,6 @@ func (it *chunkedSeriesIterator) reset(chunks []prompb.Chunk, mint, maxt int64) } } -func (it *chunkedSeriesIterator) Encoding() chunkenc.Encoding { - return it.cur.Encoding() -} - func (it *chunkedSeriesIterator) At() (ts int64, v float64) { return it.cur.At() } diff --git a/storage/series.go b/storage/series.go index d7b21a0dc2..e51f8cfd96 100644 --- a/storage/series.go +++ b/storage/series.go @@ -113,15 +113,6 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator { return &listSeriesIterator{samples: samples, idx: -1} } -func (it *listSeriesIterator) Encoding() chunkenc.Encoding { - s := it.samples.Get(it.idx) - encoding := s.Type().ChunkEncoding(s.ST() != 0) - if encoding == chunkenc.EncNone { - panic(fmt.Sprintf("unknown sample type %s", s.Type().String())) - } - return encoding -} - func (it *listSeriesIterator) Reset(samples Samples) { it.samples = samples it.idx = -1 diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index f06a5bf208..de5fa0c2de 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -160,11 +160,6 @@ type Iterator interface { // Returns 0 if the start timestamp is not implemented or not set. // Before the iterator has advanced, the behaviour is unspecified. AtST() int64 - // Encoding returns what encoding to use for storing the current sample. - // Only call this as last resort if the encoding is really needed, and - // the current chunk isn't accessible otherwise. - // Before the iterator has advanced, the behaviour is unspecified. - Encoding() Encoding // Err returns the current error. It should be used only after the // iterator is exhausted, i.e. `Next` or `Seek` have returned ValNone. 
Err() error @@ -236,13 +231,6 @@ type mockSeriesIterator struct { currIndex int } -func (it *mockSeriesIterator) Encoding() Encoding { - if it.AtST() != 0 { - return EncXOROptST - } - return EncXOR -} - func (*mockSeriesIterator) Seek(int64) ValueType { return ValNone } func (it *mockSeriesIterator) At() (int64, float64) { @@ -285,7 +273,6 @@ func NewNopIterator() Iterator { type nopIterator struct{} -func (nopIterator) Encoding() Encoding { return EncNone } func (nopIterator) Next() ValueType { return ValNone } func (nopIterator) Seek(int64) ValueType { return ValNone } func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 } diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 28908325c0..6af2fa68e2 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -839,10 +839,6 @@ type floatHistogramIterator struct { atFloatHistogramCalled bool } -func (*floatHistogramIterator) Encoding() Encoding { - return EncFloatHistogram -} - func (it *floatHistogramIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 27d3318a2b..4e77f387d3 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -898,10 +898,6 @@ type histogramIterator struct { err error } -func (*histogramIterator) Encoding() Encoding { - return EncHistogram -} - func (it *histogramIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 860ed105b1..5a9a59dc22 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -248,10 +248,6 @@ type xorIterator struct { err error } -func (*xorIterator) Encoding() Encoding { - return EncXOR -} - func (it *xorIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone diff --git a/tsdb/chunkenc/xoroptst.go b/tsdb/chunkenc/xoroptst.go index 90265b4543..b138ddbdf4 100644 --- a/tsdb/chunkenc/xoroptst.go 
+++ b/tsdb/chunkenc/xoroptst.go @@ -197,10 +197,6 @@ type xorOptSTtIterator struct { err error } -func (*xorOptSTtIterator) Encoding() Encoding { - return EncXOROptST -} - func (it *xorOptSTtIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone diff --git a/tsdb/db_append_v2_test.go b/tsdb/db_append_v2_test.go index 9fb5ea07ac..72f609f49e 100644 --- a/tsdb/db_append_v2_test.go +++ b/tsdb/db_append_v2_test.go @@ -7529,18 +7529,17 @@ func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) { ctx := context.Background() app := db.AppenderV2(ctx) - maxt := 100 - for i := range maxt { + mint := 100 + maxt := 200 + for i := mint; i < maxt; i++ { // AppendV2 signature: (ref, labels, st, t, v, h, fh, opts) - // st=0 (start timestamp), t=i (sample timestamp) - // TODO(krajorama): verify with non zero st once the API supports it. - _, err := app.Append(0, labels.FromStrings("a", "b"), 0, int64(i), float64(i), nil, nil, storage.AOptions{}) + _, err := app.Append(0, labels.FromStrings("a", "b"), 50, int64(i), float64(i), nil, nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) // Compact the Head to create a new block. - require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1))) + require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), int64(mint), int64(maxt)-1))) // Check that we have exactly one block. 
require.Len(t, db.Blocks(), 1) b := db.Blocks()[0] @@ -7568,7 +7567,7 @@ func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) { c, _, err := chunkr.ChunkOrIterable(chk) require.NoError(t, err) require.Equal(t, chunkenc.EncXOROptST, c.Encoding(), - "expected EncXOROptST encoding when EnableSTStorage=true, got %s", c.Encoding()) + "unexpected chunk encoding, got %s", c.Encoding()) chunkCount++ } } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index a9db077228..cbdcfc5266 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -9627,9 +9627,10 @@ func TestStaleSeriesCompactionWithZeroSeries(t *testing.T) { require.Empty(t, db.Blocks()) } -// TestCompactHeadWithSTStorage ensures that when EnableSTStorage is true, -// compacted blocks contain chunks with EncXOR encoding for float samples -// when using the original Appender (which does not support start timestamps). +// TestCompactHeadWithSTStorage demonstrates that when no ST is stored in the +// head and a truncated chunk is compacted, recoding via +// populateWithDelChunkSeriesIterator.populateCurrForSingleChunk +// results in a chunk that is not ST-capable. func TestCompactHeadWithSTStorage(t *testing.T) { t.Parallel() @@ -9681,7 +9682,7 @@ func TestCompactHeadWithSTStorage(t *testing.T) { for _, chk := range chks { c, _, err := chunkr.ChunkOrIterable(chk) require.NoError(t, err) - require.Equal(t, chunkenc.EncXOROptST, c.Encoding(), + require.Equal(t, chunkenc.EncXOR, c.Encoding(), "expected EncXOR encoding when using original Appender, got %s", c.Encoding()) chunkCount++ } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ed447c5d50..da08cb2847 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5823,10 +5823,6 @@ func newUnsupportedChunk() *unsupportedChunk { return &unsupportedChunk{chunkenc.NewXORChunk()} } -func (*unsupportedChunk) Encoding() chunkenc.Encoding { - return EncUnsupportedXOR -} - // Tests https://github.com/prometheus/prometheus/issues/10277. 
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { dir := t.TempDir() diff --git a/tsdb/querier.go b/tsdb/querier.go index 461b09a7f2..bcb7ba953e 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -771,10 +771,6 @@ func (p *populateWithDelSeriesIterator) Seek(t int64) chunkenc.ValueType { return chunkenc.ValNone } -func (p *populateWithDelSeriesIterator) Encoding() chunkenc.Encoding { - return p.curr.Encoding() -} - func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() } @@ -870,7 +866,6 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { // populateCurrForSingleChunk sets the fields within p.currMetaWithChunk. This // should be called if the samples in p.currDelIter only form one chunk. -// TODO(krajorama): test ST when chunks support it. func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { valueType := p.currDelIter.Next() if valueType == chunkenc.ValNone { @@ -880,6 +875,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { return false } p.currMetaWithChunk.MinTime = p.currDelIter.AtT() + needTS := p.currDelIter.AtST() != 0 // Re-encode the chunk if iterator is provided. This means that it has // some samples to be deleted or chunk is opened. @@ -889,66 +885,56 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { st, t int64 err error ) - switch valueType { - case chunkenc.ValHistogram: - // TODO(krajorama): handle ST capable histogram chunks when they are supported. 
- newChunk = chunkenc.NewHistogramChunk() - if app, err = newChunk.Appender(); err != nil { + newChunk, err = valueType.NewChunk(needTS) + if err != nil { + p.err = fmt.Errorf("create new chunk while re-encoding: %w", err) + return false + } + app, err = newChunk.Appender() + if err != nil { + p.err = fmt.Errorf("create appender while re-encoding: %w", err) + return false + } + +loop: + for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() { + if vt != valueType { + err = fmt.Errorf("found value type %v in chunk with %v", vt, valueType) break } - for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() { - if vt != chunkenc.ValHistogram { - err = fmt.Errorf("found value type %v in histogram chunk", vt) - break - } - var h *histogram.Histogram - t, h = p.currDelIter.AtHistogram(nil) - st = p.currDelIter.AtST() - _, _, app, err = app.AppendHistogram(nil, st, t, h, true) + st = p.currDelIter.AtST() + if !needTS && st != 0 { + // A sample with non-zero ST was found after creating a non-ST chunk. + // Recode all previously written samples into a new ST-capable chunk. + needTS = true + newChunk, app, err = recodeChunkToST(valueType, newChunk) if err != nil { - break + break loop } } - case chunkenc.ValFloat: - if p.currMeta.Chunk.Encoding() == chunkenc.EncXOROptST { - newChunk = chunkenc.NewXOROptSTChunk() - } else { - newChunk = chunkenc.NewXORChunk() - } - if app, err = newChunk.Appender(); err != nil { - break - } - for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() { - if vt != chunkenc.ValFloat { - err = fmt.Errorf("found value type %v in float chunk", vt) - break - } + switch vt { + case chunkenc.ValFloat: var v float64 t, v = p.currDelIter.At() - st = p.currDelIter.AtST() app.Append(st, t, v) - } - case chunkenc.ValFloatHistogram: - // TODO(krajorama): handle ST capable float histogram chunks when they are supported. 
- newChunk = chunkenc.NewFloatHistogramChunk() - if app, err = newChunk.Appender(); err != nil { - break - } - for vt := valueType; vt != chunkenc.ValNone; vt = p.currDelIter.Next() { - if vt != chunkenc.ValFloatHistogram { - err = fmt.Errorf("found value type %v in histogram chunk", vt) - break + case chunkenc.ValHistogram: + var h *histogram.Histogram + t, h = p.currDelIter.AtHistogram(nil) + _, _, app, err = app.AppendHistogram(nil, st, t, h, true) + if err != nil { + break loop } + case chunkenc.ValFloatHistogram: var h *histogram.FloatHistogram t, h = p.currDelIter.AtFloatHistogram(nil) - st = p.currDelIter.AtST() _, _, app, err = app.AppendFloatHistogram(nil, st, t, h, true) if err != nil { - break + break loop } + default: + err = fmt.Errorf("populateCurrForSingleChunk: value type %v unsupported", valueType) + break loop } - default: - err = fmt.Errorf("populateCurrForSingleChunk: value type %v unsupported", valueType) } if err != nil { @@ -965,6 +951,44 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { return true } +// recodeChunkToST creates a new ST-capable chunk of the given value type and +// copies all samples from oldChunk into it, preserving their ST values. 
+func recodeChunkToST(vt chunkenc.ValueType, oldChunk chunkenc.Chunk) (chunkenc.Chunk, chunkenc.Appender, error) { + newChunk, err := vt.NewChunk(true) + if err != nil { + return nil, nil, fmt.Errorf("create ST chunk for recoding: %w", err) + } + app, err := newChunk.Appender() + if err != nil { + return nil, nil, fmt.Errorf("create appender for ST chunk: %w", err) + } + it := oldChunk.Iterator(nil) + for { + switch it.Next() { + case chunkenc.ValNone: + if err := it.Err(); err != nil { + return nil, nil, fmt.Errorf("iterate old chunk for recoding: %w", err) + } + return newChunk, app, nil + case chunkenc.ValFloat: + t, v := it.At() + app.Append(it.AtST(), t, v) + case chunkenc.ValHistogram: + t, h := it.AtHistogram(nil) + _, _, app, err = app.AppendHistogram(nil, it.AtST(), t, h, true) + if err != nil { + return nil, nil, fmt.Errorf("recode histogram sample: %w", err) + } + case chunkenc.ValFloatHistogram: + t, h := it.AtFloatHistogram(nil) + _, _, app, err = app.AppendFloatHistogram(nil, it.AtST(), t, h, true) + if err != nil { + return nil, nil, fmt.Errorf("recode float histogram sample: %w", err) + } + } + } +} + // populateChunksFromIterable reads the samples from currDelIter to create // chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first // chunk. @@ -995,7 +1019,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { ) prevValueType := chunkenc.ValNone - prevEncoding := chunkenc.EncNone + hasTS := false for currentValueType := firstValueType; currentValueType != chunkenc.ValNone; currentValueType = p.currDelIter.Next() { var ( @@ -1006,21 +1030,24 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { // chunk as chunks can't have multiple encoding types). // For the first sample, the following condition will always be true as // ValNone != ValFloat | ValHistogram | ValFloatHistogram. 
- encoding := p.currDelIter.Encoding() - if currentValueType != prevValueType || (encoding != prevEncoding) { + // A new chunk is also started when a start timestamp (ST) must be stored + // but the current chunk is not ST-capable. + st = p.currDelIter.AtST() + needTS := st != 0 + if currentValueType != prevValueType || !hasTS && needTS { if prevValueType != chunkenc.ValNone { p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) } cmint = p.currDelIter.AtT() - if currentChunk, err = chunkenc.NewEmptyChunk(encoding); err != nil { + if currentChunk, err = currentValueType.NewChunk(needTS); err != nil { break } if app, err = currentChunk.Appender(); err != nil { break } + hasTS = needTS } - st = p.currDelIter.AtST() switch currentValueType { case chunkenc.ValFloat: { @@ -1060,7 +1087,6 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { cmaxt = t prevValueType = currentValueType - prevEncoding = encoding } if err != nil { @@ -1207,10 +1233,6 @@ type DeletedIterator struct { Intervals tombstones.Intervals } -func (it *DeletedIterator) Encoding() chunkenc.Encoding { - return it.Iter.Encoding() -} - func (it *DeletedIterator) At() (int64, float64) { return it.Iter.At() } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 46a8bc6127..b0c8f0a1f1 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -764,10 +764,6 @@ type mockSampleIterator struct { idx int } -func (it *mockSampleIterator) Encoding() chunkenc.Encoding { - return it.s[it.idx].Type().ChunkEncoding(it.s[it.idx].ST() != 0) -} - func (it *mockSampleIterator) Seek(t int64) chunkenc.ValueType { for ; it.idx < len(it.s); it.idx++ { if it.idx != -1 && it.s[it.idx].T() >= t { @@ -2125,6 +2121,13 @@ func TestPopulateWithDelChunkSeriesIterator_WithST(t *testing.T) { sample{st: 400, t: 4000, f: 4.0}, sample{st: 500, t: 5000, f: 5.0}, } + samplesWithNoLeadingST := []chunks.Sample{ + sample{st: 0, t: 1000, f: 1.0}, + sample{st: 0, t: 
2000, f: 2.0}, + sample{st: 0, t: 3000, f: 3.0}, + sample{st: 400, t: 4000, f: 4.0}, + sample{st: 500, t: 5000, f: 5.0}, + } cases := []struct { name string @@ -2161,6 +2164,20 @@ func TestPopulateWithDelChunkSeriesIterator_WithST(t *testing.T) { sample{st: 500, t: 5000, f: 5.0}, }, }, + { + // This tests that populateCurrForSingleChunk can handle + // chunks that don't start with ST, but introduce ST later. + name: "delete first sample - ST late preserved", + samples: [][]chunks.Sample{samplesWithNoLeadingST}, + // Delete first sample. + intervals: tombstones.Intervals{{Mint: 1000, Maxt: 1000}}, + expected: []chunks.Sample{ + sample{st: 0, t: 2000, f: 2.0}, + sample{st: 0, t: 3000, f: 3.0}, + sample{st: 400, t: 4000, f: 4.0}, + sample{st: 500, t: 5000, f: 5.0}, + }, + }, } for _, tc := range cases { diff --git a/web/api/testhelpers/mocks.go b/web/api/testhelpers/mocks.go index 107728e37e..527febb727 100644 --- a/web/api/testhelpers/mocks.go +++ b/web/api/testhelpers/mocks.go @@ -245,10 +245,6 @@ type FakeSeriesIterator struct { idx int } -func (*FakeSeriesIterator) Encoding() chunkenc.Encoding { - return chunkenc.EncXOR -} - func (f *FakeSeriesIterator) Next() chunkenc.ValueType { f.idx++ if f.idx < len(f.samples) { @@ -312,10 +308,6 @@ type FakeHistogramSeriesIterator struct { idx int } -func (*FakeHistogramSeriesIterator) Encoding() chunkenc.Encoding { - return chunkenc.EncFloatHistogram -} - func (f *FakeHistogramSeriesIterator) Next() chunkenc.ValueType { f.idx++ if f.idx < len(f.histograms) {