diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index c5ff339656..0fd108ec22 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -838,10 +838,16 @@ func main() { template.RegisterFeatures(features.DefaultRegistry) var ( - localStorage = &readyStorage{stats: tsdb.NewDBStats()} - scraper = &readyScrapeManager{} - remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels) - fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) + localStorage = &readyStorage{stats: tsdb.NewDBStats()} + scraper = &readyScrapeManager{} + storeST = cfg.tsdb.EnableSTStorage + ) + if agentMode { + storeST = cfg.agent.EnableSTStorage + } + var ( + remoteStorage = remote.NewStorageWithStoreST(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels, storeST) + fanoutStorage = storage.NewFanoutWithStoreST(logger, storeST, localStorage, remoteStorage) ) var ( diff --git a/storage/fanout.go b/storage/fanout.go index 21f5f715e4..248fc43e58 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -31,6 +31,7 @@ type fanout struct { primary Storage secondaries []Storage + storeST bool } // NewFanout returns a new fanout Storage, which proxies reads and writes @@ -43,10 +44,16 @@ type fanout struct { // // NOTE: In the case of Prometheus, it treats all remote storages as secondary / best effort. func NewFanout(logger *slog.Logger, primary Storage, secondaries ...Storage) Storage { + return NewFanoutWithStoreST(logger, false, primary, secondaries...) +} + +// NewFanoutWithStoreST returns a new fanout Storage with start timestamp storage enabled or disabled. +func NewFanoutWithStoreST(logger *slog.Logger, storeST bool, primary Storage, secondaries ...Storage) Storage { return &fanout{ logger: logger, primary: primary, secondaries: secondaries, + storeST: storeST, } } @@ -120,7 +127,7 @@ func (f *fanout) ChunkQuerier(mint, maxt int64) (ChunkQuerier, error) { } secondaries = append(secondaries, querier) } - return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil + return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMergerWithStoreST(ChainedSeriesMerge, f.storeST)), nil } func (f *fanout) Appender(ctx context.Context) Appender { diff --git a/storage/merge.go b/storage/merge.go index 76bf0994e0..ec17e39ee1 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -722,6 +722,11 @@ func (h *samplesIteratorHeap) Pop() any { // NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead // to handle overlaps between series. func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc { + return NewCompactingChunkSeriesMergerWithStoreST(mergeFunc, false) +} + +// NewCompactingChunkSeriesMergerWithStoreST is like NewCompactingChunkSeriesMerger, but uses storeST when re-encoding. +func NewCompactingChunkSeriesMergerWithStoreST(mergeFunc VerticalSeriesMergeFunc, storeST bool) VerticalChunkSeriesMergeFunc { return func(series ...ChunkSeries) ChunkSeries { if len(series) == 0 { return nil @@ -736,6 +741,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC return &compactChunkIterator{ mergeFunc: mergeFunc, iterators: iterators, + storeST: storeST, } }, } @@ -748,6 +754,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC type compactChunkIterator struct { mergeFunc VerticalSeriesMergeFunc iterators []chunks.Iterator + storeST bool h chunkIteratorHeap @@ -813,7 +820,7 @@ func (c *compactChunkIterator) Next() bool { } // Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here. - iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...)).Iterator(nil) + iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...), c.storeST).Iterator(nil) if !iter.Next() { if c.err = iter.Err(); c.err != nil { return false diff --git a/storage/remote/read.go b/storage/remote/read.go index 70b55980b8..0bd9877bda 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -29,6 +29,7 @@ type sampleAndChunkQueryableClient struct { requiredMatchers []*labels.Matcher readRecent bool callback startTimeCallback + storeST bool } // NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets. @@ -38,6 +39,7 @@ func NewSampleAndChunkQueryableClient( requiredMatchers []*labels.Matcher, readRecent bool, callback startTimeCallback, + storeST bool, ) storage.SampleAndChunkQueryable { return &sampleAndChunkQueryableClient{ client: c, @@ -46,6 +48,7 @@ func NewSampleAndChunkQueryableClient( requiredMatchers: requiredMatchers, readRecent: readRecent, callback: callback, + storeST: storeST, } } @@ -84,6 +87,7 @@ func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage. externalLabels: c.externalLabels, requiredMatchers: c.requiredMatchers, }, + storeST: c.storeST, } if c.readRecent { return cq, nil @@ -229,13 +233,14 @@ func (*querier) Close() error { // chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier. type chunkQuerier struct { querier + storeST bool } // Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client. // It uses remote.querier.Select so it supports external labels and required matchers if specified. func (q *chunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { // TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket). - return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...)) + return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...), q.storeST) } // Note strings in toFilter must be sorted. diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 49f29d9001..bb2cc3204f 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -527,6 +527,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) { tc.requiredMatchers, tc.readRecent, tc.callback, + false, ) q, err := c.Querier(tc.mint, tc.maxt) require.NoError(t, err) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index be75d23383..e3d7cb366c 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -56,7 +56,8 @@ type Storage struct { logger *slog.Logger mtx sync.Mutex - rws *WriteStorage + rws *WriteStorage + storeST bool // For reads. queryables []storage.SampleAndChunkQueryable @@ -67,6 +68,11 @@ var _ storage.Storage = &Storage{} // NewStorage returns a remote.Storage. func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage { + return NewStorageWithStoreST(l, reg, stCallback, walDir, flushDeadline, sm, enableTypeAndUnitLabels, false) +} + +// NewStorageWithStoreST returns a remote.Storage with start timestamp storage enabled or disabled. +func NewStorageWithStoreST(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels, storeST bool) *Storage { if l == nil { l = promslog.NewNopLogger() } @@ -77,6 +83,7 @@ func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeC logger: logger, deduper: deduper, localStartTimeCallback: stCallback, + storeST: storeST, } s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, enableTypeAndUnitLabels) return s @@ -139,6 +146,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { labelsToEqualityMatchers(rrConf.RequiredMatchers), rrConf.ReadRecent, s.localStartTimeCallback, + s.storeST, )) } s.queryables = queryables @@ -187,7 +195,7 @@ func (s *Storage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { } queriers = append(queriers, q) } - return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil + return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, s.storeST)), nil } // Appender implements storage.Storage. diff --git a/storage/series.go b/storage/series.go index 2d7f643826..69e35bb3ce 100644 --- a/storage/series.go +++ b/storage/series.go @@ -289,11 +289,12 @@ func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series { type seriesSetToChunkSet struct { SeriesSet + storeST bool } // NewSeriesSetToChunkSet converts SeriesSet to ChunkSeriesSet by encoding chunks from samples. -func NewSeriesSetToChunkSet(chk SeriesSet) ChunkSeriesSet { - return &seriesSetToChunkSet{SeriesSet: chk} +func NewSeriesSetToChunkSet(chk SeriesSet, storeST bool) ChunkSeriesSet { + return &seriesSetToChunkSet{SeriesSet: chk, storeST: storeST} } func (c *seriesSetToChunkSet) Next() bool { @@ -304,7 +305,7 @@ func (c *seriesSetToChunkSet) Next() bool { } func (c *seriesSetToChunkSet) At() ChunkSeries { - return NewSeriesToChunkEncoder(c.SeriesSet.At()) + return NewSeriesToChunkEncoder(c.SeriesSet.At(), c.storeST) } func (c *seriesSetToChunkSet) Err() error { @@ -313,13 +314,14 @@ func (c *seriesSetToChunkSet) Err() error { type seriesToChunkEncoder struct { Series + storeST bool } const seriesToChunkEncoderSplit = 120 // NewSeriesToChunkEncoder encodes samples to chunks with 120 samples limit. -func NewSeriesToChunkEncoder(series Series) ChunkSeries { - return &seriesToChunkEncoder{series} +func NewSeriesToChunkEncoder(series Series, storeST bool) ChunkSeries { + return &seriesToChunkEncoder{Series: series, storeST: storeST} } func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { @@ -341,14 +343,12 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { i := 0 seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone - lastHadST := false for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { st := seriesIter.AtST() - hasST := st != 0 - if typ != lastType || lastHadST != hasST || i >= seriesToChunkEncoderSplit { + if typ != lastType || i >= seriesToChunkEncoderSplit { // Create a new chunk if the sample type changed or too many samples in the current one. chks = appendChunk(chks, mint, maxt, chk) - chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding(), hasST) + chk, err = typ.NewChunk(s.storeST) if err != nil { return errChunksIterator{err: err} } @@ -361,7 +361,6 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { i = 0 } lastType = typ - lastHadST = hasST var ( t int64 diff --git a/storage/series_test.go b/storage/series_test.go index b33d6cb1b3..06224b8aa4 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -601,7 +601,7 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { } } series := NewListSeries(lbs, copiedSamples) - encoder := NewSeriesToChunkEncoder(series) + encoder := NewSeriesToChunkEncoder(series, false) require.Equal(t, lbs, encoder.Labels()) chks, err := ExpandChunks(encoder.Iterator(nil)) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 4962f2825b..4d4cde8c3f 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -49,9 +49,11 @@ func (e Encoding) String() string { return "" } +const EncodingForFloatST = EncXOROptST + // IsValidEncoding returns true for supported encodings. func IsValidEncoding(e Encoding) bool { - return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncXOROptST + return e == EncXOR || e == EncHistogram || e == EncFloatHistogram || e == EncodingForFloatST } const ( @@ -191,40 +193,11 @@ func (v ValueType) String() string { } } -func (v ValueType) ChunkEncoding() Encoding { - switch v { - case ValFloat: - return EncXOR - case ValHistogram: - return EncHistogram - case ValFloatHistogram: - return EncFloatHistogram - default: - return EncNone - } -} - -func (v ValueType) ChunkEncodingWithST(st int64) Encoding { - switch v { - case ValFloat: - if st != 0 { - return EncXOROptST - } - return EncXOR - case ValHistogram: - return EncHistogram - case ValFloatHistogram: - return EncFloatHistogram - default: - return EncNone - } -} - -func (v ValueType) ChunkEncodingWithStoreST(storeST bool) Encoding { +func (v ValueType) ChunkEncoding(storeST bool) Encoding { switch v { case ValFloat: if storeST { - return EncXOROptST + return EncodingForFloatST } return EncXOR case ValHistogram: @@ -237,21 +210,7 @@ func (v ValueType) ChunkEncodingWithStoreST(storeST bool) Encoding { } func (v ValueType) NewChunk(storeST bool) (Chunk, error) { - switch v { - case ValFloat: - if storeST { - return NewXOROptSTChunk(), nil - } - return NewXORChunk(), nil - case ValHistogram: - // TODO(krajorama): return a ST capable histogram chunk when they are supported. - return NewHistogramChunk(), nil - case ValFloatHistogram: - // TODO(krajorama): return a ST capable float histogram chunk when they are supported. - return NewFloatHistogramChunk(), nil - default: - return nil, fmt.Errorf("value type %v unsupported", v) - } + return NewEmptyChunk(v.ChunkEncoding(storeST), storeST) } // MockSeriesIterator returns an iterator for a mock series with custom @@ -438,11 +397,12 @@ func FromData(e Encoding, d []byte) (Chunk, error) { } // NewEmptyChunk returns an empty chunk for the given encoding. +// TODO(krajorama): support storeST for histogram and float histogram chunks when they are implemented. func NewEmptyChunk(e Encoding, storeST bool) (Chunk, error) { switch e { case EncXOR: if storeST { - return NewXOROptSTChunk(), nil + return newEmptyChunkWithST(EncodingForFloatST), nil } return NewXORChunk(), nil case EncHistogram: @@ -454,3 +414,13 @@ func NewEmptyChunk(e Encoding, storeST bool) (Chunk, error) { } return nil, fmt.Errorf("invalid chunk encoding %q", e) } + +func newEmptyChunkWithST(e Encoding) Chunk { + switch e { + case EncXOROptST: + return NewXOROptSTChunk() + default: + // The caller code is literally right above this function. + panic(fmt.Sprintf("invalid chunk encoding %q", e)) + } +} diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 97739c1b51..9cc06a32bd 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -166,7 +166,7 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) { } // Request storing ST in the chunk if available. - c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding(), hasST) + c, err := sampleType.NewChunk(hasST) if err != nil { return Meta{}, err } diff --git a/tsdb/compact.go b/tsdb/compact.go index 7091d34d50..a3da75a2d9 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -178,6 +178,9 @@ type LeveledCompactorOptions struct { // It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction. EnableOverlappingCompaction bool + // EnableSTStorage determines whether compaction should re-encode chunks with start timestamps. + EnableSTStorage bool + // Metrics is set of metrics for Compactor. By default, NewCompactorMetrics would be called to initialize metrics unless it is provided. Metrics *CompactorMetrics // UseUncachedIO allows bypassing the page cache when appropriate. @@ -211,7 +214,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer } mergeFunc := opts.MergeFunc if mergeFunc == nil { - mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) + mergeFunc = storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, opts.EnableSTStorage) } maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize if maxBlockChunkSegmentSize == 0 { diff --git a/tsdb/db.go b/tsdb/db.go index 9d9396eae0..19b6ffb6e6 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -571,7 +571,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { return nil } -func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQueryable, error) { +func (db *DBReadOnly) loadDataAsQueryable(maxt int64, enableSTStorage bool) (storage.SampleAndChunkQueryable, error) { select { case <-db.closed: return nil, ErrClosed @@ -643,9 +643,12 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue } db.closers = append(db.closers, head) + dbOpts := DefaultOptions() + dbOpts.EnableSTStorage = enableSTStorage return &DB{ dir: db.dir, logger: db.logger, + opts: dbOpts, blocks: blocks, head: head, blockQuerierFunc: NewBlockQuerier, @@ -656,7 +659,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue // Querier loads the blocks and wal and returns a new querier over the data partition for the given time range. // Current implementation doesn't support multiple Queriers. func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) { - q, err := db.loadDataAsQueryable(maxt) + q, err := db.loadDataAsQueryable(maxt, db.stStorageEnabled) if err != nil { return nil, err } @@ -666,7 +669,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) { // ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range. // Current implementation doesn't support multiple ChunkQueriers. func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { - q, err := db.loadDataAsQueryable(maxt) + q, err := db.loadDataAsQueryable(maxt, db.stStorageEnabled) if err != nil { return nil, err } @@ -965,6 +968,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn PD: opts.PostingsDecoderFactory, UseUncachedIO: opts.UseUncachedIO, BlockExcludeFilter: opts.BlockCompactionExcludeFunc, + EnableSTStorage: opts.EnableSTStorage, }) } if err != nil { @@ -2408,7 +2412,7 @@ func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { if err != nil { return nil, err } - return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil + return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMergerWithStoreST(storage.ChainedSeriesMerge, db.opts.EnableSTStorage)), nil } func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index dde3c746f2..b12427cfc8 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1839,7 +1839,7 @@ type chunkOpts struct { // isolation for this append.) // Series lock must be held when calling. func (s *memSeries) append(storeST bool, st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(storeST, t, chunkenc.ValFloat.ChunkEncodingWithStoreST(storeST), o) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(storeST, t, chunkenc.ValFloat.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1870,7 +1870,7 @@ func (s *memSeries) appendHistogram(storeST bool, st, t int64, h *histogram.Hist // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.HistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValHistogram.ChunkEncodingWithStoreST(storeST), o) + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValHistogram.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1927,7 +1927,7 @@ func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogra // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValFloatHistogram.ChunkEncodingWithStoreST(storeST), o) + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValFloatHistogram.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated }