feat(storage): select seriesToChunkEncoder target encoding by config

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
György Krajcsovits 2026-02-08 10:14:23 +01:00
parent 06c6201748
commit 080fa034a6
13 changed files with 87 additions and 77 deletions
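
In short: instead of sniffing each sample's start timestamp (ST) to pick a chunk encoding, the encoder now takes a single storeST flag threaded down from configuration. A minimal sketch of the new mapping, using only names from this diff:

    package main

    import (
        "fmt"

        "github.com/prometheus/prometheus/tsdb/chunkenc"
    )

    func main() {
        // storeST enabled: float samples target the ST-capable XOR encoding.
        fmt.Println(chunkenc.ValFloat.ChunkEncoding(true)) // EncodingForFloatST, i.e. EncXOROptST
        // storeST disabled: behavior is unchanged from before this commit.
        fmt.Println(chunkenc.ValFloat.ChunkEncoding(false)) // EncXOR
    }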

View File

@@ -838,10 +838,16 @@ func main() {
template.RegisterFeatures(features.DefaultRegistry)
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
storeST = cfg.tsdb.EnableSTStorage
)
if agentMode {
storeST = cfg.agent.EnableSTStorage
}
var (
remoteStorage = remote.NewStorageWithStoreST(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels, storeST)
fanoutStorage = storage.NewFanoutWithStoreST(logger, storeST, localStorage, remoteStorage)
)
var (

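Condensed, the hunk above resolves the flag once at startup, from the TSDB options in server mode or the agent options in agent mode, and hands it to both storages; a sketch using only names from this diff (not compilable on its own):

    storeST := cfg.tsdb.EnableSTStorage
    if agentMode {
        storeST = cfg.agent.EnableSTStorage
    }
    remoteStorage := remote.NewStorageWithStoreST(logger.With("component", "remote"),
        prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath,
        time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels, storeST)
    fanoutStorage := storage.NewFanoutWithStoreST(logger, storeST, localStorage, remoteStorage)
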
View File

@@ -31,6 +31,7 @@ type fanout struct {
primary Storage
secondaries []Storage
storeST bool
}
// NewFanout returns a new fanout Storage, which proxies reads and writes
@@ -43,10 +44,16 @@ type fanout struct {
//
// NOTE: In the case of Prometheus, it treats all remote storages as secondary / best effort.
func NewFanout(logger *slog.Logger, primary Storage, secondaries ...Storage) Storage {
return NewFanoutWithStoreST(logger, false, primary, secondaries...)
}
// NewFanoutWithStoreST returns a new fanout Storage with start timestamp storage enabled or disabled.
func NewFanoutWithStoreST(logger *slog.Logger, storeST bool, primary Storage, secondaries ...Storage) Storage {
return &fanout{
logger: logger,
primary: primary,
secondaries: secondaries,
storeST: storeST,
}
}
@@ -120,7 +127,7 @@ func (f *fanout) ChunkQuerier(mint, maxt int64) (ChunkQuerier, error) {
}
secondaries = append(secondaries, querier)
}
return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil
return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMergerWithStoreST(ChainedSeriesMerge, f.storeST)), nil
}
func (f *fanout) Appender(ctx context.Context) Appender {

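Existing NewFanout callers are untouched; the old constructor now delegates with storeST=false. A hypothetical opt-in construction, assuming primary and secondary are storage.Storage values and the log/slog and os imports:

    logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
    // storeST=true makes the fanout's ChunkQuerier merge results with the
    // ST-aware compacting merger instead of the plain one.
    fan := storage.NewFanoutWithStoreST(logger, true, primary, secondary)
    _ = fan
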
View File

@@ -722,6 +722,11 @@ func (h *samplesIteratorHeap) Pop() any {
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces a small overhead
// to handle overlaps between series.
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
return NewCompactingChunkSeriesMergerWithStoreST(mergeFunc, false)
}
// NewCompactingChunkSeriesMergerWithStoreST is like NewCompactingChunkSeriesMerger, but uses storeST when re-encoding.
func NewCompactingChunkSeriesMergerWithStoreST(mergeFunc VerticalSeriesMergeFunc, storeST bool) VerticalChunkSeriesMergeFunc {
return func(series ...ChunkSeries) ChunkSeries {
if len(series) == 0 {
return nil
@@ -736,6 +741,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
return &compactChunkIterator{
mergeFunc: mergeFunc,
iterators: iterators,
storeST: storeST,
}
},
}
@@ -748,6 +754,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
type compactChunkIterator struct {
mergeFunc VerticalSeriesMergeFunc
iterators []chunks.Iterator
storeST bool
h chunkIteratorHeap
@@ -813,7 +820,7 @@ func (c *compactChunkIterator) Next() bool {
}
// Add last as it's not yet included in overlap. We operate on the same series, so labels do not matter here.
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...)).Iterator(nil)
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...), c.storeST).Iterator(nil)
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
return false

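The merge path mirrors the fanout: the old constructor becomes a storeST=false shim, and the flag rides in compactChunkIterator until overlapping chunks are re-encoded. A small sketch using only names from this diff:

    // Identical to the old NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge).
    plain := storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, false)
    // Re-encodes overlapping series into ST-capable chunks where supported.
    withST := storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, true)
    _, _ = plain, withST
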
View File

@@ -29,6 +29,7 @@ type sampleAndChunkQueryableClient struct {
requiredMatchers []*labels.Matcher
readRecent bool
callback startTimeCallback
storeST bool
}
// NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets.
@@ -38,6 +39,7 @@ func NewSampleAndChunkQueryableClient(
requiredMatchers []*labels.Matcher,
readRecent bool,
callback startTimeCallback,
storeST bool,
) storage.SampleAndChunkQueryable {
return &sampleAndChunkQueryableClient{
client: c,
@@ -46,6 +48,7 @@ func NewSampleAndChunkQueryableClient(
requiredMatchers: requiredMatchers,
readRecent: readRecent,
callback: callback,
storeST: storeST,
}
}
@@ -84,6 +87,7 @@ func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage.
externalLabels: c.externalLabels,
requiredMatchers: c.requiredMatchers,
},
storeST: c.storeST,
}
if c.readRecent {
return cq, nil
@@ -229,13 +233,14 @@ func (*querier) Close() error {
// chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier.
type chunkQuerier struct {
querier
storeST bool
}
// Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client.
// It uses remote.querier.Select so it supports external labels and required matchers if specified.
func (q *chunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
// TODO(bwplotka): Support chunked remote read and allow returning chunks directly (TODO ticket).
return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...))
return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...), q.storeST)
}
// Note strings in toFilter must be sorted.

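On the remote read side the flag lands in chunkQuerier, whose Select wraps the sample querier via NewSeriesSetToChunkSet. A call-shape sketch of the extended constructor; client, externalLabels, requiredMatchers, and startTimeCallback are placeholders for the usual arguments:

    queryable := remote.NewSampleAndChunkQueryableClient(
        client,           // ReadClient
        externalLabels,   // labels.Labels
        requiredMatchers, // []*labels.Matcher
        true,             // readRecent
        startTimeCallback,
        true, // storeST: chunk sets produced by Select keep start timestamps
    )
    _ = queryable
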
View File

@@ -527,6 +527,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
tc.requiredMatchers,
tc.readRecent,
tc.callback,
false,
)
q, err := c.Querier(tc.mint, tc.maxt)
require.NoError(t, err)

View File

@@ -56,7 +56,8 @@ type Storage struct {
logger *slog.Logger
mtx sync.Mutex
rws *WriteStorage
rws *WriteStorage
storeST bool
// For reads.
queryables []storage.SampleAndChunkQueryable
@@ -67,6 +68,11 @@ var _ storage.Storage = &Storage{}
// NewStorage returns a remote.Storage.
func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage {
return NewStorageWithStoreST(l, reg, stCallback, walDir, flushDeadline, sm, enableTypeAndUnitLabels, false)
}
// NewStorageWithStoreST returns a remote.Storage with start timestamp storage enabled or disabled.
func NewStorageWithStoreST(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels, storeST bool) *Storage {
if l == nil {
l = promslog.NewNopLogger()
}
@@ -77,6 +83,7 @@ func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeC
logger: logger,
deduper: deduper,
localStartTimeCallback: stCallback,
storeST: storeST,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, enableTypeAndUnitLabels)
return s
@@ -139,6 +146,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
labelsToEqualityMatchers(rrConf.RequiredMatchers),
rrConf.ReadRecent,
s.localStartTimeCallback,
s.storeST,
))
}
s.queryables = queryables
@@ -187,7 +195,7 @@ func (s *Storage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
}
queriers = append(queriers, q)
}
return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, s.storeST)), nil
}
// Appender implements storage.Storage.

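As with the fanout, NewStorage keeps its signature and defaults storeST to false, while NewStorageWithStoreST is the explicit form; the flag reaches every queryable built in ApplyConfig and the merger used by ChunkQuerier. Placeholder arguments below stand in for the usual ones:

    // Old call sites: unchanged behavior.
    s := remote.NewStorage(l, reg, cb, walDir, flushDeadline, sm, false)
    // Opting in to start timestamp storage.
    sST := remote.NewStorageWithStoreST(l, reg, cb, walDir, flushDeadline, sm, false, true)
    _, _ = s, sST
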
View File

@@ -289,11 +289,12 @@ func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series {
type seriesSetToChunkSet struct {
SeriesSet
storeST bool
}
// NewSeriesSetToChunkSet converts SeriesSet to ChunkSeriesSet by encoding chunks from samples.
func NewSeriesSetToChunkSet(chk SeriesSet) ChunkSeriesSet {
return &seriesSetToChunkSet{SeriesSet: chk}
func NewSeriesSetToChunkSet(chk SeriesSet, storeST bool) ChunkSeriesSet {
return &seriesSetToChunkSet{SeriesSet: chk, storeST: storeST}
}
func (c *seriesSetToChunkSet) Next() bool {
@@ -304,7 +305,7 @@
}
func (c *seriesSetToChunkSet) At() ChunkSeries {
return NewSeriesToChunkEncoder(c.SeriesSet.At())
return NewSeriesToChunkEncoder(c.SeriesSet.At(), c.storeST)
}
func (c *seriesSetToChunkSet) Err() error {
@@ -313,13 +314,14 @@ func (c *seriesSetToChunkSet) Err() error {
type seriesToChunkEncoder struct {
Series
storeST bool
}
const seriesToChunkEncoderSplit = 120
// NewSeriesToChunkEncoder encodes samples into chunks, limiting each chunk to 120 samples.
func NewSeriesToChunkEncoder(series Series) ChunkSeries {
return &seriesToChunkEncoder{series}
func NewSeriesToChunkEncoder(series Series, storeST bool) ChunkSeries {
return &seriesToChunkEncoder{Series: series, storeST: storeST}
}
func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
@@ -341,14 +343,12 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
i := 0
seriesIter := s.Series.Iterator(nil)
lastType := chunkenc.ValNone
lastHadST := false
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
st := seriesIter.AtST()
hasST := st != 0
if typ != lastType || lastHadST != hasST || i >= seriesToChunkEncoderSplit {
if typ != lastType || i >= seriesToChunkEncoderSplit {
// Create a new chunk if the sample type changed or there are too many samples in the current one.
chks = appendChunk(chks, mint, maxt, chk)
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding(), hasST)
chk, err = typ.NewChunk(s.storeST)
if err != nil {
return errChunksIterator{err: err}
}
@@ -361,7 +361,6 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
i = 0
}
lastType = typ
lastHadST = hasST
var (
t int64

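Note what the encoder loop lost: the per-sample lastHadST tracking is gone, so a chunk boundary is no longer forced when STs start or stop appearing; only a value type change or the 120-sample split cuts a chunk, and the target encoding comes from the configured flag. Usage mirrors the tests below, assuming a samples slice as built there:

    series := storage.NewListSeries(labels.FromStrings("job", "demo"), samples)
    encoder := storage.NewSeriesToChunkEncoder(series, true) // storeST from config, not per sample
    chks, err := storage.ExpandChunks(encoder.Iterator(nil))
    _, _ = chks, err
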
View File

@@ -601,7 +601,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
}
}
series := NewListSeries(lbs, copiedSamples)
encoder := NewSeriesToChunkEncoder(series)
encoder := NewSeriesToChunkEncoder(series, false)
require.Equal(t, lbs, encoder.Labels())
chks, err := ExpandChunks(encoder.Iterator(nil))

View File

@@ -49,9 +49,11 @@ func (e Encoding) String() string {
return "<unknown>"
}
// EncodingForFloatST is the chunk encoding used for float samples when start timestamps are stored.
const EncodingForFloatST = EncXOROptST
// IsValidEncoding returns true for supported encodings.
func IsValidEncoding(e Encoding) bool {
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOROptST
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncodingForFloatST
}
const (
@@ -191,40 +193,11 @@
}
}
func (v ValueType) ChunkEncoding() Encoding {
switch v {
case ValFloat:
return EncXOR
case ValHistogram:
return EncHistogram
case ValFloatHistogram:
return EncFloatHistogram
default:
return EncNone
}
}
func (v ValueType) ChunkEncodingWithST(st int64) Encoding {
switch v {
case ValFloat:
if st != 0 {
return EncXOROptST
}
return EncXOR
case ValHistogram:
return EncHistogram
case ValFloatHistogram:
return EncFloatHistogram
default:
return EncNone
}
}
func (v ValueType) ChunkEncodingWithStoreST(storeST bool) Encoding {
func (v ValueType) ChunkEncoding(storeST bool) Encoding {
switch v {
case ValFloat:
if storeST {
return EncXOROptST
return EncodingForFloatST
}
return EncXOR
case ValHistogram:
@@ -237,21 +210,7 @@ func (v ValueType) ChunkEncodingWithStoreST(storeST bool) Encoding {
}
func (v ValueType) NewChunk(storeST bool) (Chunk, error) {
switch v {
case ValFloat:
if storeST {
return NewXOROptSTChunk(), nil
}
return NewXORChunk(), nil
case ValHistogram:
// TODO(krajorama): return a ST capable histogram chunk when they are supported.
return NewHistogramChunk(), nil
case ValFloatHistogram:
// TODO(krajorama): return a ST capable float histogram chunk when they are supported.
return NewFloatHistogramChunk(), nil
default:
return nil, fmt.Errorf("value type %v unsupported", v)
}
return NewEmptyChunk(v.ChunkEncoding(storeST), storeST)
}
// MockSeriesIterator returns an iterator for a mock series with custom
@@ -438,11 +397,12 @@ func FromData(e Encoding, d []byte) (Chunk, error) {
}
// NewEmptyChunk returns an empty chunk for the given encoding.
// TODO(krajorama): support storeST for histogram and float histogram chunks when they are implemented.
func NewEmptyChunk(e Encoding, storeST bool) (Chunk, error) {
switch e {
case EncXOR:
if storeST {
return NewXOROptSTChunk(), nil
return newEmptyChunkWithST(EncodingForFloatST), nil
}
return NewXORChunk(), nil
case EncHistogram:
@@ -454,3 +414,13 @@ func NewEmptyChunk(e Encoding, storeST bool) (Chunk, error) {
}
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}
func newEmptyChunkWithST(e Encoding) Chunk {
switch e {
case EncXOROptST:
return NewXOROptSTChunk()
default:
// Unreachable in practice: the only caller is NewEmptyChunk, directly above, which only passes EncXOROptST.
panic(fmt.Sprintf("invalid chunk encoding %q", e))
}
}

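NewChunk and NewEmptyChunk now share one code path, so the (encoding, storeST) mapping lives in a single switch, and newEmptyChunkWithST can panic rather than return an error because its only caller has just validated the encoding. Two equivalent ways to get an ST-capable float chunk under this diff:

    chk1, err1 := chunkenc.ValFloat.NewChunk(true)              // EncXOROptST via ChunkEncoding(true)
    chk2, err2 := chunkenc.NewEmptyChunk(chunkenc.EncXOR, true) // EncXOR upgraded to the ST variant
    _, _, _, _ = chk1, err1, chk2, err2
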
View File

@@ -166,7 +166,7 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
}
// Request storing ST in the chunk if available.
c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding(), hasST)
c, err := sampleType.NewChunk(hasST)
if err != nil {
return Meta{}, err
}

View File

@@ -178,6 +178,9 @@ type LeveledCompactorOptions struct {
// It is useful for downstream projects like Mimir, Cortex, and Thanos, which have a separate component that does compaction.
EnableOverlappingCompaction bool
// EnableSTStorage determines whether compaction should re-encode chunks with start timestamps.
EnableSTStorage bool
// Metrics is the set of metrics for the Compactor. Unless provided, NewCompactorMetrics is called to initialize them.
Metrics *CompactorMetrics
// UseUncachedIO allows bypassing the page cache when appropriate.
@@ -211,7 +214,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
}
mergeFunc := opts.MergeFunc
if mergeFunc == nil {
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
mergeFunc = storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, opts.EnableSTStorage)
}
maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize
if maxBlockChunkSegmentSize == 0 {

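Compaction inherits the flag through LeveledCompactorOptions: when MergeFunc is nil, NewLeveledCompactorWithOptions now derives the default merger from EnableSTStorage. A sketch of the options, other fields elided:

    opts := tsdb.LeveledCompactorOptions{
        EnableOverlappingCompaction: true,
        // With a nil MergeFunc, this flag selects the ST-aware compacting merger.
        EnableSTStorage: true,
    }
    _ = opts
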
View File

@@ -571,7 +571,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
return nil
}
func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQueryable, error) {
func (db *DBReadOnly) loadDataAsQueryable(maxt int64, enableSTStorage bool) (storage.SampleAndChunkQueryable, error) {
select {
case <-db.closed:
return nil, ErrClosed
@@ -643,9 +643,12 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
}
db.closers = append(db.closers, head)
dbOpts := DefaultOptions()
dbOpts.EnableSTStorage = enableSTStorage
return &DB{
dir: db.dir,
logger: db.logger,
opts: dbOpts,
blocks: blocks,
head: head,
blockQuerierFunc: NewBlockQuerier,
@@ -656,7 +659,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
// Querier loads the blocks and wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers.
func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) {
q, err := db.loadDataAsQueryable(maxt)
q, err := db.loadDataAsQueryable(maxt, db.stStorageEnabled)
if err != nil {
return nil, err
}
@@ -666,7 +669,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) {
// ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range.
// Current implementation doesn't support multiple ChunkQueriers.
func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
q, err := db.loadDataAsQueryable(maxt)
q, err := db.loadDataAsQueryable(maxt, db.stStorageEnabled)
if err != nil {
return nil, err
}
@@ -965,6 +968,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
PD: opts.PostingsDecoderFactory,
UseUncachedIO: opts.UseUncachedIO,
BlockExcludeFilter: opts.BlockCompactionExcludeFunc,
EnableSTStorage: opts.EnableSTStorage,
})
}
if err != nil {
@@ -2408,7 +2412,7 @@ func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
if err != nil {
return nil, err
}
return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, db.opts.EnableSTStorage)), nil
}
func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {

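DBReadOnly has no live options, so Querier and ChunkQuerier pass db.stStorageEnabled into loadDataAsQueryable, which pins the flag into the synthetic DB's options; DB.ChunkQuerier then picks the matching merger. For a read-write DB the knob is the TSDB option itself; a hedged sketch, assuming the current tsdb.Open signature:

    opts := tsdb.DefaultOptions()
    opts.EnableSTStorage = true // chunk queriers use the ST-aware merger
    db, err := tsdb.Open(dataDir, logger, nil, opts, tsdb.NewDBStats())
    _, _ = db, err
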
View File

@@ -1839,7 +1839,7 @@ type chunkOpts struct {
// isolation for this append.)
// Series lock must be held when calling.
func (s *memSeries) append(storeST bool, st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(storeST, t, chunkenc.ValFloat.ChunkEncodingWithStoreST(storeST), o)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(storeST, t, chunkenc.ValFloat.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@@ -1870,7 +1870,7 @@ func (s *memSeries) appendHistogram(storeST bool, st, t int64, h *histogram.Hist
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValHistogram.ChunkEncodingWithStoreST(storeST), o)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValHistogram.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@@ -1927,7 +1927,7 @@ func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogra
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValFloatHistogram.ChunkEncodingWithStoreST(storeST), o)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValFloatHistogram.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}