From 6c23048450a7569be319613210841c43ef861aae Mon Sep 17 00:00:00 2001
From: Arve Knudsen
Date: Thu, 16 Jan 2025 17:16:14 +0100
Subject: [PATCH 1/6] Revert "Merge pull request #15455 from
 bboreham/compact-cache-symbols"

This reverts commit 4dacd7572a831f0ad8b83e785cb4da82be143a97, reversing
changes made to 5e124cf4f2b9467e4ae1c679840005e727efd599.

Signed-off-by: Arve Knudsen
---
 tsdb/index/index.go | 60 +++++++++++++++++++++++++++++----------------
 1 file changed, 39 insertions(+), 21 deletions(-)

diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index 42ecd7245d..c07edfaf3f 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -110,6 +110,12 @@ func newCRC32() hash.Hash32 {
 	return crc32.New(castagnoliTable)
 }
 
+type symbolCacheEntry struct {
+	index          uint32
+	lastValueIndex uint32
+	lastValue      string
+}
+
 type PostingsEncoder func(*encoding.Encbuf, []uint32) error
 
 type PostingsDecoder func(encoding.Decbuf) (int, Postings, error)
@@ -140,7 +146,7 @@ type Writer struct {
 	symbols     *Symbols
 	symbolFile  *fileutil.MmapFile
 	lastSymbol  string
-	symbolCache map[string]uint32 // From symbol to index in table.
+	symbolCache map[string]symbolCacheEntry
 
 	labelIndexes []labelIndexHashEntry // Label index offsets.
 	labelNames   map[string]uint64     // Label names, and their usage.
@@ -240,7 +246,7 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode
 		buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 		buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 
-		symbolCache:     make(map[string]uint32, 1<<16),
+		symbolCache:     make(map[string]symbolCacheEntry, 1<<8),
 		labelNames:      make(map[string]uint64, 1<<8),
 		crc32:           newCRC32(),
 		postingsEncoder: encoder,
@@ -472,16 +478,29 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
 	w.buf2.PutUvarint(lset.Len())
 
 	if err := lset.Validate(func(l labels.Label) error {
-		nameIndex, ok := w.symbolCache[l.Name]
+		var err error
+		cacheEntry, ok := w.symbolCache[l.Name]
+		nameIndex := cacheEntry.index
 		if !ok {
-			return fmt.Errorf("symbol entry for %q does not exist", l.Name)
+			nameIndex, err = w.symbols.ReverseLookup(l.Name)
+			if err != nil {
+				return fmt.Errorf("symbol entry for %q does not exist, %w", l.Name, err)
+			}
 		}
 		w.labelNames[l.Name]++
 		w.buf2.PutUvarint32(nameIndex)
 
-		valueIndex, ok := w.symbolCache[l.Value]
-		if !ok {
-			return fmt.Errorf("symbol entry for %q does not exist", l.Value)
+		valueIndex := cacheEntry.lastValueIndex
+		if !ok || cacheEntry.lastValue != l.Value {
+			valueIndex, err = w.symbols.ReverseLookup(l.Value)
+			if err != nil {
+				return fmt.Errorf("symbol entry for %q does not exist, %w", l.Value, err)
+			}
+			w.symbolCache[l.Name] = symbolCacheEntry{
+				index:          nameIndex,
+				lastValueIndex: valueIndex,
+				lastValue:      l.Value,
+			}
 		}
 		w.buf2.PutUvarint32(valueIndex)
 		return nil
@@ -540,7 +559,6 @@ func (w *Writer) AddSymbol(sym string) error {
 		return fmt.Errorf("symbol %q out-of-order", sym)
 	}
 	w.lastSymbol = sym
-	w.symbolCache[sym] = uint32(w.numSymbols)
 	w.numSymbols++
 	w.buf1.Reset()
 	w.buf1.PutUvarintStr(sym)
@@ -610,10 +628,10 @@ func (w *Writer) writeLabelIndices() error {
 	values := []uint32{}
 	for d.Err() == nil && cnt > 0 {
 		cnt--
-		d.Uvarint()               // Keycount.
-		name := d.UvarintBytes()  // Label name.
-		value := d.UvarintBytes() // Label value.
-		d.Uvarint64()             // Offset.
+		d.Uvarint()                           // Keycount.
+		name := d.UvarintBytes()              // Label name.
+		value := yoloString(d.UvarintBytes()) // Label value.
+		d.Uvarint64()                         // Offset.
 		if len(name) == 0 {
 			continue // All index is ignored.
} @@ -626,9 +644,9 @@ func (w *Writer) writeLabelIndices() error { values = values[:0] } current = name - sid, ok := w.symbolCache[string(value)] - if !ok { - return fmt.Errorf("symbol entry for %q does not exist", string(value)) + sid, err := w.symbols.ReverseLookup(value) + if err != nil { + return err } values = append(values, sid) } @@ -900,9 +918,9 @@ func (w *Writer) writePostingsToTmpFiles() error { nameSymbols := map[uint32]string{} for _, name := range batchNames { - sid, ok := w.symbolCache[name] - if !ok { - return fmt.Errorf("symbol entry for %q does not exist", name) + sid, err := w.symbols.ReverseLookup(name) + if err != nil { + return err } nameSymbols[sid] = name } @@ -939,9 +957,9 @@ func (w *Writer) writePostingsToTmpFiles() error { for _, name := range batchNames { // Write out postings for this label name. - sid, ok := w.symbolCache[name] - if !ok { - return fmt.Errorf("symbol entry for %q does not exist", name) + sid, err := w.symbols.ReverseLookup(name) + if err != nil { + return err } values := make([]uint32, 0, len(postings[sid])) for v := range postings[sid] { From b5b4968f24234571e0392b5536515dc7689c5060 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 6 Mar 2025 14:32:16 +0100 Subject: [PATCH 2/6] Put caching of all symbols for compaction behind feature flag Signed-off-by: Arve Knudsen --- CHANGELOG.md | 1 + cmd/prometheus/main.go | 7 +- cmd/promtool/backfill.go | 4 +- cmd/promtool/rules.go | 2 +- cmd/promtool/tsdb.go | 6 +- docs/command-line/prometheus.md | 2 +- docs/feature_flags.md | 7 ++ tsdb/block_test.go | 14 ++-- tsdb/blockwriter.go | 24 ++++--- tsdb/blockwriter_test.go | 2 +- tsdb/compact.go | 16 ++--- tsdb/compact_test.go | 48 +++++++++---- tsdb/db.go | 35 +++++---- tsdb/db_test.go | 10 +-- tsdb/index/index.go | 122 ++++++++++++++++++++++---------- tsdb/index/index_test.go | 10 +-- tsdb/tsdbblockutil.go | 4 +- 17 files changed, 209 insertions(+), 105 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 468854d9e3..1a9dec21d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## unreleased +* [CHANGE] TSDB: Put caching of all symbols for compaction behind feature flag `--enable-feature=cache-all-symbols-for-compaction`. #15836 * [BUGFIX] TSDB: fix unknown series errors and possible lost data during WAL replay when series are removed from the head due to inactivity and reappear before the next WAL checkpoint. #16060 ## 3.2.1 / 2025-02-25 diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 347bae470c..3caae96b79 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -269,6 +269,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "delayed-compaction": c.tsdb.EnableDelayedCompaction = true logger.Info("Experimental delayed compaction is enabled.") + case "cache-all-symbols-for-compaction": + c.tsdb.CacheAllSymbols = true + logger.Info("Caching of all TSDB symbols for compaction enabled") case "promql-delayed-name-removal": c.promqlEnableDelayedNameRemoval = true logger.Info("Experimental PromQL delayed name removal enabled.") @@ -537,7 +540,7 @@ func main() { a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates."). Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval) - a.Flag("enable-feature", "Comma separated feature names to enable. 
Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, cache-all-symbols-for-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). Default("").StringsVar(&cfg.featureList) a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode) @@ -1817,6 +1820,7 @@ type tsdbOptions struct { EnableDelayedCompaction bool CompactionDelayMaxPercent int EnableOverlappingCompaction bool + CacheAllSymbols bool EnableOOONativeHistograms bool } @@ -1842,6 +1846,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { EnableDelayedCompaction: opts.EnableDelayedCompaction, CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + CacheAllSymbols: opts.CacheAllSymbols, } } diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go index 0964f0d546..6a2ae2b5b1 100644 --- a/cmd/promtool/backfill.go +++ b/cmd/promtool/backfill.go @@ -90,7 +90,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn blockDuration := getCompatibleBlockDuration(maxBlockDuration) mint = blockDuration * (mint / blockDuration) - db, err := tsdb.OpenDBReadOnly(outputDir, "", nil) + db, err := tsdb.OpenDBReadOnly(outputDir, "", false, nil) if err != nil { return err } @@ -121,7 +121,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn // also need to append samples throughout the whole block range. To allow that, we // pretend that the block is twice as large here, but only really add sample in the // original interval later. - w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration) + w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration, false) if err != nil { return fmt.Errorf("block writer: %w", err) } diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index b2eb18ca8e..a2b2d8ad36 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -133,7 +133,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName // also need to append samples throughout the whole block range. To allow that, we // pretend that the block is twice as large here, but only really add sample in the // original interval later. 
- w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration) + w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration, false) if err != nil { return fmt.Errorf("new block writer: %w", err) } diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 6a62e2e8bc..8402ed34aa 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -337,7 +337,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { } func listBlocks(path string, humanReadable bool) error { - db, err := tsdb.OpenDBReadOnly(path, "", nil) + db, err := tsdb.OpenDBReadOnly(path, "", false, nil) if err != nil { return err } @@ -392,7 +392,7 @@ func getFormattedBytes(bytes int64, humanReadable bool) string { } func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) { - db, err := tsdb.OpenDBReadOnly(path, "", nil) + db, err := tsdb.OpenDBReadOnly(path, "", false, nil) if err != nil { return nil, nil, err } @@ -711,7 +711,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. type SeriesSetFormatter func(series storage.SeriesSet) error func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) { - db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil) + db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, false, nil) if err != nil { return err } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 0f58ff4b18..256abf1632 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -60,7 +60,7 @@ The Prometheus monitoring server | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | -| --enable-feature ... | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature ... | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, cache-all-symbols-for-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --agent | Run Prometheus in 'Agent mode'. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. 
One of: [logfmt, json] | `logfmt` | diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 6973d6d73b..ad67950262 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -184,3 +184,10 @@ Enabling this _can_ have negative impact on performance, because the in-memory state is mutex guarded. Cumulative-only OTLP requests are not affected. [d2c]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor + +## Cache all symbols for compaction + +`--enable-feature=cache-all-symbols-for-compaction` + +When enabled, as a performance measure, all TSDB symbols are cached for compaction. +Be aware that this may increase memory usage significantly. diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 776beb4396..d2ab8b3ef7 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -372,7 +372,9 @@ func TestBlockSize(t *testing.T) { require.NoError(t, err) require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size") - c, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{0}, nil, nil) + c, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) blockDirsAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) require.NoError(t, err) @@ -621,13 +623,15 @@ func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []storage.Series) string { - blockDir, err := CreateBlock(series, dir, 0, promslog.NewNopLogger()) + blockDir, err := CreateBlock(series, dir, 0, false, promslog.NewNopLogger()) require.NoError(tb, err) return blockDir } func createBlockFromHead(tb testing.TB, dir string, head *Head) string { - compactor, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{1000000}, nil, nil) + compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(tb, err) require.NoError(tb, os.MkdirAll(dir, 0o777)) @@ -641,7 +645,9 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { } func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string { - compactor, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{1000000}, nil, nil) + compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(tb, err) require.NoError(tb, os.MkdirAll(dir, 0o777)) diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index 5eb8a649a9..bcd8ef8848 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -33,9 +33,10 @@ type BlockWriter struct { logger *slog.Logger destinationDir string - head *Head - blockSize int64 // in ms - chunkDir string + head *Head + blockSize int64 // in ms + chunkDir string + cacheAllSymbols bool } // ErrNoSeriesAppended is returned if the series count is zero while flushing blocks. @@ -49,11 +50,12 @@ var ErrNoSeriesAppended = errors.New("no series appended, aborting") // contains anything at all. 
It is the caller's responsibility to // ensure that the resulting blocks do not overlap etc. // Writer ensures the block flush is atomic (via rename). -func NewBlockWriter(logger *slog.Logger, dir string, blockSize int64) (*BlockWriter, error) { +func NewBlockWriter(logger *slog.Logger, dir string, blockSize int64, cacheAllSymbols bool) (*BlockWriter, error) { w := &BlockWriter{ - logger: logger, - destinationDir: dir, - blockSize: blockSize, + logger: logger, + destinationDir: dir, + blockSize: blockSize, + cacheAllSymbols: cacheAllSymbols, } if err := w.initHead(); err != nil { return nil, err @@ -96,11 +98,15 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) { maxt := w.head.MaxTime() + 1 w.logger.Info("flushing", "series_count", w.head.NumSeries(), "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt)) - compactor, err := NewLeveledCompactor(ctx, + compactor, err := NewLeveledCompactorWithOptions(ctx, nil, w.logger, []int64{w.blockSize}, - chunkenc.NewPool(), nil) + chunkenc.NewPool(), + LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + CacheAllSymbols: w.cacheAllSymbols, + }) if err != nil { return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err) } diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go index 2704b53566..d091d0c0c6 100644 --- a/tsdb/blockwriter_test.go +++ b/tsdb/blockwriter_test.go @@ -30,7 +30,7 @@ import ( func TestBlockWriter(t *testing.T) { ctx := context.Background() outputDir := t.TempDir() - w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration) + w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration, false) require.NoError(t, err) // Add some series. diff --git a/tsdb/compact.go b/tsdb/compact.go index 0e2c0485d9..44cb24a29b 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -89,6 +89,7 @@ type LeveledCompactor struct { postingsEncoder index.PostingsEncoder postingsDecoderFactory PostingsDecoderFactory enableOverlappingCompaction bool + cacheAllSymbols bool } type CompactorMetrics struct { @@ -169,6 +170,8 @@ type LeveledCompactorOptions struct { // EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled. // It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction. EnableOverlappingCompaction bool + // CacheAllSymbols enables caching of all TSDB symbols for compaction. 
+ CacheAllSymbols bool } type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder @@ -177,18 +180,12 @@ func DefaultPostingsDecoderFactory(_ *BlockMeta) index.PostingsDecoder { return index.DecodePostingsRaw } -func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { +func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, cacheAllSymbols bool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, MergeFunc: mergeFunc, EnableOverlappingCompaction: true, - }) -} - -func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { - return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ - MergeFunc: mergeFunc, - EnableOverlappingCompaction: true, + CacheAllSymbols: cacheAllSymbols, }) } @@ -225,6 +222,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer postingsEncoder: pe, postingsDecoderFactory: opts.PD, enableOverlappingCompaction: opts.EnableOverlappingCompaction, + cacheAllSymbols: opts.CacheAllSymbols, }, nil } @@ -661,7 +659,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl } } - indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder) + indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder, c.cacheAllSymbols) if err != nil { return fmt.Errorf("open index writer: %w", err) } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 4b10a42ef7..3355f2e7e3 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -167,7 +167,9 @@ func TestNoPanicFor0Tombstones(t *testing.T) { }, } - c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil) + c, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{50}, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) c.plan(metas) @@ -175,13 +177,15 @@ func TestNoPanicFor0Tombstones(t *testing.T) { func TestLeveledCompactor_plan(t *testing.T) { // This mimics our default ExponentialBlockRanges with min block size equals to 20. 
- compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ + compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{ 20, 60, 180, 540, 1620, - }, nil, nil) + }, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) cases := map[string]struct { @@ -384,13 +388,15 @@ func TestLeveledCompactor_plan(t *testing.T) { } func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { - compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ + compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{ 20, 60, 240, 720, 2160, - }, nil, nil) + }, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) cases := []struct { @@ -434,13 +440,15 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { } func TestCompactionFailWillCleanUpTempDir(t *testing.T) { - compactor, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{ + compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{ 20, 60, 240, 720, 2160, - }, nil, nil) + }, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) tmpdir := t.TempDir() @@ -1026,7 +1034,9 @@ func TestCompaction_populateBlock(t *testing.T) { blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt}) } - c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil) + c, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{0}, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) meta := &BlockMeta{ @@ -1162,7 +1172,9 @@ func BenchmarkCompaction(b *testing.B) { blockDirs = append(blockDirs, block.Dir()) } - c, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{0}, nil, nil) + c, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(b, err) b.ResetTimer() @@ -1358,7 +1370,7 @@ func TestCancelCompactions(t *testing.T) { // This checks that the `context.Canceled` error is properly checked at all levels: // - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks. // - callers should check with errors.Is() instead of ==. - readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger()) + readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", false, promslog.NewNopLogger()) require.NoError(t, err) blocks, err := readOnlyDB.Blocks() require.NoError(t, err) @@ -1542,7 +1554,9 @@ func TestHeadCompactionWithHistograms(t *testing.T) { // Compaction. mint := head.MinTime() maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). - compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) + compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) ids, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil) require.NoError(t, err) @@ -1685,7 +1699,9 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { // Sparse head compaction. 
mint := sparseHead.MinTime() maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). - compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) + compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) sparseULIDs, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil) require.NoError(t, err) @@ -1736,7 +1752,9 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { // Old head compaction. mint := oldHead.MinTime() maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). - compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) + compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) oldULIDs, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil) require.NoError(t, err) @@ -1918,7 +1936,9 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) { err = block.Delete(ctx, 0, 10, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "0")) require.NoError(t, err) - c, err := NewLeveledCompactor(ctx, nil, promslog.NewNopLogger(), []int64{0}, nil, nil) + c, err := NewLeveledCompactorWithOptions(ctx, nil, promslog.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + }) require.NoError(t, err) ulids, err := c.Compact(tmpdir, []string{blockDir}, []*Block{block}) diff --git a/tsdb/db.go b/tsdb/db.go index ff24c8800b..091e01687b 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -90,6 +90,7 @@ func DefaultOptions() *Options { EnableOverlappingCompaction: true, EnableSharding: false, EnableDelayedCompaction: false, + CacheAllSymbols: false, CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent, CompactionDelay: time.Duration(0), PostingsDecoderFactory: DefaultPostingsDecoderFactory, @@ -213,6 +214,9 @@ type Options struct { // CompactionDelayMaxPercent is the upper limit for CompactionDelay, specified as a percentage of the head chunk range. CompactionDelayMaxPercent int + // CacheAllSymbols enables caching of all TSDB symbols for compaction. + CacheAllSymbols bool + // NewCompactorFunc is a function that returns a TSDB compactor. NewCompactorFunc NewCompactorFunc @@ -432,15 +436,16 @@ var ErrClosed = errors.New("db already closed") // Current implementation doesn't support concurrency so // all API calls should happen in the same go routine. type DBReadOnly struct { - logger *slog.Logger - dir string - sandboxDir string - closers []io.Closer - closed chan struct{} + logger *slog.Logger + dir string + sandboxDir string + cacheAllSymbols bool + closers []io.Closer + closed chan struct{} } // OpenDBReadOnly opens DB in the given directory for read only operations. 
-func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, error) { +func OpenDBReadOnly(dir, sandboxDirRoot string, cacheAllSymbols bool, l *slog.Logger) (*DBReadOnly, error) { if _, err := os.Stat(dir); err != nil { return nil, fmt.Errorf("opening the db dir: %w", err) } @@ -458,10 +463,11 @@ func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, er } return &DBReadOnly{ - logger: l, - dir: dir, - sandboxDir: sandboxDir, - closed: make(chan struct{}), + logger: l, + dir: dir, + cacheAllSymbols: cacheAllSymbols, + sandboxDir: sandboxDir, + closed: make(chan struct{}), }, nil } @@ -511,12 +517,16 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { mint := head.MinTime() maxt := head.MaxTime() rh := NewRangeHead(head, mint, maxt) - compactor, err := NewLeveledCompactor( + compactor, err := NewLeveledCompactorWithOptions( context.Background(), nil, db.logger, ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5), - chunkenc.NewPool(), nil, + chunkenc.NewPool(), + LeveledCompactorOptions{ + EnableOverlappingCompaction: true, + CacheAllSymbols: db.cacheAllSymbols, + }, ) if err != nil { return fmt.Errorf("create leveled compactor: %w", err) @@ -908,6 +918,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{ MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + CacheAllSymbols: opts.CacheAllSymbols, PD: opts.PostingsDecoderFactory, }) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 4e96680c02..c3d3527ab8 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2443,7 +2443,7 @@ func TestDBReadOnly(t *testing.T) { } // Open a read only db and ensure that the API returns the same result as the normal DB. - dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger) + dbReadOnly, err := OpenDBReadOnly(dbDir, "", false, logger) require.NoError(t, err) defer func() { require.NoError(t, dbReadOnly.Close()) }() @@ -2498,7 +2498,7 @@ func TestDBReadOnly(t *testing.T) { // all api methods return an ErrClosed. func TestDBReadOnlyClosing(t *testing.T) { sandboxDir := t.TempDir() - db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, promslog.New(&promslog.Config{})) + db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, false, promslog.New(&promslog.Config{})) require.NoError(t, err) // The sandboxDir was there. require.DirExists(t, db.sandboxDir) @@ -2540,7 +2540,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { } // Flush WAL. - db, err := OpenDBReadOnly(dbDir, "", logger) + db, err := OpenDBReadOnly(dbDir, "", false, logger) require.NoError(t, err) flush := t.TempDir() @@ -2548,7 +2548,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { require.NoError(t, db.Close()) // Reopen the DB from the flushed WAL block. - db, err = OpenDBReadOnly(flush, "", logger) + db, err = OpenDBReadOnly(flush, "", false, logger) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() blocks, err := db.Blocks() @@ -2596,7 +2596,7 @@ func TestDBReadOnly_Querier_NoAlteration(t *testing.T) { spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) { dBDirHash := dirHash(dir) // Bootstrap a RO db from the same dir and set up a querier. 
- dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil) + dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, false, nil) require.NoError(t, err) require.Equal(t, chunksCount, countChunks(dir)) q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index c07edfaf3f..168e0e13dc 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -147,6 +147,7 @@ type Writer struct { symbolFile *fileutil.MmapFile lastSymbol string symbolCache map[string]symbolCacheEntry + indexCache map[string]uint32 // From symbol to index in table. labelIndexes []labelIndexHashEntry // Label index offsets. labelNames map[string]uint64 // Label names, and their usage. @@ -203,7 +204,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { // NewWriterWithEncoder returns a new Writer to the given filename. It // serializes data in format version 2. It uses the given encoder to encode each // postings list. -func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) { +func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder, cacheAllSymbols bool) (*Writer, error) { dir := filepath.Dir(fn) df, err := fileutil.OpenDir(dir) @@ -246,11 +247,15 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - symbolCache: make(map[string]symbolCacheEntry, 1<<8), labelNames: make(map[string]uint64, 1<<8), crc32: newCRC32(), postingsEncoder: encoder, } + if cacheAllSymbols { + iw.indexCache = make(map[string]uint32, 1<<16) + } else { + iw.symbolCache = make(map[string]symbolCacheEntry, 1<<8) + } if err := iw.writeMeta(); err != nil { return nil, err } @@ -259,8 +264,8 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode // NewWriter creates a new index writer using the default encoder. See // NewWriterWithEncoder. -func NewWriter(ctx context.Context, fn string) (*Writer, error) { - return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw) +func NewWriter(ctx context.Context, fn string, cacheAllSymbols bool) (*Writer, error) { + return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw, cacheAllSymbols) } func (w *Writer) write(bufs ...[]byte) error { @@ -478,28 +483,43 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... 
w.buf2.PutUvarint(lset.Len()) if err := lset.Validate(func(l labels.Label) error { - var err error - cacheEntry, ok := w.symbolCache[l.Name] - nameIndex := cacheEntry.index - if !ok { - nameIndex, err = w.symbols.ReverseLookup(l.Name) - if err != nil { - return fmt.Errorf("symbol entry for %q does not exist, %w", l.Name, err) + var valueIndex uint32 + if w.indexCache != nil { + nameIndex, ok := w.indexCache[l.Name] + if !ok { + return fmt.Errorf("symbol entry for %q does not exist", l.Name) } - } - w.labelNames[l.Name]++ - w.buf2.PutUvarint32(nameIndex) + w.labelNames[l.Name]++ + w.buf2.PutUvarint32(nameIndex) - valueIndex := cacheEntry.lastValueIndex - if !ok || cacheEntry.lastValue != l.Value { - valueIndex, err = w.symbols.ReverseLookup(l.Value) - if err != nil { - return fmt.Errorf("symbol entry for %q does not exist, %w", l.Value, err) + valueIndex, ok = w.indexCache[l.Value] + if !ok { + return fmt.Errorf("symbol entry for %q does not exist", l.Value) } - w.symbolCache[l.Name] = symbolCacheEntry{ - index: nameIndex, - lastValueIndex: valueIndex, - lastValue: l.Value, + } else { + var err error + cacheEntry, ok := w.symbolCache[l.Name] + nameIndex := cacheEntry.index + if !ok { + nameIndex, err = w.symbols.ReverseLookup(l.Name) + if err != nil { + return fmt.Errorf("symbol entry for %q does not exist, %w", l.Name, err) + } + } + w.labelNames[l.Name]++ + w.buf2.PutUvarint32(nameIndex) + + valueIndex = cacheEntry.lastValueIndex + if !ok || cacheEntry.lastValue != l.Value { + valueIndex, err = w.symbols.ReverseLookup(l.Value) + if err != nil { + return fmt.Errorf("symbol entry for %q does not exist, %w", l.Value, err) + } + w.symbolCache[l.Name] = symbolCacheEntry{ + index: nameIndex, + lastValueIndex: valueIndex, + lastValue: l.Value, + } } } w.buf2.PutUvarint32(valueIndex) @@ -559,6 +579,9 @@ func (w *Writer) AddSymbol(sym string) error { return fmt.Errorf("symbol %q out-of-order", sym) } w.lastSymbol = sym + if w.indexCache != nil { + w.indexCache[sym] = uint32(w.numSymbols) + } w.numSymbols++ w.buf1.Reset() w.buf1.PutUvarintStr(sym) @@ -628,10 +651,10 @@ func (w *Writer) writeLabelIndices() error { values := []uint32{} for d.Err() == nil && cnt > 0 { cnt-- - d.Uvarint() // Keycount. - name := d.UvarintBytes() // Label name. - value := yoloString(d.UvarintBytes()) // Label value. - d.Uvarint64() // Offset. + d.Uvarint() // Keycount. + name := d.UvarintBytes() // Label name. + value := d.UvarintBytes() // Label value. + d.Uvarint64() // Offset. if len(name) == 0 { continue // All index is ignored. 
} @@ -644,9 +667,18 @@ func (w *Writer) writeLabelIndices() error { values = values[:0] } current = name - sid, err := w.symbols.ReverseLookup(value) - if err != nil { - return err + var sid uint32 + if w.indexCache != nil { + var ok bool + sid, ok = w.indexCache[string(value)] + if !ok { + return fmt.Errorf("symbol entry for %q does not exist", string(value)) + } + } else { + sid, err = w.symbols.ReverseLookup(string(value)) + if err != nil { + return err + } } values = append(values, sid) } @@ -918,9 +950,18 @@ func (w *Writer) writePostingsToTmpFiles() error { nameSymbols := map[uint32]string{} for _, name := range batchNames { - sid, err := w.symbols.ReverseLookup(name) - if err != nil { - return err + var sid uint32 + if w.indexCache != nil { + var ok bool + sid, ok = w.indexCache[name] + if !ok { + return fmt.Errorf("symbol entry for %q does not exist", name) + } + } else { + sid, err = w.symbols.ReverseLookup(name) + if err != nil { + return err + } } nameSymbols[sid] = name } @@ -957,9 +998,18 @@ func (w *Writer) writePostingsToTmpFiles() error { for _, name := range batchNames { // Write out postings for this label name. - sid, err := w.symbols.ReverseLookup(name) - if err != nil { - return err + var sid uint32 + if w.indexCache != nil { + var ok bool + sid, ok = w.indexCache[name] + if !ok { + return fmt.Errorf("symbol entry for %q does not exist", name) + } + } else { + sid, err = w.symbols.ReverseLookup(name) + if err != nil { + return err + } } values := make([]uint32, 0, len(postings[sid])) for v := range postings[sid] { diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index ee186c1d95..dcceda929d 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -142,7 +142,7 @@ func TestIndexRW_Create_Open(t *testing.T) { fn := filepath.Join(dir, indexFilename) // An empty index must still result in a readable file. 
- iw, err := NewWriter(context.Background(), fn) + iw, err := NewWriter(context.Background(), fn, false) require.NoError(t, err) require.NoError(t, iw.Close()) @@ -445,7 +445,7 @@ func TestPersistence_index_e2e(t *testing.T) { } func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) { - w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index")) + w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), false) require.NoError(t, err) require.NoError(t, w.AddSymbol("__name__")) @@ -568,7 +568,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) { func TestChunksRefOrdering(t *testing.T) { dir := t.TempDir() - idx, err := NewWriter(context.Background(), filepath.Join(dir, "index")) + idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false) require.NoError(t, err) require.NoError(t, idx.AddSymbol("1")) @@ -588,7 +588,7 @@ func TestChunksRefOrdering(t *testing.T) { func TestChunksTimeOrdering(t *testing.T) { dir := t.TempDir() - idx, err := NewWriter(context.Background(), filepath.Join(dir, "index")) + idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false) require.NoError(t, err) require.NoError(t, idx.AddSymbol("1")) @@ -713,7 +713,7 @@ func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSerie fn := filepath.Join(tb.TempDir(), indexFilename) - iw, err := NewWriter(ctx, fn) + iw, err := NewWriter(ctx, fn, false) require.NoError(tb, err) symbols := map[string]struct{}{} diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index af2348019a..971b180a65 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -27,7 +27,7 @@ import ( var ErrInvalidTimes = errors.New("max time is lesser than min time") // CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk. -func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger *slog.Logger) (string, error) { +func CreateBlock(series []storage.Series, dir string, chunkRange int64, cacheAllSymbols bool, logger *slog.Logger) (string, error) { if chunkRange == 0 { chunkRange = DefaultBlockDuration } @@ -35,7 +35,7 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger * return "", ErrInvalidTimes } - w, err := NewBlockWriter(logger, dir, chunkRange) + w, err := NewBlockWriter(logger, dir, chunkRange, cacheAllSymbols) if err != nil { return "", err } From 59e90e5f6904f4f7df907c8d385489a8aa39e4c5 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 10 Mar 2025 17:50:41 +0100 Subject: [PATCH 3/6] Put caching of all symbols for compaction behind hidden flag Signed-off-by: Arve Knudsen --- CHANGELOG.md | 2 +- cmd/prometheus/main.go | 8 +++--- cmd/promtool/backfill.go | 4 +-- cmd/promtool/rules.go | 2 +- cmd/promtool/tsdb.go | 6 ++--- docs/command-line/prometheus.md | 2 +- docs/feature_flags.md | 7 ----- tsdb/block_test.go | 14 +++------- tsdb/blockwriter_test.go | 2 +- tsdb/compact.go | 16 +++++++++-- tsdb/compact_test.go | 48 ++++++++++----------------------- tsdb/db.go | 2 +- tsdb/db_test.go | 10 +++---- tsdb/index/index_test.go | 10 +++---- 14 files changed, 56 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a9dec21d1..8c52e056d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,8 @@ ## unreleased -* [CHANGE] TSDB: Put caching of all symbols for compaction behind feature flag `--enable-feature=cache-all-symbols-for-compaction`. 
#15836 * [BUGFIX] TSDB: fix unknown series errors and possible lost data during WAL replay when series are removed from the head due to inactivity and reappear before the next WAL checkpoint. #16060 +* [ENHANCEMENT] TSDB: Add hidden flag for disabling caching of all symbols for compaction: `--storage.tsdb.cache-all-symbols-for-compaction`. #15836 ## 3.2.1 / 2025-02-25 diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 3caae96b79..7ceaf4ddaf 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -269,9 +269,6 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "delayed-compaction": c.tsdb.EnableDelayedCompaction = true logger.Info("Experimental delayed compaction is enabled.") - case "cache-all-symbols-for-compaction": - c.tsdb.CacheAllSymbols = true - logger.Info("Caching of all TSDB symbols for compaction enabled") case "promql-delayed-name-removal": c.promqlEnableDelayedNameRemoval = true logger.Info("Experimental PromQL delayed name removal enabled.") @@ -457,6 +454,9 @@ func main() { serverOnlyFlag(a, "storage.tsdb.delayed-compaction.max-percent", "Sets the upper limit for the random compaction delay, specified as a percentage of the head chunk range. 100 means the compaction can be delayed by up to the entire head chunk range. Only effective when the delayed-compaction feature flag is enabled."). Default("10").Hidden().IntVar(&cfg.tsdb.CompactionDelayMaxPercent) + serverOnlyFlag(a, "storage.tsdb.cache-all-symbols-for-compaction", "Enable caching of all symbols for compaction."). + Default("true").Hidden().BoolVar(&cfg.tsdb.CacheAllSymbols) + agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage."). Default("data-agent/").StringVar(&cfg.agentStoragePath) @@ -540,7 +540,7 @@ func main() { a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates."). Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, cache-all-symbols-for-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). 
Default("").StringsVar(&cfg.featureList) a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode) diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go index 6a2ae2b5b1..c39490543a 100644 --- a/cmd/promtool/backfill.go +++ b/cmd/promtool/backfill.go @@ -90,7 +90,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn blockDuration := getCompatibleBlockDuration(maxBlockDuration) mint = blockDuration * (mint / blockDuration) - db, err := tsdb.OpenDBReadOnly(outputDir, "", false, nil) + db, err := tsdb.OpenDBReadOnly(outputDir, "", true, nil) if err != nil { return err } @@ -121,7 +121,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn // also need to append samples throughout the whole block range. To allow that, we // pretend that the block is twice as large here, but only really add sample in the // original interval later. - w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration, false) + w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration, true) if err != nil { return fmt.Errorf("block writer: %w", err) } diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index a2b2d8ad36..6cc50f10ec 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -133,7 +133,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName // also need to append samples throughout the whole block range. To allow that, we // pretend that the block is twice as large here, but only really add sample in the // original interval later. - w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration, false) + w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration, true) if err != nil { return fmt.Errorf("new block writer: %w", err) } diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 8402ed34aa..b7629bf40f 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -337,7 +337,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { } func listBlocks(path string, humanReadable bool) error { - db, err := tsdb.OpenDBReadOnly(path, "", false, nil) + db, err := tsdb.OpenDBReadOnly(path, "", true, nil) if err != nil { return err } @@ -392,7 +392,7 @@ func getFormattedBytes(bytes int64, humanReadable bool) string { } func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) { - db, err := tsdb.OpenDBReadOnly(path, "", false, nil) + db, err := tsdb.OpenDBReadOnly(path, "", true, nil) if err != nil { return nil, nil, err } @@ -711,7 +711,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. type SeriesSetFormatter func(series storage.SeriesSet) error func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) { - db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, false, nil) + db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, true, nil) if err != nil { return err } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 256abf1632..0f58ff4b18 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -60,7 +60,7 @@ The Prometheus monitoring server | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. 
Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | -| --enable-feature ... | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, cache-all-symbols-for-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature ... | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --agent | Run Prometheus in 'Agent mode'. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/docs/feature_flags.md b/docs/feature_flags.md index ad67950262..6973d6d73b 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -184,10 +184,3 @@ Enabling this _can_ have negative impact on performance, because the in-memory state is mutex guarded. Cumulative-only OTLP requests are not affected. [d2c]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor - -## Cache all symbols for compaction - -`--enable-feature=cache-all-symbols-for-compaction` - -When enabled, as a performance measure, all TSDB symbols are cached for compaction. -Be aware that this may increase memory usage significantly. diff --git a/tsdb/block_test.go b/tsdb/block_test.go index d2ab8b3ef7..d70d28e887 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -372,9 +372,7 @@ func TestBlockSize(t *testing.T) { require.NoError(t, err) require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size") - c, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + c, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{0}, nil, nil) require.NoError(t, err) blockDirsAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) require.NoError(t, err) @@ -623,15 +621,13 @@ func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp // createBlock creates a block with given set of series and returns its dir. 
func createBlock(tb testing.TB, dir string, series []storage.Series) string { - blockDir, err := CreateBlock(series, dir, 0, false, promslog.NewNopLogger()) + blockDir, err := CreateBlock(series, dir, 0, true, promslog.NewNopLogger()) require.NoError(tb, err) return blockDir } func createBlockFromHead(tb testing.TB, dir string, head *Head) string { - compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + compactor, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{1000000}, nil, nil) require.NoError(tb, err) require.NoError(tb, os.MkdirAll(dir, 0o777)) @@ -645,9 +641,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { } func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string { - compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + compactor, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{1000000}, nil, nil) require.NoError(tb, err) require.NoError(tb, os.MkdirAll(dir, 0o777)) diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go index d091d0c0c6..8672f553df 100644 --- a/tsdb/blockwriter_test.go +++ b/tsdb/blockwriter_test.go @@ -30,7 +30,7 @@ import ( func TestBlockWriter(t *testing.T) { ctx := context.Background() outputDir := t.TempDir() - w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration, false) + w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration, true) require.NoError(t, err) // Add some series. diff --git a/tsdb/compact.go b/tsdb/compact.go index 44cb24a29b..6f5c5c71bc 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -180,12 +180,24 @@ func DefaultPostingsDecoderFactory(_ *BlockMeta) index.PostingsDecoder { return index.DecodePostingsRaw } -func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, cacheAllSymbols bool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { +// NewLeveledCompactorWithChunkSize returns a new LeveledCompactor with a certain max block segment chunk size. +// It's the same as calling NewLeveledCompactorWithOptions with maxBlockChunkSegmentSize, mergeFunc, enabled overlapping compaction, and caching of all symbols during compaction. +func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, MergeFunc: mergeFunc, EnableOverlappingCompaction: true, - CacheAllSymbols: cacheAllSymbols, + CacheAllSymbols: true, + }) +} + +// NewLeveledCompactor returns a new LeveledCompactor. +// It's the same as calling NewLeveledCompactorWithOptions with mergeFunc, enabled overlapping compaction, and caching of all symbols during compaction. 
+func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { + return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ + MergeFunc: mergeFunc, + EnableOverlappingCompaction: true, + CacheAllSymbols: true, }) } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 3355f2e7e3..4fc3d8743f 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -167,9 +167,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) { }, } - c, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{50}, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil) require.NoError(t, err) c.plan(metas) @@ -177,15 +175,13 @@ func TestNoPanicFor0Tombstones(t *testing.T) { func TestLeveledCompactor_plan(t *testing.T) { // This mimics our default ExponentialBlockRanges with min block size equals to 20. - compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ 20, 60, 180, 540, 1620, - }, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + }, nil, nil) require.NoError(t, err) cases := map[string]struct { @@ -388,15 +384,13 @@ func TestLeveledCompactor_plan(t *testing.T) { } func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { - compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ 20, 60, 240, 720, 2160, - }, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + }, nil, nil) require.NoError(t, err) cases := []struct { @@ -440,15 +434,13 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { } func TestCompactionFailWillCleanUpTempDir(t *testing.T) { - compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{ + compactor, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{ 20, 60, 240, 720, 2160, - }, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + }, nil, nil) require.NoError(t, err) tmpdir := t.TempDir() @@ -1034,9 +1026,7 @@ func TestCompaction_populateBlock(t *testing.T) { blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt}) } - c, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{0}, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil) require.NoError(t, err) meta := &BlockMeta{ @@ -1172,9 +1162,7 @@ func BenchmarkCompaction(b *testing.B) { blockDirs = append(blockDirs, block.Dir()) } - c, err := NewLeveledCompactorWithOptions(context.Background(), nil, promslog.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + c, err := NewLeveledCompactor(context.Background(), nil, promslog.NewNopLogger(), []int64{0}, nil, nil) require.NoError(b, err) b.ResetTimer() @@ -1370,7 +1358,7 @@ func TestCancelCompactions(t *testing.T) { // This checks that the `context.Canceled` error is properly checked at all levels: // - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks. 
// - callers should check with errors.Is() instead of ==. - readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", false, promslog.NewNopLogger()) + readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", true, promslog.NewNopLogger()) require.NoError(t, err) blocks, err := readOnlyDB.Blocks() require.NoError(t, err) @@ -1554,9 +1542,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) { // Compaction. mint := head.MinTime() maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). - compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) require.NoError(t, err) ids, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil) require.NoError(t, err) @@ -1699,9 +1685,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { // Sparse head compaction. mint := sparseHead.MinTime() maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). - compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) require.NoError(t, err) sparseULIDs, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil) require.NoError(t, err) @@ -1752,9 +1736,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { // Old head compaction. mint := oldHead.MinTime() maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). 
- compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) require.NoError(t, err) oldULIDs, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil) require.NoError(t, err) @@ -1936,9 +1918,7 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) { err = block.Delete(ctx, 0, 10, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "0")) require.NoError(t, err) - c, err := NewLeveledCompactorWithOptions(ctx, nil, promslog.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{ - EnableOverlappingCompaction: true, - }) + c, err := NewLeveledCompactor(ctx, nil, promslog.NewNopLogger(), []int64{0}, nil, nil) require.NoError(t, err) ulids, err := c.Compact(tmpdir, []string{blockDir}, []*Block{block}) diff --git a/tsdb/db.go b/tsdb/db.go index 091e01687b..2aee1cb21b 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -90,7 +90,7 @@ func DefaultOptions() *Options { EnableOverlappingCompaction: true, EnableSharding: false, EnableDelayedCompaction: false, - CacheAllSymbols: false, + CacheAllSymbols: true, CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent, CompactionDelay: time.Duration(0), PostingsDecoderFactory: DefaultPostingsDecoderFactory, diff --git a/tsdb/db_test.go b/tsdb/db_test.go index c3d3527ab8..b65429716e 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2443,7 +2443,7 @@ func TestDBReadOnly(t *testing.T) { } // Open a read only db and ensure that the API returns the same result as the normal DB. - dbReadOnly, err := OpenDBReadOnly(dbDir, "", false, logger) + dbReadOnly, err := OpenDBReadOnly(dbDir, "", true, logger) require.NoError(t, err) defer func() { require.NoError(t, dbReadOnly.Close()) }() @@ -2498,7 +2498,7 @@ func TestDBReadOnly(t *testing.T) { // all api methods return an ErrClosed. func TestDBReadOnlyClosing(t *testing.T) { sandboxDir := t.TempDir() - db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, false, promslog.New(&promslog.Config{})) + db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, true, promslog.New(&promslog.Config{})) require.NoError(t, err) // The sandboxDir was there. require.DirExists(t, db.sandboxDir) @@ -2540,7 +2540,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { } // Flush WAL. - db, err := OpenDBReadOnly(dbDir, "", false, logger) + db, err := OpenDBReadOnly(dbDir, "", true, logger) require.NoError(t, err) flush := t.TempDir() @@ -2548,7 +2548,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { require.NoError(t, db.Close()) // Reopen the DB from the flushed WAL block. - db, err = OpenDBReadOnly(flush, "", false, logger) + db, err = OpenDBReadOnly(flush, "", true, logger) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() blocks, err := db.Blocks() @@ -2596,7 +2596,7 @@ func TestDBReadOnly_Querier_NoAlteration(t *testing.T) { spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) { dBDirHash := dirHash(dir) // Bootstrap a RO db from the same dir and set up a querier. 
- dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, false, nil) + dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, true, nil) require.NoError(t, err) require.Equal(t, chunksCount, countChunks(dir)) q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt) diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index dcceda929d..4b8c8fffb7 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -142,7 +142,7 @@ func TestIndexRW_Create_Open(t *testing.T) { fn := filepath.Join(dir, indexFilename) // An empty index must still result in a readable file. - iw, err := NewWriter(context.Background(), fn, false) + iw, err := NewWriter(context.Background(), fn, true) require.NoError(t, err) require.NoError(t, iw.Close()) @@ -445,7 +445,7 @@ func TestPersistence_index_e2e(t *testing.T) { } func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) { - w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), false) + w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), true) require.NoError(t, err) require.NoError(t, w.AddSymbol("__name__")) @@ -568,7 +568,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) { func TestChunksRefOrdering(t *testing.T) { dir := t.TempDir() - idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false) + idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), true) require.NoError(t, err) require.NoError(t, idx.AddSymbol("1")) @@ -588,7 +588,7 @@ func TestChunksRefOrdering(t *testing.T) { func TestChunksTimeOrdering(t *testing.T) { dir := t.TempDir() - idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false) + idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), true) require.NoError(t, err) require.NoError(t, idx.AddSymbol("1")) @@ -713,7 +713,7 @@ func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSerie fn := filepath.Join(tb.TempDir(), indexFilename) - iw, err := NewWriter(ctx, fn, false) + iw, err := NewWriter(ctx, fn, true) require.NoError(tb, err) symbols := map[string]struct{}{} From c645e7a49b14044b5a2e42b5a29eda318e52b8d1 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 2 Apr 2025 09:04:30 +0100 Subject: [PATCH 4/6] tsdb/index.writeLabelIndices: Use yoloString Signed-off-by: Arve Knudsen --- tsdb/index/index.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 168e0e13dc..4e200bd9c7 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -651,10 +651,10 @@ func (w *Writer) writeLabelIndices() error { values := []uint32{} for d.Err() == nil && cnt > 0 { cnt-- - d.Uvarint() // Keycount. - name := d.UvarintBytes() // Label name. - value := d.UvarintBytes() // Label value. - d.Uvarint64() // Offset. + d.Uvarint() // Keycount. + name := d.UvarintBytes() // Label name. + value := yoloString(d.UvarintBytes()) // Label value. + d.Uvarint64() // Offset. if len(name) == 0 { continue // All index is ignored. 
} @@ -670,12 +670,12 @@ func (w *Writer) writeLabelIndices() error { var sid uint32 if w.indexCache != nil { var ok bool - sid, ok = w.indexCache[string(value)] + sid, ok = w.indexCache[value] if !ok { - return fmt.Errorf("symbol entry for %q does not exist", string(value)) + return fmt.Errorf("symbol entry for %q does not exist", value) } } else { - sid, err = w.symbols.ReverseLookup(string(value)) + sid, err = w.symbols.ReverseLookup(value) if err != nil { return err } From 9aecf395522e1b6333e29fcaeafd5511366a0088 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 6 May 2025 16:27:07 +0200 Subject: [PATCH 5/6] Apply reviewer feedback Signed-off-by: Arve Knudsen --- tsdb/index/index.go | 96 +++++++++++++++++++++------------------------ 1 file changed, 44 insertions(+), 52 deletions(-) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 4e200bd9c7..440460899b 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -110,7 +110,7 @@ func newCRC32() hash.Hash32 { return crc32.New(castagnoliTable) } -type symbolCacheEntry struct { +type nameCacheEntry struct { index uint32 lastValueIndex uint32 lastValue string @@ -142,12 +142,17 @@ type Writer struct { buf1 encoding.Encbuf buf2 encoding.Encbuf - numSymbols int - symbols *Symbols - symbolFile *fileutil.MmapFile - lastSymbol string - symbolCache map[string]symbolCacheEntry - indexCache map[string]uint32 // From symbol to index in table. + numSymbols int + symbols *Symbols + symbolFile *fileutil.MmapFile + lastSymbol string + cacheAllSymbols bool + // TODO. + // Used when cacheAllSymbols is false. + nameCache map[string]nameCacheEntry + // From symbol to index in table. + // Used when cacheAllSymbols is true. + symbolToIndex map[string]uint32 labelIndexes []labelIndexHashEntry // Label index offsets. labelNames map[string]uint64 // Label names, and their usage. @@ -247,14 +252,15 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, + cacheAllSymbols: cacheAllSymbols, labelNames: make(map[string]uint64, 1<<8), crc32: newCRC32(), postingsEncoder: encoder, } if cacheAllSymbols { - iw.indexCache = make(map[string]uint32, 1<<16) + iw.symbolToIndex = make(map[string]uint32, 1<<16) } else { - iw.symbolCache = make(map[string]symbolCacheEntry, 1<<8) + iw.nameCache = make(map[string]nameCacheEntry, 1<<8) } if err := iw.writeMeta(); err != nil { return nil, err @@ -484,21 +490,21 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... if err := lset.Validate(func(l labels.Label) error { var valueIndex uint32 - if w.indexCache != nil { - nameIndex, ok := w.indexCache[l.Name] + if w.cacheAllSymbols { + nameIndex, ok := w.symbolToIndex[l.Name] if !ok { return fmt.Errorf("symbol entry for %q does not exist", l.Name) } w.labelNames[l.Name]++ w.buf2.PutUvarint32(nameIndex) - valueIndex, ok = w.indexCache[l.Value] + valueIndex, ok = w.symbolToIndex[l.Value] if !ok { return fmt.Errorf("symbol entry for %q does not exist", l.Value) } } else { var err error - cacheEntry, ok := w.symbolCache[l.Name] + cacheEntry, ok := w.nameCache[l.Name] nameIndex := cacheEntry.index if !ok { nameIndex, err = w.symbols.ReverseLookup(l.Name) @@ -515,7 +521,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... 
if err != nil { return fmt.Errorf("symbol entry for %q does not exist, %w", l.Value, err) } - w.symbolCache[l.Name] = symbolCacheEntry{ + w.nameCache[l.Name] = nameCacheEntry{ index: nameIndex, lastValueIndex: valueIndex, lastValue: l.Value, @@ -579,8 +585,8 @@ func (w *Writer) AddSymbol(sym string) error { return fmt.Errorf("symbol %q out-of-order", sym) } w.lastSymbol = sym - if w.indexCache != nil { - w.indexCache[sym] = uint32(w.numSymbols) + if w.cacheAllSymbols { + w.symbolToIndex[sym] = uint32(w.numSymbols) } w.numSymbols++ w.buf1.Reset() @@ -667,18 +673,9 @@ func (w *Writer) writeLabelIndices() error { values = values[:0] } current = name - var sid uint32 - if w.indexCache != nil { - var ok bool - sid, ok = w.indexCache[value] - if !ok { - return fmt.Errorf("symbol entry for %q does not exist", value) - } - } else { - sid, err = w.symbols.ReverseLookup(value) - if err != nil { - return err - } + sid, err := w.indexForSymbol(value) + if err != nil { + return err } values = append(values, sid) } @@ -950,18 +947,9 @@ func (w *Writer) writePostingsToTmpFiles() error { nameSymbols := map[uint32]string{} for _, name := range batchNames { - var sid uint32 - if w.indexCache != nil { - var ok bool - sid, ok = w.indexCache[name] - if !ok { - return fmt.Errorf("symbol entry for %q does not exist", name) - } - } else { - sid, err = w.symbols.ReverseLookup(name) - if err != nil { - return err - } + sid, err := w.indexForSymbol(name) + if err != nil { + return err } nameSymbols[sid] = name } @@ -998,18 +986,9 @@ func (w *Writer) writePostingsToTmpFiles() error { for _, name := range batchNames { // Write out postings for this label name. - var sid uint32 - if w.indexCache != nil { - var ok bool - sid, ok = w.indexCache[name] - if !ok { - return fmt.Errorf("symbol entry for %q does not exist", name) - } - } else { - sid, err = w.symbols.ReverseLookup(name) - if err != nil { - return err - } + sid, err := w.indexForSymbol(name) + if err != nil { + return err } values := make([]uint32, 0, len(postings[sid])) for v := range postings[sid] { @@ -1036,6 +1015,19 @@ func (w *Writer) writePostingsToTmpFiles() error { return nil } +// indexForSymbol returns the index corresponding to a symbol, or an error if it's not found. +func (w *Writer) indexForSymbol(symbol string) (uint32, error) { + if w.cacheAllSymbols { + var ok bool + sid, ok := w.symbolToIndex[symbol] + if !ok { + return 0, fmt.Errorf("symbol entry for %q does not exist", symbol) + } + return sid, nil + } + return w.symbols.ReverseLookup(symbol) +} + // EncodePostingsRaw uses the "basic" postings list encoding format with no compression: // .... 
func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error { From 3c640f5759bb520ddfb74eb473a66d0084f04677 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Sun, 25 May 2025 15:34:42 +0200 Subject: [PATCH 6/6] Remove hidden flag, disable new behaviour by default Signed-off-by: Arve Knudsen --- CHANGELOG.md | 3 +- cmd/prometheus/main.go | 7 +-- cmd/promtool/backfill.go | 4 +- cmd/promtool/rules.go | 2 +- cmd/promtool/tsdb.go | 6 +-- tsdb/block_test.go | 2 +- tsdb/blockwriter.go | 20 ++++---- tsdb/blockwriter_test.go | 2 +- tsdb/compact.go | 16 +++--- tsdb/compact_test.go | 2 +- tsdb/db.go | 34 ++++++------- tsdb/db_test.go | 10 ++-- tsdb/index/index.go | 102 +++++++++++++++++++-------------------- tsdb/index/index_test.go | 10 ++-- tsdb/tsdbblockutil.go | 4 +- 15 files changed, 110 insertions(+), 114 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 335115ffe9..f07055ba4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Changelog +* [ENHANCEMENT] TSDB: Add configuration parameters for only caching series label symbols when writing TSDB indexes. #15836 + ## 3.4.0 / 2025-05-17 * [CHANGE] Config: Make setting out-of-order native histograms feature (`--enable-feature=ooo-native-histograms`) a no-op. Out-of-order native histograms are now always enabled when `out_of_order_time_window` is greater than zero and `--enable-feature=native-histograms` is set. #16207 @@ -12,7 +14,6 @@ * [ENHANCEMENT] Scraping: Add config option for escaping scheme request. #16066 * [ENHANCEMENT] Config: Add global config option for convert_classic_histograms_to_nhcb. #16226 * [ENHANCEMENT] Alerting: make batch size configurable (`--alertmanager.notification-batch-size`). #16254 -* [ENHANCEMENT] TSDB: Add hidden flag for disabling caching of all symbols for compaction: `--storage.tsdb.cache-all-symbols-for-compaction`. #15836 * [PERF] Kubernetes SD: make endpointSlice discovery more efficient. #16433 * [BUGFIX] Config: Fix auto-reload on changes to rule and scrape config files. #16340 * [BUGFIX] Scraping: Skip native histogram series if ingestion is disabled. #16218 diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 967e94fe38..fc95154e63 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -474,9 +474,6 @@ func main() { serverOnlyFlag(a, "storage.tsdb.delayed-compaction.max-percent", "Sets the upper limit for the random compaction delay, specified as a percentage of the head chunk range. 100 means the compaction can be delayed by up to the entire head chunk range. Only effective when the delayed-compaction feature flag is enabled."). Default("10").Hidden().IntVar(&cfg.tsdb.CompactionDelayMaxPercent) - serverOnlyFlag(a, "storage.tsdb.cache-all-symbols-for-compaction", "Enable caching of all symbols for compaction."). - Default("true").Hidden().BoolVar(&cfg.tsdb.CacheAllSymbols) - agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage."). 
Default("data-agent/").StringVar(&cfg.agentStoragePath) @@ -1851,7 +1848,7 @@ type tsdbOptions struct { CompactionDelayMaxPercent int EnableOverlappingCompaction bool UseUncachedIO bool - CacheAllSymbols bool + CacheOnlySeriesSymbols bool } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -1876,7 +1873,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, UseUncachedIO: opts.UseUncachedIO, - CacheAllSymbols: opts.CacheAllSymbols, + CacheOnlySeriesSymbols: opts.CacheOnlySeriesSymbols, } } diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go index 4511cf8fec..424bcf5660 100644 --- a/cmd/promtool/backfill.go +++ b/cmd/promtool/backfill.go @@ -89,7 +89,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn blockDuration := getCompatibleBlockDuration(maxBlockDuration) mint = blockDuration * (mint / blockDuration) - db, err := tsdb.OpenDBReadOnly(outputDir, "", true, nil) + db, err := tsdb.OpenDBReadOnly(outputDir, "", false, nil) if err != nil { return err } @@ -120,7 +120,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn // also need to append samples throughout the whole block range. To allow that, we // pretend that the block is twice as large here, but only really add sample in the // original interval later. - w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration, true) + w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration, false) if err != nil { return fmt.Errorf("block writer: %w", err) } diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index 6cc50f10ec..a2b2d8ad36 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -133,7 +133,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName // also need to append samples throughout the whole block range. To allow that, we // pretend that the block is twice as large here, but only really add sample in the // original interval later. - w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration, true) + w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration, false) if err != nil { return fmt.Errorf("new block writer: %w", err) } diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 83e7fb067d..29ae00fdd2 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -336,7 +336,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { } func listBlocks(path string, humanReadable bool) error { - db, err := tsdb.OpenDBReadOnly(path, "", true, nil) + db, err := tsdb.OpenDBReadOnly(path, "", false, nil) if err != nil { return err } @@ -391,7 +391,7 @@ func getFormattedBytes(bytes int64, humanReadable bool) string { } func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) { - db, err := tsdb.OpenDBReadOnly(path, "", true, nil) + db, err := tsdb.OpenDBReadOnly(path, "", false, nil) if err != nil { return nil, nil, err } @@ -710,7 +710,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. 
type SeriesSetFormatter func(series storage.SeriesSet) error func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) { - db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, true, nil) + db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, false, nil) if err != nil { return err } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 41445588ed..5b8e2e2f6f 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -621,7 +621,7 @@ func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []storage.Series) string { - blockDir, err := CreateBlock(series, dir, 0, true, promslog.NewNopLogger()) + blockDir, err := CreateBlock(series, dir, 0, false, promslog.NewNopLogger()) require.NoError(tb, err) return blockDir } diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index bcd8ef8848..ccadb0ace0 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -33,10 +33,10 @@ type BlockWriter struct { logger *slog.Logger destinationDir string - head *Head - blockSize int64 // in ms - chunkDir string - cacheAllSymbols bool + head *Head + blockSize int64 // in ms + chunkDir string + CacheOnlySeriesSymbols bool } // ErrNoSeriesAppended is returned if the series count is zero while flushing blocks. @@ -50,12 +50,12 @@ var ErrNoSeriesAppended = errors.New("no series appended, aborting") // contains anything at all. It is the caller's responsibility to // ensure that the resulting blocks do not overlap etc. // Writer ensures the block flush is atomic (via rename). -func NewBlockWriter(logger *slog.Logger, dir string, blockSize int64, cacheAllSymbols bool) (*BlockWriter, error) { +func NewBlockWriter(logger *slog.Logger, dir string, blockSize int64, cacheOnlySeriesSymbols bool) (*BlockWriter, error) { w := &BlockWriter{ - logger: logger, - destinationDir: dir, - blockSize: blockSize, - cacheAllSymbols: cacheAllSymbols, + logger: logger, + destinationDir: dir, + blockSize: blockSize, + CacheOnlySeriesSymbols: cacheOnlySeriesSymbols, } if err := w.initHead(); err != nil { return nil, err @@ -105,7 +105,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) { chunkenc.NewPool(), LeveledCompactorOptions{ EnableOverlappingCompaction: true, - CacheAllSymbols: w.cacheAllSymbols, + CacheOnlySeriesSymbols: w.CacheOnlySeriesSymbols, }) if err != nil { return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err) diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go index 0e5cce3fa7..ade4a69522 100644 --- a/tsdb/blockwriter_test.go +++ b/tsdb/blockwriter_test.go @@ -29,7 +29,7 @@ import ( func TestBlockWriter(t *testing.T) { ctx := context.Background() outputDir := t.TempDir() - w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration, true) + w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration, false) require.NoError(t, err) // Add some series. 
diff --git a/tsdb/compact.go b/tsdb/compact.go index cdc2a676ad..4f54786b99 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -90,7 +90,7 @@ type LeveledCompactor struct { postingsEncoder index.PostingsEncoder postingsDecoderFactory PostingsDecoderFactory enableOverlappingCompaction bool - cacheAllSymbols bool + CacheOnlySeriesSymbols bool } type CompactorMetrics struct { @@ -175,8 +175,8 @@ type LeveledCompactorOptions struct { Metrics *CompactorMetrics // UseUncachedIO allows bypassing the page cache when appropriate. UseUncachedIO bool - // CacheAllSymbols enables caching of all TSDB symbols for compaction. - CacheAllSymbols bool + // CacheOnlySeriesSymbols enables caching of TSDB symbols only when adding series. + CacheOnlySeriesSymbols bool } type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder @@ -186,23 +186,21 @@ func DefaultPostingsDecoderFactory(_ *BlockMeta) index.PostingsDecoder { } // NewLeveledCompactorWithChunkSize returns a new LeveledCompactor with a certain max block segment chunk size. -// It's the same as calling NewLeveledCompactorWithOptions with maxBlockChunkSegmentSize, mergeFunc, enabled overlapping compaction, and caching of all symbols during compaction. +// It's the same as calling NewLeveledCompactorWithOptions with maxBlockChunkSegmentSize, mergeFunc, and enabled overlapping compaction. func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, MergeFunc: mergeFunc, EnableOverlappingCompaction: true, - CacheAllSymbols: true, }) } // NewLeveledCompactor returns a new LeveledCompactor. -// It's the same as calling NewLeveledCompactorWithOptions with mergeFunc, enabled overlapping compaction, and caching of all symbols during compaction. +// It's the same as calling NewLeveledCompactorWithOptions with mergeFunc, and enabled overlapping compaction. 
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ MergeFunc: mergeFunc, EnableOverlappingCompaction: true, - CacheAllSymbols: true, }) } @@ -243,7 +241,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer postingsEncoder: pe, postingsDecoderFactory: opts.PD, enableOverlappingCompaction: opts.EnableOverlappingCompaction, - cacheAllSymbols: opts.CacheAllSymbols, + CacheOnlySeriesSymbols: opts.CacheOnlySeriesSymbols, }, nil } @@ -686,7 +684,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl } } - indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder, c.cacheAllSymbols) + indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder, c.CacheOnlySeriesSymbols) if err != nil { return fmt.Errorf("open index writer: %w", err) } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 61d7057c43..d142597c37 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1358,7 +1358,7 @@ func TestCancelCompactions(t *testing.T) { // This checks that the `context.Canceled` error is properly checked at all levels: // - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks. // - callers should check with errors.Is() instead of ==. - readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", true, promslog.NewNopLogger()) + readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", false, promslog.NewNopLogger()) require.NoError(t, err) blocks, err := readOnlyDB.Blocks() require.NoError(t, err) diff --git a/tsdb/db.go b/tsdb/db.go index b608fa90db..fc9172d0bb 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -90,7 +90,7 @@ func DefaultOptions() *Options { EnableOverlappingCompaction: true, EnableSharding: false, EnableDelayedCompaction: false, - CacheAllSymbols: true, + CacheOnlySeriesSymbols: false, CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent, CompactionDelay: time.Duration(0), PostingsDecoderFactory: DefaultPostingsDecoderFactory, @@ -208,8 +208,8 @@ type Options struct { // CompactionDelayMaxPercent is the upper limit for CompactionDelay, specified as a percentage of the head chunk range. CompactionDelayMaxPercent int - // CacheAllSymbols enables caching of all TSDB symbols for compaction. - CacheAllSymbols bool + // CacheOnlySeriesSymbols enables caching of TSDB symbols only when adding series, during compaction. + CacheOnlySeriesSymbols bool // NewCompactorFunc is a function that returns a TSDB compactor. NewCompactorFunc NewCompactorFunc @@ -433,16 +433,16 @@ var ErrClosed = errors.New("db already closed") // Current implementation doesn't support concurrency so // all API calls should happen in the same go routine. type DBReadOnly struct { - logger *slog.Logger - dir string - sandboxDir string - cacheAllSymbols bool - closers []io.Closer - closed chan struct{} + logger *slog.Logger + dir string + sandboxDir string + CacheOnlySeriesSymbols bool + closers []io.Closer + closed chan struct{} } // OpenDBReadOnly opens DB in the given directory for read only operations. 
-func OpenDBReadOnly(dir, sandboxDirRoot string, cacheAllSymbols bool, l *slog.Logger) (*DBReadOnly, error) { +func OpenDBReadOnly(dir, sandboxDirRoot string, cacheOnlySeriesSymbols bool, l *slog.Logger) (*DBReadOnly, error) { if _, err := os.Stat(dir); err != nil { return nil, fmt.Errorf("opening the db dir: %w", err) } @@ -460,11 +460,11 @@ func OpenDBReadOnly(dir, sandboxDirRoot string, cacheAllSymbols bool, l *slog.Lo } return &DBReadOnly{ - logger: l, - dir: dir, - cacheAllSymbols: cacheAllSymbols, - sandboxDir: sandboxDir, - closed: make(chan struct{}), + logger: l, + dir: dir, + CacheOnlySeriesSymbols: cacheOnlySeriesSymbols, + sandboxDir: sandboxDir, + closed: make(chan struct{}), }, nil } @@ -522,7 +522,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { chunkenc.NewPool(), LeveledCompactorOptions{ EnableOverlappingCompaction: true, - CacheAllSymbols: db.cacheAllSymbols, + CacheOnlySeriesSymbols: db.CacheOnlySeriesSymbols, }, ) if err != nil { @@ -915,7 +915,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{ MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, - CacheAllSymbols: opts.CacheAllSymbols, + CacheOnlySeriesSymbols: opts.CacheOnlySeriesSymbols, PD: opts.PostingsDecoderFactory, UseUncachedIO: opts.UseUncachedIO, }) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 151ad7cb21..12634e7a69 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2443,7 +2443,7 @@ func TestDBReadOnly(t *testing.T) { } // Open a read only db and ensure that the API returns the same result as the normal DB. - dbReadOnly, err := OpenDBReadOnly(dbDir, "", true, logger) + dbReadOnly, err := OpenDBReadOnly(dbDir, "", false, logger) require.NoError(t, err) defer func() { require.NoError(t, dbReadOnly.Close()) }() @@ -2498,7 +2498,7 @@ func TestDBReadOnly(t *testing.T) { // all api methods return an ErrClosed. func TestDBReadOnlyClosing(t *testing.T) { sandboxDir := t.TempDir() - db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, true, promslog.New(&promslog.Config{})) + db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, false, promslog.New(&promslog.Config{})) require.NoError(t, err) // The sandboxDir was there. require.DirExists(t, db.sandboxDir) @@ -2540,7 +2540,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { } // Flush WAL. - db, err := OpenDBReadOnly(dbDir, "", true, logger) + db, err := OpenDBReadOnly(dbDir, "", false, logger) require.NoError(t, err) flush := t.TempDir() @@ -2548,7 +2548,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { require.NoError(t, db.Close()) // Reopen the DB from the flushed WAL block. - db, err = OpenDBReadOnly(flush, "", true, logger) + db, err = OpenDBReadOnly(flush, "", false, logger) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() blocks, err := db.Blocks() @@ -2596,7 +2596,7 @@ func TestDBReadOnly_Querier_NoAlteration(t *testing.T) { spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) { dBDirHash := dirHash(dir) // Bootstrap a RO db from the same dir and set up a querier. 
- dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, true, nil) + dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, false, nil) require.NoError(t, err) require.Equal(t, chunksCount, countChunks(dir)) q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 52a89240d6..36319d0a4d 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -110,7 +110,7 @@ func newCRC32() hash.Hash32 { return crc32.New(castagnoliTable) } -type nameCacheEntry struct { +type seriesSymbolCacheEntry struct { index uint32 lastValueIndex uint32 lastValue string @@ -142,16 +142,14 @@ type Writer struct { buf1 encoding.Encbuf buf2 encoding.Encbuf - numSymbols int - symbols *Symbols - symbolFile *fileutil.MmapFile - lastSymbol string - cacheAllSymbols bool - // TODO. - // Used when cacheAllSymbols is false. - nameCache map[string]nameCacheEntry - // From symbol to index in table. - // Used when cacheAllSymbols is true. + numSymbols int + symbols *Symbols + symbolFile *fileutil.MmapFile + lastSymbol string + cacheOnlySeriesSymbols bool + // seriesSymbolCache is used for caching series label symbols when cacheOnlySeriesSymbols is true. + seriesSymbolCache map[string]seriesSymbolCacheEntry + // symbolToIndex is used for caching symbols with their index when cacheOnlySeriesSymbols is false. symbolToIndex map[string]uint32 labelIndexes []labelIndexHashEntry // Label index offsets. @@ -209,7 +207,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { // NewWriterWithEncoder returns a new Writer to the given filename. It // serializes data in format version 2. It uses the given encoder to encode each // postings list. -func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder, cacheAllSymbols bool) (*Writer, error) { +func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder, cacheOnlySeriesSymbols bool) (*Writer, error) { dir := filepath.Dir(fn) df, err := fileutil.OpenDir(dir) @@ -252,15 +250,15 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - cacheAllSymbols: cacheAllSymbols, - labelNames: make(map[string]uint64, 1<<8), - crc32: newCRC32(), - postingsEncoder: encoder, + cacheOnlySeriesSymbols: cacheOnlySeriesSymbols, + labelNames: make(map[string]uint64, 1<<8), + crc32: newCRC32(), + postingsEncoder: encoder, } - if cacheAllSymbols { + if !cacheOnlySeriesSymbols { iw.symbolToIndex = make(map[string]uint32, 1<<16) } else { - iw.nameCache = make(map[string]nameCacheEntry, 1<<8) + iw.seriesSymbolCache = make(map[string]seriesSymbolCacheEntry, 1<<8) } if err := iw.writeMeta(); err != nil { return nil, err @@ -270,8 +268,8 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode // NewWriter creates a new index writer using the default encoder. See // NewWriterWithEncoder. -func NewWriter(ctx context.Context, fn string, cacheAllSymbols bool) (*Writer, error) { - return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw, cacheAllSymbols) +func NewWriter(ctx context.Context, fn string, cacheOnlySeriesSymbols bool) (*Writer, error) { + return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw, cacheOnlySeriesSymbols) } func (w *Writer) write(bufs ...[]byte) error { @@ -489,45 +487,43 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ... 
w.buf2.PutUvarint(lset.Len()) if err := lset.Validate(func(l labels.Label) error { - var valueIndex uint32 - if w.cacheAllSymbols { - nameIndex, ok := w.symbolToIndex[l.Name] - if !ok { - return fmt.Errorf("symbol entry for %q does not exist", l.Name) + var ( + nameIndex uint32 + valueIndex uint32 + ) + if !w.cacheOnlySeriesSymbols { + var err error + if nameIndex, err = w.indexForSymbol(l.Name); err != nil { + return err } - w.labelNames[l.Name]++ - w.buf2.PutUvarint32(nameIndex) - - valueIndex, ok = w.symbolToIndex[l.Value] - if !ok { - return fmt.Errorf("symbol entry for %q does not exist", l.Value) + if valueIndex, err = w.indexForSymbol(l.Value); err != nil { + return err } } else { var err error - cacheEntry, ok := w.nameCache[l.Name] - nameIndex := cacheEntry.index - if !ok { - nameIndex, err = w.symbols.ReverseLookup(l.Name) - if err != nil { - return fmt.Errorf("symbol entry for %q does not exist, %w", l.Name, err) + cacheEntry, ok := w.seriesSymbolCache[l.Name] + if ok { + nameIndex = cacheEntry.index + valueIndex = cacheEntry.lastValueIndex + } else { + if nameIndex, err = w.indexForSymbol(l.Name); err != nil { + return err } } - w.labelNames[l.Name]++ - w.buf2.PutUvarint32(nameIndex) - valueIndex = cacheEntry.lastValueIndex if !ok || cacheEntry.lastValue != l.Value { - valueIndex, err = w.symbols.ReverseLookup(l.Value) - if err != nil { - return fmt.Errorf("symbol entry for %q does not exist, %w", l.Value, err) + if valueIndex, err = w.indexForSymbol(l.Value); err != nil { + return err } - w.nameCache[l.Name] = nameCacheEntry{ + w.seriesSymbolCache[l.Name] = seriesSymbolCacheEntry{ index: nameIndex, lastValueIndex: valueIndex, lastValue: l.Value, } } } + w.labelNames[l.Name]++ + w.buf2.PutUvarint32(nameIndex) w.buf2.PutUvarint32(valueIndex) return nil }); err != nil { @@ -585,7 +581,7 @@ func (w *Writer) AddSymbol(sym string) error { return fmt.Errorf("symbol %q out-of-order", sym) } w.lastSymbol = sym - if w.cacheAllSymbols { + if !w.cacheOnlySeriesSymbols { w.symbolToIndex[sym] = uint32(w.numSymbols) } w.numSymbols++ @@ -1017,15 +1013,19 @@ func (w *Writer) writePostingsToTmpFiles() error { // indexForSymbol returns the index corresponding to a symbol, or an error if it's not found. func (w *Writer) indexForSymbol(symbol string) (uint32, error) { - if w.cacheAllSymbols { - var ok bool - sid, ok := w.symbolToIndex[symbol] - if !ok { - return 0, fmt.Errorf("symbol entry for %q does not exist", symbol) + if w.cacheOnlySeriesSymbols { + sid, err := w.symbols.ReverseLookup(symbol) + if err != nil { + return 0, fmt.Errorf("symbol entry for %q does not exist, %w", symbol, err) } return sid, nil } - return w.symbols.ReverseLookup(symbol) + + sid, ok := w.symbolToIndex[symbol] + if !ok { + return 0, fmt.Errorf("symbol entry for %q does not exist", symbol) + } + return sid, nil } // EncodePostingsRaw uses the "basic" postings list encoding format with no compression: diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 6e38046a20..81f95fea0a 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -142,7 +142,7 @@ func TestIndexRW_Create_Open(t *testing.T) { fn := filepath.Join(dir, indexFilename) // An empty index must still result in a readable file. 
- iw, err := NewWriter(context.Background(), fn, true) + iw, err := NewWriter(context.Background(), fn, false) require.NoError(t, err) require.NoError(t, iw.Close()) @@ -445,7 +445,7 @@ func TestPersistence_index_e2e(t *testing.T) { } func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) { - w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), true) + w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), false) require.NoError(t, err) require.NoError(t, w.AddSymbol("__name__")) @@ -568,7 +568,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) { func TestChunksRefOrdering(t *testing.T) { dir := t.TempDir() - idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), true) + idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false) require.NoError(t, err) require.NoError(t, idx.AddSymbol("1")) @@ -588,7 +588,7 @@ func TestChunksRefOrdering(t *testing.T) { func TestChunksTimeOrdering(t *testing.T) { dir := t.TempDir() - idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), true) + idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false) require.NoError(t, err) require.NoError(t, idx.AddSymbol("1")) @@ -713,7 +713,7 @@ func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSerie fn := filepath.Join(tb.TempDir(), indexFilename) - iw, err := NewWriter(ctx, fn, true) + iw, err := NewWriter(ctx, fn, false) require.NoError(tb, err) symbols := map[string]struct{}{} diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 971b180a65..0a81d28ddc 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -27,7 +27,7 @@ import ( var ErrInvalidTimes = errors.New("max time is lesser than min time") // CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk. -func CreateBlock(series []storage.Series, dir string, chunkRange int64, cacheAllSymbols bool, logger *slog.Logger) (string, error) { +func CreateBlock(series []storage.Series, dir string, chunkRange int64, cacheOnlySeriesSymbols bool, logger *slog.Logger) (string, error) { if chunkRange == 0 { chunkRange = DefaultBlockDuration } @@ -35,7 +35,7 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, cacheAll return "", ErrInvalidTimes } - w, err := NewBlockWriter(logger, dir, chunkRange, cacheAllSymbols) + w, err := NewBlockWriter(logger, dir, chunkRange, cacheOnlySeriesSymbols) if err != nil { return "", err }
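
Note (illustrative, not part of the patches): a minimal sketch of how a caller could opt into the behaviour this series introduces once the final patch is applied, using the LeveledCompactorOptions.CacheOnlySeriesSymbols field it adds. The nil Registerer, no-op logger, and block ranges are placeholder setup borrowed from the tests changed above, not something the series prescribes.

package main

import (
	"context"
	"fmt"

	"github.com/prometheus/common/promslog"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	// Cache only series label symbols while writing block indexes, rather than
	// the full symbol table; option names follow the final patch in this series.
	compactor, err := tsdb.NewLeveledCompactorWithOptions(
		context.Background(),
		nil, // prometheus.Registerer; nil is accepted, as in the tests above.
		promslog.NewNopLogger(),
		tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 3, 5),
		chunkenc.NewPool(),
		tsdb.LeveledCompactorOptions{
			EnableOverlappingCompaction: true,
			CacheOnlySeriesSymbols:      true,
		},
	)
	if err != nil {
		fmt.Println("creating compactor:", err)
		return
	}
	_ = compactor // Plan/Compact/Write calls would follow in real use.
}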