Arve Knudsen 2025-08-05 14:42:22 +02:00 committed by GitHub
commit c82cf4a5be
15 changed files with 163 additions and 77 deletions

View File

@@ -20,6 +20,7 @@
 * [ENHANCEMENT] UI: Clear search field on `/targets` page. #16567
 * [ENHANCEMENT] Rules: Check that rules parse without error earlier at startup. #16601
 * [ENHANCEMENT] Promtool: Optional fuzzy float64 comparison in rules unittests. #16395
+* [ENHANCEMENT] TSDB: Add configuration parameters for only caching series label symbols when writing TSDB indexes. #15836
 * [PERF] PromQL: Reuse `histogramStatsIterator` where possible. #16686
 * [PERF] PromQL: Reuse storage for custom bucket values for native histograms. #16565
 * [PERF] UI: Optimize memoization and search debouncing on `/targets` page. #16589
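The CHANGELOG entry corresponds to the new CacheOnlySeriesSymbols field threaded through tsdb.Options in the hunks below. As a rough sketch of how an embedder might opt in programmatically (the command-line flag wiring is not shown in this diff, and tsdb.Open with a nil registerer and nil stats is assumed here for brevity):

package main

import (
	"log"

	"github.com/prometheus/common/promslog"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// New in this change: cache only series label symbols while writing
	// TSDB indexes (e.g. during compaction), trading reverse lookups for memory.
	opts.CacheOnlySeriesSymbols = true

	// tsdb.Open is assumed as the embedding entry point; "data" is illustrative.
	db, err := tsdb.Open("data", promslog.NewNopLogger(), nil, opts, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}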

View File

@@ -1876,6 +1876,7 @@ type tsdbOptions struct {
 	CompactionDelayMaxPercent   int
 	EnableOverlappingCompaction bool
 	UseUncachedIO               bool
+	CacheOnlySeriesSymbols      bool
 }
 func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@@ -1900,6 +1901,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
 		CompactionDelayMaxPercent:   opts.CompactionDelayMaxPercent,
 		EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
 		UseUncachedIO:               opts.UseUncachedIO,
+		CacheOnlySeriesSymbols:      opts.CacheOnlySeriesSymbols,
 	}
 }

View File

@@ -89,7 +89,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
 	blockDuration := getCompatibleBlockDuration(maxBlockDuration)
 	mint = blockDuration * (mint / blockDuration)
-	db, err := tsdb.OpenDBReadOnly(outputDir, "", nil)
+	db, err := tsdb.OpenDBReadOnly(outputDir, "", false, nil)
 	if err != nil {
 		return err
 	}
@@ -120,7 +120,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
 	// also need to append samples throughout the whole block range. To allow that, we
 	// pretend that the block is twice as large here, but only really add sample in the
 	// original interval later.
-	w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration)
+	w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration, false)
 	if err != nil {
 		return fmt.Errorf("block writer: %w", err)
 	}

View File

@@ -133,7 +133,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName
 	// also need to append samples throughout the whole block range. To allow that, we
 	// pretend that the block is twice as large here, but only really add sample in the
 	// original interval later.
-	w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration)
+	w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration, false)
 	if err != nil {
 		return fmt.Errorf("new block writer: %w", err)
 	}

View File

@@ -336,7 +336,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
 }
 func listBlocks(path string, humanReadable bool) error {
-	db, err := tsdb.OpenDBReadOnly(path, "", nil)
+	db, err := tsdb.OpenDBReadOnly(path, "", false, nil)
 	if err != nil {
 		return err
 	}
@@ -391,7 +391,7 @@ func getFormattedBytes(bytes int64, humanReadable bool) string {
 }
 func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
-	db, err := tsdb.OpenDBReadOnly(path, "", nil)
+	db, err := tsdb.OpenDBReadOnly(path, "", false, nil)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -710,7 +710,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
 type SeriesSetFormatter func(series storage.SeriesSet) error
 func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
-	db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil)
+	db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, false, nil)
 	if err != nil {
 		return err
 	}

View File

@@ -621,7 +621,7 @@ func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp
 // createBlock creates a block with given set of series and returns its dir.
 func createBlock(tb testing.TB, dir string, series []storage.Series) string {
-	blockDir, err := CreateBlock(series, dir, 0, promslog.NewNopLogger())
+	blockDir, err := CreateBlock(series, dir, 0, false, promslog.NewNopLogger())
 	require.NoError(tb, err)
 	return blockDir
 }

View File

@@ -33,9 +33,10 @@ type BlockWriter struct {
 	logger         *slog.Logger
 	destinationDir string
 	head           *Head
 	blockSize      int64 // in ms
 	chunkDir       string
+	CacheOnlySeriesSymbols bool
 }
 // ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.
@@ -49,11 +50,12 @@ var ErrNoSeriesAppended = errors.New("no series appended, aborting")
 // contains anything at all. It is the caller's responsibility to
 // ensure that the resulting blocks do not overlap etc.
 // Writer ensures the block flush is atomic (via rename).
-func NewBlockWriter(logger *slog.Logger, dir string, blockSize int64) (*BlockWriter, error) {
+func NewBlockWriter(logger *slog.Logger, dir string, blockSize int64, cacheOnlySeriesSymbols bool) (*BlockWriter, error) {
 	w := &BlockWriter{
 		logger:         logger,
 		destinationDir: dir,
 		blockSize:      blockSize,
+		CacheOnlySeriesSymbols: cacheOnlySeriesSymbols,
 	}
 	if err := w.initHead(); err != nil {
 		return nil, err
@@ -96,11 +98,15 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
 	maxt := w.head.MaxTime() + 1
 	w.logger.Info("flushing", "series_count", w.head.NumSeries(), "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
-	compactor, err := NewLeveledCompactor(ctx,
+	compactor, err := NewLeveledCompactorWithOptions(ctx,
 		nil,
 		w.logger,
 		[]int64{w.blockSize},
-		chunkenc.NewPool(), nil)
+		chunkenc.NewPool(),
+		LeveledCompactorOptions{
+			EnableOverlappingCompaction: true,
+			CacheOnlySeriesSymbols:      w.CacheOnlySeriesSymbols,
+		})
 	if err != nil {
 		return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err)
 	}
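For callers outside this diff, the extended constructor reads as below. A minimal sketch assuming BlockWriter's usual Appender/Flush flow (the output path, metric name and sample are illustrative); the trailing argument is the new cacheOnlySeriesSymbols switch, which Flush forwards to the compactor it creates.

package main

import (
	"context"
	"log"

	"github.com/prometheus/common/promslog"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	ctx := context.Background()

	// Last argument enables caching only series label symbols in the index writer.
	w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), "out", tsdb.DefaultBlockDuration, true)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	app := w.Appender(ctx)
	if _, err := app.Append(0, labels.FromStrings("__name__", "demo_metric"), 0, 1.0); err != nil {
		log.Fatal(err)
	}
	if err := app.Commit(); err != nil {
		log.Fatal(err)
	}

	// Flush cuts the block; the leveled compactor inherits CacheOnlySeriesSymbols.
	id, err := w.Flush(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("wrote block", id)
}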

View File

@@ -29,7 +29,7 @@ import (
 func TestBlockWriter(t *testing.T) {
 	ctx := context.Background()
 	outputDir := t.TempDir()
-	w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration)
+	w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration, false)
 	require.NoError(t, err)
 	// Add some series.

View File

@@ -90,6 +90,7 @@ type LeveledCompactor struct {
 	postingsEncoder             index.PostingsEncoder
 	postingsDecoderFactory      PostingsDecoderFactory
 	enableOverlappingCompaction bool
+	CacheOnlySeriesSymbols      bool
 }
 type CompactorMetrics struct {
@@ -174,6 +175,8 @@ type LeveledCompactorOptions struct {
 	Metrics *CompactorMetrics
 	// UseUncachedIO allows bypassing the page cache when appropriate.
 	UseUncachedIO bool
+	// CacheOnlySeriesSymbols enables caching of TSDB symbols only when adding series.
+	CacheOnlySeriesSymbols bool
 }
 type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder
@@ -182,6 +185,8 @@ func DefaultPostingsDecoderFactory(_ *BlockMeta) index.PostingsDecoder {
 	return index.DecodePostingsRaw
 }
+// NewLeveledCompactor returns a new LeveledCompactor.
+// It's the same as calling NewLeveledCompactorWithOptions with mergeFunc, and enabled overlapping compaction.
 func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
 	return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
 		MergeFunc: mergeFunc,
@@ -226,6 +231,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
 		postingsEncoder:             pe,
 		postingsDecoderFactory:      opts.PD,
 		enableOverlappingCompaction: opts.EnableOverlappingCompaction,
+		CacheOnlySeriesSymbols:      opts.CacheOnlySeriesSymbols,
 	}, nil
 }
@@ -668,7 +674,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
 		}
 	}
-	indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
+	indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder, c.CacheOnlySeriesSymbols)
 	if err != nil {
 		return fmt.Errorf("open index writer: %w", err)
 	}
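The option is plumbed from LeveledCompactorOptions into index.NewWriterWithEncoder when the compactor writes a block. A standalone construction mirroring the BlockWriter.Flush call above might look like the sketch below; the nop logger, nil registerer and exponential ranges are placeholders, not prescriptions.

package main

import (
	"context"
	"log"

	"github.com/prometheus/common/promslog"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	compactor, err := tsdb.NewLeveledCompactorWithOptions(
		context.Background(),
		nil, // no metrics registerer
		promslog.NewNopLogger(),
		tsdb.ExponentialBlockRanges(tsdb.DefaultOptions().MinBlockDuration, 3, 5),
		chunkenc.NewPool(),
		tsdb.LeveledCompactorOptions{
			EnableOverlappingCompaction: true,
			// New option: the index writer caches only series label symbols.
			CacheOnlySeriesSymbols: true,
		},
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = compactor // e.g. feed block dirs to Plan/Compact as usual
}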

View File

@@ -1365,7 +1365,7 @@ func TestCancelCompactions(t *testing.T) {
 	// This checks that the `context.Canceled` error is properly checked at all levels:
 	// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
 	// - callers should check with errors.Is() instead of ==.
-	readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger())
+	readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", false, promslog.NewNopLogger())
 	require.NoError(t, err)
 	blocks, err := readOnlyDB.Blocks()
 	require.NoError(t, err)

View File

@@ -90,6 +90,7 @@ func DefaultOptions() *Options {
 		EnableOverlappingCompaction: true,
 		EnableSharding:              false,
 		EnableDelayedCompaction:     false,
+		CacheOnlySeriesSymbols:      false,
 		CompactionDelayMaxPercent:   DefaultCompactionDelayMaxPercent,
 		CompactionDelay:             time.Duration(0),
 		PostingsDecoderFactory:      DefaultPostingsDecoderFactory,
@@ -207,6 +208,9 @@ type Options struct {
 	// CompactionDelayMaxPercent is the upper limit for CompactionDelay, specified as a percentage of the head chunk range.
 	CompactionDelayMaxPercent int
+	// CacheOnlySeriesSymbols enables caching of TSDB symbols only when adding series, during compaction.
+	CacheOnlySeriesSymbols bool
 	// NewCompactorFunc is a function that returns a TSDB compactor.
 	NewCompactorFunc NewCompactorFunc
@@ -429,15 +433,16 @@ var ErrClosed = errors.New("db already closed")
 // Current implementation doesn't support concurrency so
 // all API calls should happen in the same go routine.
 type DBReadOnly struct {
 	logger                 *slog.Logger
 	dir                    string
 	sandboxDir             string
+	CacheOnlySeriesSymbols bool
 	closers                []io.Closer
 	closed                 chan struct{}
 }
 // OpenDBReadOnly opens DB in the given directory for read only operations.
-func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, error) {
+func OpenDBReadOnly(dir, sandboxDirRoot string, cacheOnlySeriesSymbols bool, l *slog.Logger) (*DBReadOnly, error) {
 	if _, err := os.Stat(dir); err != nil {
 		return nil, fmt.Errorf("opening the db dir: %w", err)
 	}
@@ -455,10 +460,11 @@ func OpenDBReadOnly(dir, sandboxDirRoot string, l *slog.Logger) (*DBReadOnly, er
 	}
 	return &DBReadOnly{
 		logger:                 l,
 		dir:                    dir,
+		CacheOnlySeriesSymbols: cacheOnlySeriesSymbols,
 		sandboxDir:             sandboxDir,
 		closed:                 make(chan struct{}),
 	}, nil
 }
@@ -508,12 +514,16 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
 	mint := head.MinTime()
 	maxt := head.MaxTime()
 	rh := NewRangeHead(head, mint, maxt)
-	compactor, err := NewLeveledCompactor(
+	compactor, err := NewLeveledCompactorWithOptions(
 		context.Background(),
 		nil,
 		db.logger,
 		ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
-		chunkenc.NewPool(), nil,
+		chunkenc.NewPool(),
+		LeveledCompactorOptions{
+			EnableOverlappingCompaction: true,
+			CacheOnlySeriesSymbols:      db.CacheOnlySeriesSymbols,
+		},
 	)
 	if err != nil {
 		return fmt.Errorf("create leveled compactor: %w", err)
@@ -905,6 +915,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
 	db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
 		MaxBlockChunkSegmentSize:    opts.MaxBlockChunkSegmentSize,
 		EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
+		CacheOnlySeriesSymbols:      opts.CacheOnlySeriesSymbols,
 		PD:                          opts.PostingsDecoderFactory,
 		UseUncachedIO:               opts.UseUncachedIO,
 	})
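OpenDBReadOnly grows a cacheOnlySeriesSymbols parameter that only takes effect when FlushWAL builds a block through the leveled compactor. A hedged sketch of the new signature is below; the directory paths are illustrative.

package main

import (
	"log"

	"github.com/prometheus/common/promslog"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// Third argument is the new cacheOnlySeriesSymbols flag.
	db, err := tsdb.OpenDBReadOnly("data", "", true, promslog.NewNopLogger())
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// FlushWAL writes the WAL contents as a block; the compactor it creates
	// uses CacheOnlySeriesSymbols when writing the block's index.
	if err := db.FlushWAL("data-flushed"); err != nil {
		log.Fatal(err)
	}
}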

View File

@@ -2443,7 +2443,7 @@ func TestDBReadOnly(t *testing.T) {
 	}
 	// Open a read only db and ensure that the API returns the same result as the normal DB.
-	dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger)
+	dbReadOnly, err := OpenDBReadOnly(dbDir, "", false, logger)
 	require.NoError(t, err)
 	defer func() { require.NoError(t, dbReadOnly.Close()) }()
@@ -2498,7 +2498,7 @@ func TestDBReadOnly(t *testing.T) {
 // all api methods return an ErrClosed.
 func TestDBReadOnlyClosing(t *testing.T) {
 	sandboxDir := t.TempDir()
-	db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, promslog.New(&promslog.Config{}))
+	db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, false, promslog.New(&promslog.Config{}))
 	require.NoError(t, err)
 	// The sandboxDir was there.
 	require.DirExists(t, db.sandboxDir)
@@ -2540,7 +2540,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
 	}
 	// Flush WAL.
-	db, err := OpenDBReadOnly(dbDir, "", logger)
+	db, err := OpenDBReadOnly(dbDir, "", false, logger)
 	require.NoError(t, err)
 	flush := t.TempDir()
@@ -2548,7 +2548,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
 	require.NoError(t, db.Close())
 	// Reopen the DB from the flushed WAL block.
-	db, err = OpenDBReadOnly(flush, "", logger)
+	db, err = OpenDBReadOnly(flush, "", false, logger)
 	require.NoError(t, err)
 	defer func() { require.NoError(t, db.Close()) }()
 	blocks, err := db.Blocks()
@@ -2596,7 +2596,7 @@ func TestDBReadOnly_Querier_NoAlteration(t *testing.T) {
 	spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) {
 		dBDirHash := dirHash(dir)
 		// Bootstrap a RO db from the same dir and set up a querier.
-		dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil)
+		dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, false, nil)
 		require.NoError(t, err)
 		require.Equal(t, chunksCount, countChunks(dir))
 		q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt)

View File

@@ -110,6 +110,12 @@ func newCRC32() hash.Hash32 {
 	return crc32.New(castagnoliTable)
 }
+type seriesSymbolCacheEntry struct {
+	index          uint32
+	lastValueIndex uint32
+	lastValue      string
+}
 type PostingsEncoder func(*encoding.Encbuf, []uint32) error
 type PostingsDecoder func(encoding.Decbuf) (int, Postings, error)
@@ -136,11 +142,15 @@ type Writer struct {
 	buf1 encoding.Encbuf
 	buf2 encoding.Encbuf
 	numSymbols int
 	symbols    *Symbols
 	symbolFile *fileutil.MmapFile
 	lastSymbol string
-	symbolCache map[string]uint32 // From symbol to index in table.
+	cacheOnlySeriesSymbols bool
+	// seriesSymbolCache is used for caching series label symbols when cacheOnlySeriesSymbols is true.
+	seriesSymbolCache map[string]seriesSymbolCacheEntry
+	// symbolToIndex is used for caching symbols with their index when cacheOnlySeriesSymbols is false.
+	symbolToIndex map[string]uint32
 	labelIndexes []labelIndexHashEntry // Label index offsets.
 	labelNames   map[string]uint64     // Label names, and their usage.
@@ -197,7 +207,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 // NewWriterWithEncoder returns a new Writer to the given filename. It
 // serializes data in format version 2. It uses the given encoder to encode each
 // postings list.
-func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) {
+func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder, cacheOnlySeriesSymbols bool) (*Writer, error) {
 	dir := filepath.Dir(fn)
 	df, err := fileutil.OpenDir(dir)
@@ -240,10 +250,15 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode
 		buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 		buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
-		symbolCache: make(map[string]uint32, 1<<16),
+		cacheOnlySeriesSymbols: cacheOnlySeriesSymbols,
 		labelNames: make(map[string]uint64, 1<<8),
 		crc32:      newCRC32(),
 		postingsEncoder: encoder,
+	}
+	if !cacheOnlySeriesSymbols {
+		iw.symbolToIndex = make(map[string]uint32, 1<<16)
+	} else {
+		iw.seriesSymbolCache = make(map[string]seriesSymbolCacheEntry, 1<<8)
 	}
 	if err := iw.writeMeta(); err != nil {
 		return nil, err
@@ -253,8 +268,8 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode
 // NewWriter creates a new index writer using the default encoder. See
 // NewWriterWithEncoder.
-func NewWriter(ctx context.Context, fn string) (*Writer, error) {
-	return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw)
+func NewWriter(ctx context.Context, fn string, cacheOnlySeriesSymbols bool) (*Writer, error) {
+	return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw, cacheOnlySeriesSymbols)
 }
 func (w *Writer) write(bufs ...[]byte) error {
@@ -472,17 +487,43 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
 	w.buf2.PutUvarint(lset.Len())
 	if err := lset.Validate(func(l labels.Label) error {
-		nameIndex, ok := w.symbolCache[l.Name]
-		if !ok {
-			return fmt.Errorf("symbol entry for %q does not exist", l.Name)
+		var (
+			nameIndex  uint32
+			valueIndex uint32
+		)
+		if !w.cacheOnlySeriesSymbols {
+			var err error
+			if nameIndex, err = w.indexForSymbol(l.Name); err != nil {
+				return err
+			}
+			if valueIndex, err = w.indexForSymbol(l.Value); err != nil {
+				return err
+			}
+		} else {
+			var err error
+			cacheEntry, ok := w.seriesSymbolCache[l.Name]
+			if ok {
+				nameIndex = cacheEntry.index
+				valueIndex = cacheEntry.lastValueIndex
+			} else {
+				if nameIndex, err = w.indexForSymbol(l.Name); err != nil {
+					return err
				}
+			}
+			if !ok || cacheEntry.lastValue != l.Value {
+				if valueIndex, err = w.indexForSymbol(l.Value); err != nil {
+					return err
+				}
+				w.seriesSymbolCache[l.Name] = seriesSymbolCacheEntry{
+					index:          nameIndex,
+					lastValueIndex: valueIndex,
+					lastValue:      l.Value,
+				}
+			}
 		}
 		w.labelNames[l.Name]++
 		w.buf2.PutUvarint32(nameIndex)
-		valueIndex, ok := w.symbolCache[l.Value]
-		if !ok {
-			return fmt.Errorf("symbol entry for %q does not exist", l.Value)
-		}
 		w.buf2.PutUvarint32(valueIndex)
 		return nil
 	}); err != nil {
@@ -540,7 +581,9 @@ func (w *Writer) AddSymbol(sym string) error {
 		return fmt.Errorf("symbol %q out-of-order", sym)
 	}
 	w.lastSymbol = sym
-	w.symbolCache[sym] = uint32(w.numSymbols)
+	if !w.cacheOnlySeriesSymbols {
+		w.symbolToIndex[sym] = uint32(w.numSymbols)
+	}
 	w.numSymbols++
 	w.buf1.Reset()
 	w.buf1.PutUvarintStr(sym)
@@ -610,10 +653,10 @@ func (w *Writer) writeLabelIndices() error {
 	values := []uint32{}
 	for d.Err() == nil && cnt > 0 {
 		cnt--
 		d.Uvarint()                           // Keycount.
 		name := d.UvarintBytes()              // Label name.
-		value := d.UvarintBytes()             // Label value.
+		value := yoloString(d.UvarintBytes()) // Label value.
 		d.Uvarint64()                         // Offset.
 		if len(name) == 0 {
 			continue // All index is ignored.
 		}
@@ -626,9 +669,9 @@ func (w *Writer) writeLabelIndices() error {
 			values = values[:0]
 		}
 		current = name
-		sid, ok := w.symbolCache[string(value)]
-		if !ok {
-			return fmt.Errorf("symbol entry for %q does not exist", string(value))
+		sid, err := w.indexForSymbol(value)
+		if err != nil {
+			return err
 		}
 		values = append(values, sid)
 	}
@@ -900,9 +943,9 @@ func (w *Writer) writePostingsToTmpFiles() error {
 		nameSymbols := map[uint32]string{}
 		for _, name := range batchNames {
-			sid, ok := w.symbolCache[name]
-			if !ok {
-				return fmt.Errorf("symbol entry for %q does not exist", name)
+			sid, err := w.indexForSymbol(name)
+			if err != nil {
+				return err
 			}
 			nameSymbols[sid] = name
 		}
@@ -939,9 +982,9 @@ func (w *Writer) writePostingsToTmpFiles() error {
 		for _, name := range batchNames {
 			// Write out postings for this label name.
-			sid, ok := w.symbolCache[name]
-			if !ok {
-				return fmt.Errorf("symbol entry for %q does not exist", name)
+			sid, err := w.indexForSymbol(name)
+			if err != nil {
+				return err
 			}
 			values := make([]uint32, 0, len(postings[sid]))
 			for v := range postings[sid] {
@@ -968,6 +1011,23 @@ func (w *Writer) writePostingsToTmpFiles() error {
 	return nil
 }
+// indexForSymbol returns the index corresponding to a symbol, or an error if it's not found.
+func (w *Writer) indexForSymbol(symbol string) (uint32, error) {
+	if w.cacheOnlySeriesSymbols {
+		sid, err := w.symbols.ReverseLookup(symbol)
+		if err != nil {
+			return 0, fmt.Errorf("symbol entry for %q does not exist, %w", symbol, err)
+		}
+		return sid, nil
+	}
+	sid, ok := w.symbolToIndex[symbol]
+	if !ok {
+		return 0, fmt.Errorf("symbol entry for %q does not exist", symbol)
+	}
+	return sid, nil
+}
 // EncodePostingsRaw uses the "basic" postings list encoding format with no compression:
 // <BE uint32 len X><BE uint32 0><BE uint32 1>...<BE uint32 X-1>.
 func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error {
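At the index layer, cacheOnlySeriesSymbols swaps the full symbolToIndex map for a small per-label-name seriesSymbolCache, with misses resolved through indexForSymbol via Symbols.ReverseLookup. A minimal writer sketch using the new NewWriter signature follows; the temp directory, symbols and the single chunkless series are illustrative only.

package main

import (
	"context"
	"log"
	"os"
	"path/filepath"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	ctx := context.Background()
	dir, err := os.MkdirTemp("", "index-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Final argument selects the new mode: cache only series label symbols
	// instead of keeping every symbol's table index in memory.
	iw, err := index.NewWriter(ctx, filepath.Join(dir, "index"), true)
	if err != nil {
		log.Fatal(err)
	}

	// Symbols must still be added first, in sorted order.
	for _, sym := range []string{"__name__", "demo_metric"} {
		if err := iw.AddSymbol(sym); err != nil {
			log.Fatal(err)
		}
	}

	// AddSeries now resolves symbol indexes through indexForSymbol and
	// memoizes the last value seen per label name in seriesSymbolCache.
	if err := iw.AddSeries(storage.SeriesRef(1), labels.FromStrings("__name__", "demo_metric")); err != nil {
		log.Fatal(err)
	}

	if err := iw.Close(); err != nil {
		log.Fatal(err)
	}
}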

View File

@@ -142,7 +142,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
 	fn := filepath.Join(dir, indexFilename)
 	// An empty index must still result in a readable file.
-	iw, err := NewWriter(context.Background(), fn)
+	iw, err := NewWriter(context.Background(), fn, false)
 	require.NoError(t, err)
 	require.NoError(t, iw.Close())
@@ -445,7 +445,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 }
 func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
-	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"))
+	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), false)
 	require.NoError(t, err)
 	require.NoError(t, w.AddSymbol("__name__"))
@@ -568,7 +568,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
 func TestChunksRefOrdering(t *testing.T) {
 	dir := t.TempDir()
-	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
+	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false)
 	require.NoError(t, err)
 	require.NoError(t, idx.AddSymbol("1"))
@@ -588,7 +588,7 @@ func TestChunksRefOrdering(t *testing.T) {
 func TestChunksTimeOrdering(t *testing.T) {
 	dir := t.TempDir()
-	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
+	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"), false)
 	require.NoError(t, err)
 	require.NoError(t, idx.AddSymbol("1"))
@@ -713,7 +713,7 @@ func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSerie
 	fn := filepath.Join(tb.TempDir(), indexFilename)
-	iw, err := NewWriter(ctx, fn)
+	iw, err := NewWriter(ctx, fn, false)
 	require.NoError(tb, err)
 	symbols := map[string]struct{}{}

View File

@@ -27,7 +27,7 @@ import (
 var ErrInvalidTimes = errors.New("max time is lesser than min time")
 // CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk.
-func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger *slog.Logger) (string, error) {
+func CreateBlock(series []storage.Series, dir string, chunkRange int64, cacheOnlySeriesSymbols bool, logger *slog.Logger) (string, error) {
 	if chunkRange == 0 {
 		chunkRange = DefaultBlockDuration
 	}
@@ -35,7 +35,7 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger *
 		return "", ErrInvalidTimes
 	}
-	w, err := NewBlockWriter(logger, dir, chunkRange)
+	w, err := NewBlockWriter(logger, dir, chunkRange, cacheOnlySeriesSymbols)
 	if err != nil {
 		return "", err
 	}