diff --git a/CHANGELOG.md b/CHANGELOG.md
index dd3a163c06..368a7b6c36 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@
 * [ENHANCEMENT] UI: Clear search field on `/targets` page. #16567
 * [ENHANCEMENT] Rules: Check that rules parse without error earlier at startup. #16601
 * [ENHANCEMENT] Promtool: Optional fuzzy float64 comparison in rules unittests. #16395
+* [ENHANCEMENT] TSDB: Add configuration parameters for only caching series label symbols when writing TSDB indexes. #15836
 * [PERF] PromQL: Reuse `histogramStatsIterator` where possible. #16686
 * [PERF] PromQL: Reuse storage for custom bucket values for native histograms. #16565
 * [PERF] UI: Optimize memoization and search debouncing on `/targets` page. #16589
diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index c4e3fe7914..6a1356ff4d 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -1876,6 +1876,7 @@ type tsdbOptions struct {
 	CompactionDelayMaxPercent   int
 	EnableOverlappingCompaction bool
 	UseUncachedIO               bool
+	CacheOnlySeriesSymbols      bool
 }
 
 func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@@ -1900,6 +1901,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
 		CompactionDelayMaxPercent:   opts.CompactionDelayMaxPercent,
 		EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
 		UseUncachedIO:               opts.UseUncachedIO,
+		CacheOnlySeriesSymbols:      opts.CacheOnlySeriesSymbols,
 	}
 }
 
diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go
index 47de3b5c1c..424bcf5660 100644
--- a/cmd/promtool/backfill.go
+++ b/cmd/promtool/backfill.go
@@ -89,7 +89,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
 	blockDuration := getCompatibleBlockDuration(maxBlockDuration)
 	mint = blockDuration * (mint / blockDuration)
 
-	db, err := tsdb.OpenDBReadOnly(outputDir, "", nil)
+	db, err := tsdb.OpenDBReadOnly(outputDir, "", false, nil)
 	if err != nil {
 		return err
 	}
@@ -120,7 +120,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
 			// also need to append samples throughout the whole block range. To allow that, we
 			// pretend that the block is twice as large here, but only really add sample in the
 			// original interval later.
-			w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration)
+			w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration, false)
 			if err != nil {
 				return fmt.Errorf("block writer: %w", err)
 			}
diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go
index b2eb18ca8e..a2b2d8ad36 100644
--- a/cmd/promtool/rules.go
+++ b/cmd/promtool/rules.go
@@ -133,7 +133,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName
 		// also need to append samples throughout the whole block range. To allow that, we
 		// pretend that the block is twice as large here, but only really add sample in the
 		// original interval later.
-		w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration)
+		w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration, false)
 		if err != nil {
 			return fmt.Errorf("new block writer: %w", err)
 		}
diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go
index f512728ac9..29ae00fdd2 100644
--- a/cmd/promtool/tsdb.go
+++ b/cmd/promtool/tsdb.go
@@ -336,7 +336,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
 }
 
 func listBlocks(path string, humanReadable bool) error {
-	db, err := tsdb.OpenDBReadOnly(path, "", nil)
+	db, err := tsdb.OpenDBReadOnly(path, "", false, nil)
 	if err != nil {
 		return err
 	}
@@ -391,7 +391,7 @@ func getFormattedBytes(bytes int64, humanReadable bool) string {
 }
 
 func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
-	db, err := tsdb.OpenDBReadOnly(path, "", nil)
+	db, err := tsdb.OpenDBReadOnly(path, "", false, nil)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -710,7 +710,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
 type SeriesSetFormatter func(series storage.SeriesSet) error
 
 func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
-	db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil)
+	db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, false, nil)
 	if err != nil {
 		return err
 	}
diff --git a/tsdb/block_test.go b/tsdb/block_test.go
index 0f892a3782..5b8e2e2f6f 100644
--- a/tsdb/block_test.go
+++ b/tsdb/block_test.go
@@ -621,7 +621,7 @@ func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp
 
 // createBlock creates a block with given set of series and returns its dir.
 func createBlock(tb testing.TB, dir string, series []storage.Series) string {
-	blockDir, err := CreateBlock(series, dir, 0, promslog.NewNopLogger())
+	blockDir, err := CreateBlock(series, dir, 0, false, promslog.NewNopLogger())
 	require.NoError(tb, err)
 	return blockDir
 }
diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go
index 5eb8a649a9..ccadb0ace0 100644
--- a/tsdb/blockwriter.go
+++ b/tsdb/blockwriter.go
@@ -33,9 +33,10 @@ type BlockWriter struct {
 	logger         *slog.Logger
 	destinationDir string
 
-	head      *Head
-	blockSize int64 // in ms
-	chunkDir  string
+	head                   *Head
+	blockSize              int64 // in ms
+	chunkDir               string
+	CacheOnlySeriesSymbols bool
 }
 
 // ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.
@@ -49,11 +50,12 @@ var ErrNoSeriesAppended = errors.New("no series appended, aborting")
 // contains anything at all. It is the caller's responsibility to
 // ensure that the resulting blocks do not overlap etc.
 // Writer ensures the block flush is atomic (via rename).
-func NewBlockWriter(logger *slog.Logger, dir string, blockSize int64) (*BlockWriter, error) {
+func NewBlockWriter(logger *slog.Logger, dir string, blockSize int64, cacheOnlySeriesSymbols bool) (*BlockWriter, error) {
 	w := &BlockWriter{
-		logger:         logger,
-		destinationDir: dir,
-		blockSize:      blockSize,
+		logger:                 logger,
+		destinationDir:         dir,
+		blockSize:              blockSize,
+		CacheOnlySeriesSymbols: cacheOnlySeriesSymbols,
 	}
 	if err := w.initHead(); err != nil {
 		return nil, err
@@ -96,11 +98,15 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
 	maxt := w.head.MaxTime() + 1
 	w.logger.Info("flushing", "series_count", w.head.NumSeries(), "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
 
-	compactor, err := NewLeveledCompactor(ctx,
+	compactor, err := NewLeveledCompactorWithOptions(ctx,
 		nil,
 		w.logger,
 		[]int64{w.blockSize},
-		chunkenc.NewPool(), nil)
+		chunkenc.NewPool(),
+		LeveledCompactorOptions{
+			EnableOverlappingCompaction: true,
+			CacheOnlySeriesSymbols:      w.CacheOnlySeriesSymbols,
+		})
 	if err != nil {
 		return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err)
 	}
diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go
index e7c3146247..ade4a69522 100644
--- a/tsdb/blockwriter_test.go
+++ b/tsdb/blockwriter_test.go
@@ -29,7 +29,7 @@ import (
 func TestBlockWriter(t *testing.T) {
 	ctx := context.Background()
 	outputDir := t.TempDir()
-	w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration)
+	w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration, false)
 	require.NoError(t, err)
 
 	// Add some series.
diff --git a/tsdb/compact.go b/tsdb/compact.go
index 7828fd0860..d8f5d1cf13 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -90,6 +90,7 @@ type LeveledCompactor struct {
 	postingsEncoder             index.PostingsEncoder
 	postingsDecoderFactory      PostingsDecoderFactory
 	enableOverlappingCompaction bool
+	CacheOnlySeriesSymbols      bool
 }
 
 type CompactorMetrics struct {
@@ -174,6 +175,8 @@ type LeveledCompactorOptions struct {
 	Metrics *CompactorMetrics
 	// UseUncachedIO allows bypassing the page cache when appropriate.
 	UseUncachedIO bool
+	// CacheOnlySeriesSymbols makes the index writer cache only the symbols referenced by series labels, rather than every symbol added to the index.
+	CacheOnlySeriesSymbols bool
 }
 
 type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder
@@ -182,6 +185,8 @@ func DefaultPostingsDecoderFactory(_ *BlockMeta) index.PostingsDecoder {
 	return index.DecodePostingsRaw
 }
 
+// NewLeveledCompactor returns a new LeveledCompactor.
+// It is equivalent to calling NewLeveledCompactorWithOptions with the given mergeFunc and overlapping compaction enabled.
 func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
 	return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
 		MergeFunc: mergeFunc,
@@ -226,6 +231,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
 		postingsEncoder:             pe,
 		postingsDecoderFactory:      opts.PD,
 		enableOverlappingCompaction: opts.EnableOverlappingCompaction,
+		CacheOnlySeriesSymbols:      opts.CacheOnlySeriesSymbols,
 	}, nil
 }
 
@@ -668,7 +674,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
 		}
 	}
 
-	indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
+	indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder, c.CacheOnlySeriesSymbols)
 	if err != nil {
 		return fmt.Errorf("open index writer: %w", err)
 	}
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 330c187af6..34e5c1c224 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -1365,7 +1365,7 @@ func TestCancelCompactions(t *testing.T) {
 		// This checks that the `context.Canceled` error is properly checked at all levels:
 		// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
 		// - callers should check with errors.Is() instead of ==.
-		readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger())
+		readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", false, promslog.NewNopLogger())
 		require.NoError(t, err)
 		blocks, err := readOnlyDB.Blocks()
 		require.NoError(t, err)
diff --git a/tsdb/db.go b/tsdb/db.go
index 4d21d4dc12..04c983bc19 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -90,6 +90,7 @@ func DefaultOptions() *Options {
 		EnableOverlappingCompaction: true,
 		EnableSharding:              false,
 		EnableDelayedCompaction:     false,
+		CacheOnlySeriesSymbols:      false,
 		CompactionDelayMaxPercent:   DefaultCompactionDelayMaxPercent,
 		CompactionDelay:             time.Duration(0),
 		PostingsDecoderFactory:      DefaultPostingsDecoderFactory,
@@ -207,6 +208,9 @@ type Options struct {
 	// CompactionDelayMaxPercent is the upper limit for CompactionDelay, specified as a percentage of the head chunk range.
 	CompactionDelayMaxPercent int
 
+	// CacheOnlySeriesSymbols makes compaction cache only the symbols referenced by series labels when writing block indexes.
+	CacheOnlySeriesSymbols bool
+
 	// NewCompactorFunc is a function that returns a TSDB compactor.
 	NewCompactorFunc NewCompactorFunc
 
@@ -429,15 +433,16 @@ var ErrClosed = errors.New("db already closed")
 // Current implementation doesn't support concurrency so
 // all API calls should happen in the same go routine.
 type DBReadOnly struct {
-	logger     *slog.Logger
-	dir        string
-	sandboxDir string
-	closers    []io.Closer
-	closed     chan struct{}
+	logger                 *slog.Logger
+	dir                    string
+	sandboxDir             string
+	CacheOnlySeriesSymbols bool
+	closers                []io.Closer
+	closed                 chan struct{}
 }
 
 // OpenDBReadOnly opens DB in the given directory for read only operations.
-func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, error) {
+func OpenDBReadOnly(dir, sandboxDirRoot string, cacheOnlySeriesSymbols bool, l *slog.Logger) (*DBReadOnly, error) {
 	if _, err := os.Stat(dir); err != nil {
 		return nil, fmt.Errorf("opening the db dir: %w", err)
 	}
@@ -455,10 +460,11 @@ func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, er
 	}
 
 	return &DBReadOnly{
-		logger:     l,
-		dir:        dir,
-		sandboxDir: sandboxDir,
-		closed:     make(chan struct{}),
+		logger:                 l,
+		dir:                    dir,
+		CacheOnlySeriesSymbols: cacheOnlySeriesSymbols,
+		sandboxDir:             sandboxDir,
+		closed:                 make(chan struct{}),
 	}, nil
 }
 
@@ -508,12 +514,16 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
 	mint := head.MinTime()
 	maxt := head.MaxTime()
 	rh := NewRangeHead(head, mint, maxt)
-	compactor, err := NewLeveledCompactor(
+	compactor, err := NewLeveledCompactorWithOptions(
 		context.Background(),
 		nil,
 		db.logger,
 		ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
-		chunkenc.NewPool(), nil,
+		chunkenc.NewPool(),
+		LeveledCompactorOptions{
+			EnableOverlappingCompaction: true,
+			CacheOnlySeriesSymbols:      db.CacheOnlySeriesSymbols,
+		},
 	)
 	if err != nil {
 		return fmt.Errorf("create leveled compactor: %w", err)
 	}
@@ -905,6 +915,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
 	db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
 		MaxBlockChunkSegmentSize:    opts.MaxBlockChunkSegmentSize,
 		EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
+		CacheOnlySeriesSymbols:      opts.CacheOnlySeriesSymbols,
 		PD:                          opts.PostingsDecoderFactory,
 		UseUncachedIO:               opts.UseUncachedIO,
 	})
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 2c9bfe6640..6641372227 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -2443,7 +2443,7 @@ func TestDBReadOnly(t *testing.T) {
 	}
 
 	// Open a read only db and ensure that the API returns the same result as the normal DB.
-	dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger)
+	dbReadOnly, err := OpenDBReadOnly(dbDir, "", false, logger)
 	require.NoError(t, err)
 	defer func() { require.NoError(t, dbReadOnly.Close()) }()
 
@@ -2498,7 +2498,7 @@
 // all api methods return an ErrClosed.
 func TestDBReadOnlyClosing(t *testing.T) {
 	sandboxDir := t.TempDir()
-	db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, promslog.New(&promslog.Config{}))
+	db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, false, promslog.New(&promslog.Config{}))
 	require.NoError(t, err)
 	// The sandboxDir was there.
 	require.DirExists(t, db.sandboxDir)
@@ -2540,7 +2540,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
 	}
 
 	// Flush WAL.
-	db, err := OpenDBReadOnly(dbDir, "", logger)
+	db, err := OpenDBReadOnly(dbDir, "", false, logger)
 	require.NoError(t, err)
 
 	flush := t.TempDir()
@@ -2548,7 +2548,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
 	require.NoError(t, db.Close())
 
 	// Reopen the DB from the flushed WAL block.
-	db, err = OpenDBReadOnly(flush, "", logger)
+	db, err = OpenDBReadOnly(flush, "", false, logger)
 	require.NoError(t, err)
 	defer func() { require.NoError(t, db.Close()) }()
 	blocks, err := db.Blocks()
@@ -2596,7 +2596,7 @@ func TestDBReadOnly_Querier_NoAlteration(t *testing.T) {
 	spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) {
 		dBDirHash := dirHash(dir)
 		// Bootstrap a RO db from the same dir and set up a querier.
-		dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil)
+		dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, false, nil)
 		require.NoError(t, err)
 		require.Equal(t, chunksCount, countChunks(dir))
 		q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt)
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index edcb92a719..36319d0a4d 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -110,6 +110,12 @@ func newCRC32() hash.Hash32 {
 	return crc32.New(castagnoliTable)
 }
 
+type seriesSymbolCacheEntry struct {
+	index          uint32
+	lastValueIndex uint32
+	lastValue      string
+}
+
 type PostingsEncoder func(*encoding.Encbuf, []uint32) error
 
 type PostingsDecoder func(encoding.Decbuf) (int, Postings, error)
@@ -136,11 +142,15 @@ type Writer struct {
 	buf1 encoding.Encbuf
 	buf2 encoding.Encbuf
 
-	numSymbols  int
-	symbols     *Symbols
-	symbolFile  *fileutil.MmapFile
-	lastSymbol  string
-	symbolCache map[string]uint32 // From symbol to index in table.
+	numSymbols             int
+	symbols                *Symbols
+	symbolFile             *fileutil.MmapFile
+	lastSymbol             string
+	cacheOnlySeriesSymbols bool
+	// seriesSymbolCache is used for caching series label symbols when cacheOnlySeriesSymbols is true.
+	seriesSymbolCache map[string]seriesSymbolCacheEntry
+	// symbolToIndex is used for caching symbols with their index when cacheOnlySeriesSymbols is false.
+	symbolToIndex map[string]uint32
 
 	labelIndexes []labelIndexHashEntry // Label index offsets.
 	labelNames   map[string]uint64     // Label names, and their usage.
@@ -197,7 +207,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 // NewWriterWithEncoder returns a new Writer to the given filename. It
 // serializes data in format version 2. It uses the given encoder to encode each
 // postings list.
-func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) {
+func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder, cacheOnlySeriesSymbols bool) (*Writer, error) {
 	dir := filepath.Dir(fn)
 
 	df, err := fileutil.OpenDir(dir)
@@ -240,10 +250,15 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode
 		buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 		buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 
-		symbolCache:     make(map[string]uint32, 1<<16),
-		labelNames:      make(map[string]uint64, 1<<8),
-		crc32:           newCRC32(),
-		postingsEncoder: encoder,
+		cacheOnlySeriesSymbols: cacheOnlySeriesSymbols,
+		labelNames:             make(map[string]uint64, 1<<8),
+		crc32:                  newCRC32(),
+		postingsEncoder:        encoder,
+	}
+	if !cacheOnlySeriesSymbols {
+		iw.symbolToIndex = make(map[string]uint32, 1<<16)
+	} else {
+		iw.seriesSymbolCache = make(map[string]seriesSymbolCacheEntry, 1<<8)
 	}
 	if err := iw.writeMeta(); err != nil {
 		return nil, err
@@ -253,8 +268,8 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode
 
 // NewWriter creates a new index writer using the default encoder. See
 // NewWriterWithEncoder.
-func NewWriter(ctx context.Context, fn string) (*Writer, error) {
-	return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw)
+func NewWriter(ctx context.Context, fn string, cacheOnlySeriesSymbols bool) (*Writer, error) {
+	return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw, cacheOnlySeriesSymbols)
 }
 
 func (w *Writer) write(bufs ...[]byte) error {
@@ -472,17 +487,43 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
 	w.buf2.PutUvarint(lset.Len())
 
 	if err := lset.Validate(func(l labels.Label) error {
-		nameIndex, ok := w.symbolCache[l.Name]
-		if !ok {
-			return fmt.Errorf("symbol entry for %q does not exist", l.Name)
+		var (
+			nameIndex  uint32
+			valueIndex uint32
+		)
+		if !w.cacheOnlySeriesSymbols {
+			var err error
+			if nameIndex, err = w.indexForSymbol(l.Name); err != nil {
+				return err
+			}
+			if valueIndex, err = w.indexForSymbol(l.Value); err != nil {
+				return err
+			}
+		} else {
+			var err error
+			cacheEntry, ok := w.seriesSymbolCache[l.Name]
+			if ok {
+				nameIndex = cacheEntry.index
+				valueIndex = cacheEntry.lastValueIndex
+			} else {
+				if nameIndex, err = w.indexForSymbol(l.Name); err != nil {
+					return err
+				}
+			}
+
+			if !ok || cacheEntry.lastValue != l.Value {
+				if valueIndex, err = w.indexForSymbol(l.Value); err != nil {
+					return err
+				}
+				w.seriesSymbolCache[l.Name] = seriesSymbolCacheEntry{
+					index:          nameIndex,
+					lastValueIndex: valueIndex,
+					lastValue:      l.Value,
+				}
+			}
 		}
 		w.labelNames[l.Name]++
 		w.buf2.PutUvarint32(nameIndex)
-
-		valueIndex, ok := w.symbolCache[l.Value]
-		if !ok {
-			return fmt.Errorf("symbol entry for %q does not exist", l.Value)
-		}
 		w.buf2.PutUvarint32(valueIndex)
 		return nil
 	}); err != nil {
@@ -540,7 +581,9 @@ func (w *Writer) AddSymbol(sym string) error {
 		return fmt.Errorf("symbol %q out-of-order", sym)
 	}
 	w.lastSymbol = sym
-	w.symbolCache[sym] = uint32(w.numSymbols)
+	if !w.cacheOnlySeriesSymbols {
+		w.symbolToIndex[sym] = uint32(w.numSymbols)
+	}
 	w.numSymbols++
 	w.buf1.Reset()
 	w.buf1.PutUvarintStr(sym)
@@ -610,10 +653,10 @@ func (w *Writer) writeLabelIndices() error {
 	values := []uint32{}
 	for d.Err() == nil && cnt > 0 {
 		cnt--
-		d.Uvarint()               // Keycount.
-		name := d.UvarintBytes()  // Label name.
-		value := d.UvarintBytes() // Label value.
-		d.Uvarint64()             // Offset.
+		d.Uvarint()                           // Keycount.
+		name := d.UvarintBytes()              // Label name.
+		value := yoloString(d.UvarintBytes()) // Label value.
+		d.Uvarint64()                         // Offset.
 		if len(name) == 0 {
 			continue // All index is ignored.
 		}
@@ -626,9 +669,9 @@ func (w *Writer) writeLabelIndices() error {
 			values = values[:0]
 		}
 		current = name
-		sid, ok := w.symbolCache[string(value)]
-		if !ok {
-			return fmt.Errorf("symbol entry for %q does not exist", string(value))
+		sid, err := w.indexForSymbol(value)
+		if err != nil {
+			return err
 		}
 		values = append(values, sid)
 	}
@@ -900,9 +943,9 @@ func (w *Writer) writePostingsToTmpFiles() error {
 
 		nameSymbols := map[uint32]string{}
 		for _, name := range batchNames {
-			sid, ok := w.symbolCache[name]
-			if !ok {
-				return fmt.Errorf("symbol entry for %q does not exist", name)
+			sid, err := w.indexForSymbol(name)
+			if err != nil {
+				return err
 			}
 			nameSymbols[sid] = name
 		}
@@ -939,9 +982,9 @@ func (w *Writer) writePostingsToTmpFiles() error {
 
 		for _, name := range batchNames {
 			// Write out postings for this label name.
-			sid, ok := w.symbolCache[name]
-			if !ok {
-				return fmt.Errorf("symbol entry for %q does not exist", name)
+			sid, err := w.indexForSymbol(name)
+			if err != nil {
+				return err
 			}
 			values := make([]uint32, 0, len(postings[sid]))
 			for v := range postings[sid] {
@@ -968,6 +1011,23 @@ func (w *Writer) writePostingsToTmpFiles() error {
 	return nil
 }
 
+// indexForSymbol returns the index corresponding to a symbol, or an error if it's not found.
+func (w *Writer) indexForSymbol(symbol string) (uint32, error) {
+	if w.cacheOnlySeriesSymbols {
+		sid, err := w.symbols.ReverseLookup(symbol)
+		if err != nil {
+			return 0, fmt.Errorf("symbol entry for %q does not exist: %w", symbol, err)
+		}
+		return sid, nil
+	}
+
+	sid, ok := w.symbolToIndex[symbol]
+	if !ok {
+		return 0, fmt.Errorf("symbol entry for %q does not exist", symbol)
+	}
+	return sid, nil
+}
+
 // EncodePostingsRaw uses the "basic" postings list encoding format with no compression:
 // ....
 func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error {
diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go
index 17b4cc88dd..81f95fea0a 100644
--- a/tsdb/index/index_test.go
+++ b/tsdb/index/index_test.go
@@ -142,7 +142,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
 	fn := filepath.Join(dir, indexFilename)
 
 	// An empty index must still result in a readable file.
-	iw, err := NewWriter(context.Background(), fn)
+	iw, err := NewWriter(context.Background(), fn, false)
 	require.NoError(t, err)
 	require.NoError(t, iw.Close())
 
@@ -445,7 +445,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 }
 
 func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
-	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"))
+	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), false)
 	require.NoError(t, err)
 
 	require.NoError(t, w.AddSymbol("__name__"))
@@ -568,7 +568,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
 
 func TestChunksRefOrdering(t *testing.T) {
 	dir := t.TempDir()
-	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
+	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false)
 	require.NoError(t, err)
 
 	require.NoError(t, idx.AddSymbol("1"))
@@ -588,7 +588,7 @@ func TestChunksRefOrdering(t *testing.T) {
 
 func TestChunksTimeOrdering(t *testing.T) {
 	dir := t.TempDir()
-	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
+	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false)
 	require.NoError(t, err)
 
 	require.NoError(t, idx.AddSymbol("1"))
@@ -713,7 +713,7 @@ func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSerie
 
 	fn := filepath.Join(tb.TempDir(), indexFilename)
 
-	iw, err := NewWriter(ctx, fn)
+	iw, err := NewWriter(ctx, fn, false)
 	require.NoError(tb, err)
 
 	symbols := map[string]struct{}{}
diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go
index af2348019a..0a81d28ddc 100644
--- a/tsdb/tsdbblockutil.go
+++ b/tsdb/tsdbblockutil.go
@@ -27,7 +27,7 @@ import (
 var ErrInvalidTimes = errors.New("max time is lesser than min time")
 
 // CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk.
-func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger *slog.Logger) (string, error) {
+func CreateBlock(series []storage.Series, dir string, chunkRange int64, cacheOnlySeriesSymbols bool, logger *slog.Logger) (string, error) {
 	if chunkRange == 0 {
 		chunkRange = DefaultBlockDuration
 	}
@@ -35,7 +35,7 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger *
 		return "", ErrInvalidTimes
 	}
 
-	w, err := NewBlockWriter(logger, dir, chunkRange)
+	w, err := NewBlockWriter(logger, dir, chunkRange, cacheOnlySeriesSymbols)
 	if err != nil {
 		return "", err
 	}
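Note on the approach (not part of the patch): with cacheOnlySeriesSymbols enabled, the index writer no longer keeps every symbol's table index in an in-memory map. Instead it remembers, per label name, the name's index plus the index of the value written most recently; any miss falls back to a ReverseLookup against the symbol table that has already been written. The following is a rough, self-contained Go sketch of that decision flow, under stated assumptions; the symbolTable and cacheEntry types are illustrative stand-ins invented for the example, not Prometheus APIs.

package main

import (
	"fmt"
	"sort"
)

// cacheEntry mirrors the idea of seriesSymbolCacheEntry: for a label name we keep
// the name's symbol index plus the last value's index, so repeated series with
// the same name/value pair avoid any lookup at all.
type cacheEntry struct {
	nameIndex      uint32
	lastValueIndex uint32
	lastValue      string
}

// symbolTable stands in for the sorted symbol section of a TSDB index;
// ReverseLookup is the slower fallback path used on a cache miss.
type symbolTable struct{ syms []string }

func (t symbolTable) ReverseLookup(s string) (uint32, error) {
	i := sort.SearchStrings(t.syms, s)
	if i == len(t.syms) || t.syms[i] != s {
		return 0, fmt.Errorf("symbol entry for %q does not exist", s)
	}
	return uint32(i), nil
}

func main() {
	table := symbolTable{syms: []string{"__name__", "http_requests_total", "job", "prometheus"}} // must stay sorted
	cache := map[string]cacheEntry{}

	series := []struct{ name, value string }{
		{"__name__", "http_requests_total"},
		{"job", "prometheus"},
		{"job", "prometheus"}, // repeat: served entirely from the cache
	}
	for _, l := range series {
		e, ok := cache[l.name]
		if !ok {
			// Name not cached yet: resolve it via the symbol table.
			idx, err := table.ReverseLookup(l.name)
			if err != nil {
				panic(err)
			}
			e.nameIndex = idx
		}
		if !ok || e.lastValue != l.value {
			// First sighting or changed value: resolve it and remember it for next time.
			idx, err := table.ReverseLookup(l.value)
			if err != nil {
				panic(err)
			}
			e.lastValueIndex, e.lastValue = idx, l.value
			cache[l.name] = e
		}
		fmt.Printf("%s=%s -> name index %d, value index %d\n", l.name, l.value, e.nameIndex, e.lastValueIndex)
	}
}

The trade-off is memory for CPU: the writer-side map stays proportional to the number of distinct label names rather than to all symbols, while the label-index and postings phases occasionally pay for a binary-search lookup in the on-disk symbol table.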