From ca578101aff0022c64d8a47827de6fcca0a168f2 Mon Sep 17 00:00:00 2001 From: Denys Sedchenko <9203548+x1unix@users.noreply.github.com> Date: Fri, 24 Apr 2026 13:42:26 -0400 Subject: [PATCH] feat(tsdb/agent): Implement checkpoint based on series in memory (#17948) Adds CheckpointFromInMemorySeries option for agent.Options to enable a faster checkpoint implementation that skips segment re-read and just uses in-memory data instead. * feat: impl agent-specific checkpoint dir * feat: impl ActiveSeries interface * feat: use new checkpoint impl * feat: hide new checkpoint impl behind a feature flag * feat: add benchmark * feat: add benchstat case * feat: use feature flag in bench * feat: use same labels for persisted state and append * feat: set WAL segment size * feat: add checkpoint size metric and bump series size * feat: wal replay test * feat: expose new checkpoint opts in cmd flags * feat: update cli doc * add ActiveSeries and DeletedSeries doc Signed-off-by: x1unix <9203548+x1unix@users.noreply.github.com> Signed-off-by: Denys Sedchenko <9203548+x1unix@users.noreply.github.com> Co-authored-by: George Krajcsovits --- cmd/prometheus/main.go | 53 ++-- docs/command-line/prometheus.md | 2 + tsdb/agent/checkpoint.go | 232 ++++++++++++++++ tsdb/agent/checkpoint_test.go | 466 ++++++++++++++++++++++++++++++++ tsdb/agent/db.go | 63 +++-- tsdb/agent/db_test.go | 2 +- tsdb/agent/series.go | 109 +++++++- tsdb/agent/series_test.go | 8 +- tsdb/wlog/checkpoint.go | 12 +- 9 files changed, 896 insertions(+), 51 deletions(-) create mode 100644 tsdb/agent/checkpoint.go create mode 100644 tsdb/agent/checkpoint_test.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ca908c8b29..fc28ccd3f7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -557,6 +557,12 @@ func main() { "Maximum age samples may be before being forcibly deleted when the WAL is truncated"). 
Default(agentDefaultMaxWALTime).SetValue(&cfg.agent.MaxWALTime) + agentOnlyFlag(a, "storage.agent.checkpoint-from-in-memory-series", "Use only in-memory series data when building a checkpoint."). + Default("false").BoolVar(&cfg.agent.CheckpointFromInMemorySeries) + + agentOnlyFlag(a, "storage.agent.checkpoint-batch-size", "Size of a single WAL log entry chunk to be flushed. Has no effect without --storage.agent.checkpoint-from-in-memory-series flag."). + Default("1000").IntVar(&cfg.agent.CheckpointBatchSize) + agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory."). Default("false").BoolVar(&cfg.agent.NoLockfile) @@ -662,6 +668,11 @@ func main() { os.Exit(3) } + if agentMode && cfg.agent.CheckpointBatchSize <= 0 { + fmt.Fprintln(os.Stderr, "--storage.agent.checkpoint-batch-size must be greater than 0.") + os.Exit(1) + } + if cfg.memlimitRatio <= 0.0 || cfg.memlimitRatio > 1.0 { fmt.Fprintf(os.Stderr, "--auto-gomemlimit.ratio must be greater than 0 and less than or equal to 1.") os.Exit(1) @@ -2078,15 +2089,17 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { // agentOptions is a version of agent.Options with defined units. This is required // as agent.Option fields are unit agnostic (time). type agentOptions struct { - WALSegmentSize units.Base2Bytes - WALCompressionType compression.Type - StripeSize int - TruncateFrequency model.Duration - MinWALTime, MaxWALTime model.Duration - NoLockfile bool - OutOfOrderTimeWindow int64 // TODO(bwplotka): Unused option, fix it or remove. - EnableSTAsZeroSample bool - EnableSTStorage bool + WALSegmentSize units.Base2Bytes + WALCompressionType compression.Type + StripeSize int + TruncateFrequency model.Duration + MinWALTime, MaxWALTime model.Duration + NoLockfile bool + OutOfOrderTimeWindow int64 // TODO(bwplotka): Unused option, fix it or remove. 
+ EnableSTAsZeroSample bool + EnableSTStorage bool + CheckpointFromInMemorySeries bool + CheckpointBatchSize int } func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Options { @@ -2094,16 +2107,18 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option outOfOrderTimeWindow = 0 } return agent.Options{ - WALSegmentSize: int(opts.WALSegmentSize), - WALCompression: opts.WALCompressionType, - StripeSize: opts.StripeSize, - TruncateFrequency: time.Duration(opts.TruncateFrequency), - MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), - MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)), - NoLockfile: opts.NoLockfile, - OutOfOrderTimeWindow: outOfOrderTimeWindow, - EnableSTAsZeroSample: opts.EnableSTAsZeroSample, - EnableSTStorage: opts.EnableSTStorage, + WALSegmentSize: int(opts.WALSegmentSize), + WALCompression: opts.WALCompressionType, + StripeSize: opts.StripeSize, + TruncateFrequency: time.Duration(opts.TruncateFrequency), + MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), + MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)), + NoLockfile: opts.NoLockfile, + OutOfOrderTimeWindow: outOfOrderTimeWindow, + EnableSTAsZeroSample: opts.EnableSTAsZeroSample, + EnableSTStorage: opts.EnableSTStorage, + CheckpointFromInMemorySeries: opts.CheckpointFromInMemorySeries, + CheckpointBatchSize: opts.CheckpointBatchSize, } } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index df0425c98d..e6e5b104f4 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -43,6 +43,8 @@ The Prometheus monitoring server | --storage.agent.wal-compression | Compress the agent WAL. If false, the --storage.agent.wal-compression-type flag is ignored. Use with agent mode only. 
| `true` | | --storage.agent.retention.min-time | Minimum age samples may be before being considered for deletion when the WAL is truncated Use with agent mode only. | `5m` | | --storage.agent.retention.max-time | Maximum age samples may be before being forcibly deleted when the WAL is truncated Use with agent mode only. | `4h` | +| --storage.agent.checkpoint-from-in-memory-series | Use only in-memory series data when building a checkpoint. Use with agent mode only. | `false` | +| --storage.agent.checkpoint-batch-size | Size of a single WAL log entry chunk to be flushed. Has no effect without --storage.agent.checkpoint-from-in-memory-series flag. Use with agent mode only. | `1000` | | --storage.agent.no-lockfile | Do not create lockfile in data directory. Use with agent mode only. | `false` | | --storage.remote.flush-deadline | How long to wait flushing sample on shutdown or config reload. | `1m` | | --storage.remote.read-sample-limit | Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types. Use with server mode only. | `5e7` | diff --git a/tsdb/agent/checkpoint.go b/tsdb/agent/checkpoint.go new file mode 100644 index 0000000000..97d71c33bb --- /dev/null +++ b/tsdb/agent/checkpoint.go @@ -0,0 +1,232 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package agent + +import ( + "errors" + "fmt" + "iter" + "log/slog" + "os" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/wlog" +) + +const defaultBatchSize = 1000 + +// ActiveSeries describes a live series to be written by [Checkpoint]. +// +// This interface is intentionally exported so downstream users of this package +// can use [Checkpoint] without depending on Prometheus internal series types. +type ActiveSeries interface { + Ref() chunks.HeadSeriesRef + Labels() labels.Labels + LastSampleTimestamp() int64 +} + +// DeletedSeries describes a deleted series to be written by [Checkpoint]. +// +// This interface is intentionally exported so downstream users of this package +// can use [Checkpoint] without depending on Prometheus internal series types. +type DeletedSeries interface { + Ref() chunks.HeadSeriesRef + Labels() labels.Labels +} + +// Checkpoint creates an unindexed checkpoint containing record.RefSeries and +// last timestamp for ActiveSeries and record.RefSeries for DeletedSeries. +// +// This API accepts interfaces so downstream users of this package can provide +// their own series storage while reusing Prometheus checkpoint writing logic. +// +// The difference between this implementation and [wlog.Checkpoint] is that it skips re-read current checkpoint + segments +// and relies on data in memory. 
+func Checkpoint(logger *slog.Logger, w *wlog.WL, atIndex, batchSize int, activeSeries iter.Seq[ActiveSeries], deletedSeries iter.Seq[DeletedSeries]) error { + if batchSize <= 0 { + batchSize = defaultBatchSize + } + logger.Info("Creating checkpoint", "atIndex", atIndex) + + dir, idx, err := wlog.LastCheckpoint(w.Dir()) + if err != nil && !errors.Is(err, record.ErrNotFound) { + return fmt.Errorf("can't find last checkpoint: %w", err) + } + + if idx >= atIndex { + logger.Info( + "checkpoint already exists", + "dir", dir, + "index", idx, + "requested_index", atIndex, + ) + return nil + } + + if err := wlog.DeleteTempCheckpoints(logger, w.Dir()); err != nil { + return fmt.Errorf("failed to cleanup temporary checkpoints: %w", err) + } + + cpDir := wlog.CheckpointDir(w.Dir(), atIndex) + cpTmpDir := cpDir + wlog.CheckpointTempFileSuffix + + if err := os.MkdirAll(cpTmpDir, os.ModePerm); err != nil { + return fmt.Errorf("create checkpoint dir: %w", err) + } + + cp, err := wlog.New(logger, nil, cpTmpDir, w.CompressionType()) + if err != nil { + return fmt.Errorf("open checkpoint: %w", err) + } + + success := false + defer func() { + if !success { + cp.Close() + } + os.RemoveAll(cpTmpDir) + }() + + flusher := newCheckpointFlusher(cp, batchSize) + if err := flusher.writeSeries(activeSeries); err != nil { + return err + } + + if err := flusher.writeDeletedRecords(deletedSeries); err != nil { + return err + } + + success = true + if err := cp.Close(); err != nil { + return fmt.Errorf("close checkpoint: %w", err) + } + + df, err := fileutil.OpenDir(cpTmpDir) + if err != nil { + return fmt.Errorf("open temporary checkpoint directory: %w", err) + } + if err := df.Sync(); err != nil { + df.Close() + return fmt.Errorf("sync temporary checkpoint directory: %w", err) + } + if err = df.Close(); err != nil { + return fmt.Errorf("close temporary checkpoint directory: %w", err) + } + + if err := fileutil.Replace(cpTmpDir, cpDir); err != nil { + return fmt.Errorf("rename checkpoint 
directory: %w", err) + } + + return nil +} + +type checkpointWriter struct { + enc record.Encoder + checkpoint *wlog.WL + seriesBuff []byte + samplesBuff []byte + + seriesRecords []record.RefSeries + sampleRecords []record.RefSample + batchSize int +} + +func newCheckpointFlusher(checkpoint *wlog.WL, batchSize int) *checkpointWriter { + return &checkpointWriter{ + batchSize: batchSize, + checkpoint: checkpoint, + seriesRecords: make([]record.RefSeries, 0, batchSize), + sampleRecords: make([]record.RefSample, 0, batchSize), + } +} + +func (cf *checkpointWriter) flushRecords() error { + withSamples := len(cf.sampleRecords) > 0 + cf.seriesBuff = cf.enc.Series(cf.seriesRecords, cf.seriesBuff) + + var err error + if withSamples { + cf.samplesBuff = cf.enc.Samples(cf.sampleRecords, cf.samplesBuff) + err = cf.checkpoint.Log(cf.seriesBuff, cf.samplesBuff) + } else { + err = cf.checkpoint.Log(cf.seriesBuff) + } + + if err != nil { + return fmt.Errorf("flush records: %w", err) + } + + cf.seriesBuff = cf.seriesBuff[:0] + cf.samplesBuff = cf.samplesBuff[:0] + cf.seriesRecords = cf.seriesRecords[:0] + cf.sampleRecords = cf.sampleRecords[:0] + return nil +} + +func (cf *checkpointWriter) writeSeries(seriesIter iter.Seq[ActiveSeries]) error { + for series := range seriesIter { + // If we filled the buffers, write them out and reset. + if len(cf.seriesRecords) == cf.batchSize { + if err := cf.flushRecords(); err != nil { + return fmt.Errorf("flush active series: %w", err) + } + } + + cf.seriesRecords = append(cf.seriesRecords, record.RefSeries{ + Ref: series.Ref(), + Labels: series.Labels(), + }) + + // Sample value is irrelevant, we only need the timestamp. 
+ cf.sampleRecords = append(cf.sampleRecords, record.RefSample{ + Ref: series.Ref(), + T: series.LastSampleTimestamp(), + V: 0, + }) + } + + // Flush the last batch if we have one + if len(cf.seriesRecords) != 0 { + return cf.flushRecords() + } + + return nil +} + +func (cf *checkpointWriter) writeDeletedRecords(seriesRefIter iter.Seq[DeletedSeries]) error { + for series := range seriesRefIter { + // If we filled the buffers, write them out and reset. + if len(cf.seriesRecords) == cf.batchSize { + if err := cf.flushRecords(); err != nil { + return fmt.Errorf("flush deleted series: %w", err) + } + } + + // We don't care about timestamps here, so no samples. + cf.seriesRecords = append(cf.seriesRecords, record.RefSeries{ + Ref: series.Ref(), + Labels: series.Labels(), + }) + } + + // Clear the last batch if we have one + if len(cf.seriesRecords) != 0 { + return cf.flushRecords() + } + + return nil +} diff --git a/tsdb/agent/checkpoint_test.go b/tsdb/agent/checkpoint_test.go new file mode 100644 index 0000000000..87d41fc41a --- /dev/null +++ b/tsdb/agent/checkpoint_test.go @@ -0,0 +1,466 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package agent + +import ( + "errors" + "fmt" + "io/fs" + "maps" + "math" + "os" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/common/promslog" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/prometheus/prometheus/tsdb/wlog" +) + +const walSegmentSize = 32 << 10 // must be aligned to the page size + +func TestCheckpointReplayCompatibility(t *testing.T) { + // Test to ensure that WAL replay between wlog.Checkpoint and agent.Checkpoint are the same. + var ( + wlogAfterSeries *stripeSeries + agentAfterSeries *stripeSeries + ) + + openDBAndDo := func(isInMemCheckpoint bool, storageDir string, fn func(db *DB)) { + l := promslog.NewNopLogger() + rs := remote.NewStorage( + promslog.NewNopLogger(), nil, + startTime, storageDir, + 30*time.Second, nil, false, + ) + defer rs.Close() + + opts := DefaultOptions() + opts.CheckpointFromInMemorySeries = isInMemCheckpoint + opts.WALSegmentSize = walSegmentSize // Set minimum size to get more segments for checkpoint. + + db, err := Open(l, nil, rs, storageDir, opts) + require.NoError(t, err, "Open") + fn(db) + } + + // Prepare samples and labels that will be written into appender. 
+ samples := genCheckpointTestSamples(checkpointTestSamplesParams{ + labelPrefix: t.Name(), + numDatapoints: 3, + numHistograms: 3, + numSeries: 300, + }) + + appendData := func(db *DB) { + app := db.Appender(t.Context()) + const flushEvery = 1000 + n := 0 + maybeFlush := func() { + if n < flushEvery { + return + } + require.NoError(t, app.Commit()) + app = db.Appender(t.Context()) + n = 0 + } + + lbls := samples.datapointLabels + for i, l := range lbls { + lset := labels.New(l...) + for j, sample := range samples.datapointSamples { + st := sample[0].T() + sf := sample[0].F() + + // replay doesn't include exemplars, thus don't include them to remove them from assertion. + _, err := app.Append(0, lset, st, sf) + require.NoErrorf(t, err, "L: %v; S: %v", i, j) + n++ + maybeFlush() + } + } + + for i, l := range samples.histogramLabels { + lset := labels.New(l...) + histograms := samples.histogramSamples[i] + for j, sample := range histograms { + _, err := app.AppendHistogram(0, lset, int64(j), sample, nil) + require.NoError(t, err) + n++ + maybeFlush() + } + } + + require.NoError(t, app.Commit()) + } + + // Write and replay for old wlog.Checkpoint + + // wlog.Open expects to have a "wal" subdirectory + wlogStateRoot := filepath.Join(t.TempDir(), "state-wlog") + wlogWalDir := filepath.Join(wlogStateRoot, "wal") + require.NoError(t, os.MkdirAll(wlogWalDir, os.ModePerm)) + + openDBAndDo(false, wlogStateRoot, func(db *DB) { + appendData(db) + + // Trigger checkpoint call. + err := db.truncate(-1) + require.NoError(t, err, "db.truncate") + require.NoError(t, db.Close()) + }) + assertCheckpointExists(t, wlogWalDir, 1) + + // Restore the database from the checkpoint. 
+ openDBAndDo(true, wlogStateRoot, func(db *DB) { + defer db.Close() + wlogAfterSeries = db.series + }) + + // Write and replay using agent.Checkpoint: + agentStateRoot := filepath.Join(t.TempDir(), "state-agent") + agentWalDir := filepath.Join(agentStateRoot, "wal") + require.NoError(t, os.MkdirAll(agentWalDir, os.ModePerm)) + + openDBAndDo(true, agentStateRoot, func(db *DB) { + appendData(db) + + err := db.truncate(-1) + require.NoError(t, err, "db.truncate") + require.NoError(t, db.Close()) + }) + + assertCheckpointExists(t, agentWalDir, 1) + openDBAndDo(true, agentStateRoot, func(db *DB) { + defer db.Close() + agentAfterSeries = db.series + }) + + requireStripeSeriesEqual(t, wlogAfterSeries, agentAfterSeries) +} + +// requireStripeSeriesEqual asserts that two stripeSeries are semantically +// equivalent: same set of refs, each memSeries has matching labels (by +// content) and lastTs. It avoids reflect.DeepEqual on labels.Labels because +// under -tags=dedupelabels the struct carries a per-instance nameTable +// pointer and symbol-table IDs whose layout depends on the order in which +// records were interned during replay — an implementation detail, not a +// behavioural property. 
+func requireStripeSeriesEqual(t *testing.T, want, got *stripeSeries) { + t.Helper() + + require.Equal(t, want.size, got.size, "stripeSeries size mismatch") + + collect := func(s *stripeSeries) map[chunks.HeadSeriesRef]*memSeries { + out := map[chunks.HeadSeriesRef]*memSeries{} + for _, m := range s.series { + maps.Copy(out, m) + } + return out + } + wantByRef := collect(want) + gotByRef := collect(got) + + require.Len(t, gotByRef, len(wantByRef), "series count mismatch") + + for ref, w := range wantByRef { + g, ok := gotByRef[ref] + require.Truef(t, ok, "ref %d present in wlog path, missing in agent path", ref) + require.Truef(t, labels.Equal(w.lset, g.lset), + "ref %d labels mismatch: wlog=%s agent=%s", ref, w.lset.String(), g.lset.String()) + require.Equalf(t, w.lastTs, g.lastTs, "ref %d lastTs mismatch", ref) + } +} + +func assertCheckpointExists(t *testing.T, walDir string, checkpointID int) { + d := wlog.CheckpointDir(walDir, checkpointID) + v, err := os.Stat(d) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + t.Fatalf("checkpoint doesn't exists in WAL dir %q", walDir) + return + } + + t.Fatalf("can't stat checkpoint dir %q", err) + return + } + + require.True(t, v.IsDir(), "checkpoint should be a dir") +} + +// To run the benchmark and display a diff, use the following command: +// +// go test -bench="BenchmarkCheckpoint" . -run ^$ -benchmem -count 6 -benchtime 5s | tee benchmarks +// benchstat -col '/checkpoint' benchmarks +func BenchmarkCheckpoint(b *testing.B) { + // Prepare in advance samples and labels that will be written into appender. + samples := genCheckpointTestSamples(checkpointTestSamplesParams{ + labelPrefix: b.Name(), + numDatapoints: 10, + numHistograms: 10, + numSeries: 3600, + }) + + // Prepare initial wlog state with segments. 
+ testSamplesSrcDir := filepath.Join(b.TempDir(), "samples-src") + require.NoError(b, os.Mkdir(testSamplesSrcDir, os.ModePerm)) + createCheckpointFixtures(b, checkpointFixtureParams{ + dir: testSamplesSrcDir, + numSegments: 512, + dtDelta: 10000, + segmentSize: walSegmentSize, // must be aligned to the page size + seriesLabels: samples.datapointLabels, + }) + + configs := []struct { + name string + useAgentCheckpoint bool + }{ + { + name: "wlog", + useAgentCheckpoint: false, + }, + { + name: "agent", + useAgentCheckpoint: true, + }, + } + + for _, cfg := range configs { + tname := fmt.Sprintf("checkpoint=%s", cfg.name) + b.Run(tname, func(b *testing.B) { + // Copy initial wlog state into a scratch directory for test. + // wlog.Open expects to have a "wal" subdirectory + wlogDir := filepath.Join(b.TempDir(), "testdata", "wal") + err := os.CopyFS(wlogDir, os.DirFS(testSamplesSrcDir)) + require.NoErrorf(b, err, "failed to copy test samples from %q to %q", testSamplesSrcDir, wlogDir) + storageDir := filepath.Dir(wlogDir) + + b.ReportAllocs() + b.ResetTimer() + + for b.Loop() { + benchCheckpoint(b, benchCheckpointParams{ + storageDir: storageDir, + samples: samples, + skipCurrentCheckpointReRead: cfg.useAgentCheckpoint, + }) + + // Get the size of the checkpoint directory + checkpointSize := getCheckpointSize(b, wlogDir) + b.ReportMetric(float64(checkpointSize), "checkpoint_size") + } + }) + } +} + +func getCheckpointSize(b testing.TB, walDir string) int64 { + dirName, _, err := wlog.LastCheckpoint(walDir) + require.NoError(b, err, "can't find the last checkpoint") + // Walk through a dir and accumulate total size of all files + var size int64 + err = filepath.WalkDir(dirName, func(_ string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + info, err := d.Info() + if err != nil { + return err + } + size += info.Size() + } + return nil + }) + require.NoError(b, err, "can't walk through the checkpoint dir") + return size +} + +type 
benchCheckpointParams struct { + storageDir string + skipCurrentCheckpointReRead bool + samples checkpointTestSamples +} + +func benchCheckpoint(b *testing.B, p benchCheckpointParams) { + b.StopTimer() + + l := promslog.NewNopLogger() + rs := remote.NewStorage( + promslog.NewNopLogger(), nil, + startTime, p.storageDir, + 30*time.Second, nil, false, + ) + defer rs.Close() + + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = math.MaxInt64 // Fixes "out of order sample" in benchmarks. + opts.CheckpointFromInMemorySeries = p.skipCurrentCheckpointReRead + opts.WALSegmentSize = walSegmentSize // Set minimum size to get more segments for checkpoint. + + db, err := Open(l, nil, rs, p.storageDir, opts) + require.NoError(b, err, "Open") + + app := db.Appender(b.Context()) + const flushEvery = 1000 + n := 0 + maybeFlush := func() { + if n < flushEvery { + return + } + require.NoError(b, app.Commit()) + app = db.Appender(b.Context()) + n = 0 + } + + lbls := p.samples.datapointLabels + for i, l := range lbls { + lset := labels.New(l...) + for j, sample := range p.samples.datapointSamples { + st := sample[0].T() + sf := sample[0].F() + ref, err := app.Append(0, lset, st, sf) + require.NoErrorf(b, err, "L: %v; S: %v", i, j) + + e := exemplar.Exemplar{ + Labels: lset, + Ts: sample[0].T() + int64(i), + Value: sample[0].F(), + HasTs: true, + } + + _, err = app.AppendExemplar(ref, lset, e) + require.NoError(b, err) + + n += 2 + maybeFlush() + } + } + + for i, l := range p.samples.histogramLabels { + lset := labels.New(l...) + histograms := p.samples.histogramSamples[i] + for j, sample := range histograms { + _, err := app.AppendHistogram(0, lset, int64(j), sample, nil) + require.NoError(b, err) + n++ + maybeFlush() + } + } + + require.NoError(b, app.Commit()) + + // Trigger checkpoint call. 
+ b.StartTimer() + err = db.truncate(-1) + require.NoError(b, err, "db.truncate") + require.NoError(b, db.Close()) +} + +type checkpointTestSamplesParams struct { + labelPrefix string + numDatapoints int + numHistograms int + numSeries int +} + +type checkpointTestSamples struct { + datapointLabels [][]labels.Label + histogramLabels [][]labels.Label + datapointSamples [][]chunks.Sample + histogramSamples [][]*histogram.Histogram +} + +func genCheckpointTestSamples(p checkpointTestSamplesParams) checkpointTestSamples { + out := checkpointTestSamples{ + datapointLabels: labelsForTest(p.labelPrefix, p.numSeries), + histogramLabels: labelsForTest(p.labelPrefix+"_histogram", p.numSeries), + datapointSamples: make([][]chunks.Sample, 0, p.numSeries), + histogramSamples: make([][]*histogram.Histogram, 0, p.numSeries), + } + + for range p.numDatapoints { + sample := chunks.GenerateSamples(0, 1) + out.datapointSamples = append(out.datapointSamples, sample) + } + + for range out.histogramLabels { + histograms := tsdbutil.GenerateTestHistograms(p.numHistograms) + out.histogramSamples = append(out.histogramSamples, histograms) + } + + return out +} + +type checkpointFixtureParams struct { + dir string + numSegments int + segmentSize int + dtDelta int64 + seriesLabels [][]labels.Label +} + +func createCheckpointFixtures(t testing.TB, p checkpointFixtureParams) { + // Make a segment to put initial data + var enc record.Encoder + + // Create dummy segment to bump the start segment number. + // Dummy segment should be zero or agent.Open() will fail. + seg, err := wlog.CreateSegment(p.dir, 0) + require.NoError(t, err) + require.NoError(t, seg.Close()) + + w, err := wlog.NewSize(promslog.NewNopLogger(), nil, p.dir, p.segmentSize, DefaultOptions().WALCompression) + require.NoError(t, err) + + series := make([]record.RefSeries, 0, len(p.seriesLabels)) + for i, lset := range p.seriesLabels { + // NOTE: don't append RefMetadata as agent.DB doesn't support it during WAL replay. 
+ series = append(series, record.RefSeries{ + Ref: chunks.HeadSeriesRef(i), + Labels: labels.New(lset...), + }) + } + + var dt int64 + samples := make([]record.RefSample, 0, len(series)) + for i := range p.numSegments { + if i == 0 { + // Write series required for samples + b := enc.Series(series, nil) + require.NoError(t, w.Log(b)) + } + + samples = samples[:0] + for j := range len(series) { + samples = append(samples, record.RefSample{ + Ref: chunks.HeadSeriesRef(j), + V: float64(i), + T: dt + int64(j+1), + }) + } + require.NoError(t, w.Log(enc.Samples(samples, nil))) + dt += p.dtDelta + } + require.NoError(t, w.Close(), "WAL.Close") +} diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 68e871cd00..d908f429a0 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -99,6 +99,15 @@ type Options struct { // persisted to the WAL for samples that include a non-zero start timestamp in // supported record types. EnableSTStorage bool + + // CheckpointFromInMemorySeries changes checkpoint implementation to use only in-memory series data when building a checkpoint. + // This prevents re-reading the previous checkpoint and segments from disk. + CheckpointFromInMemorySeries bool + + // CheckpointBatchSize specifies a size of a single WAL log entry chunk to be flushed. + // + // Has no effect if CheckpointFromInMemorySeries is false. + CheckpointBatchSize int } // DefaultOptions used for the WAL storage. They are reasonable for setups using @@ -238,6 +247,11 @@ func (m *dbMetrics) Unregister() { } } +type deletedRefMeta struct { + lastSegment int + labels labels.Labels +} + // DB represents a WAL-only storage. It implements storage.DB. type DB struct { mtx sync.RWMutex @@ -263,7 +277,7 @@ type DB struct { series *stripeSeries // deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they // must be kept around to). 
- deleted map[chunks.HeadSeriesRef]int + deleted map[chunks.HeadSeriesRef]deletedRefMeta donec chan struct{} stopc chan struct{} @@ -305,7 +319,7 @@ func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir str nextRef: atomic.NewUint64(0), series: newStripeSeries(opts.StripeSize), - deleted: make(map[chunks.HeadSeriesRef]int), + deleted: make(map[chunks.HeadSeriesRef]deletedRefMeta), donec: make(chan struct{}), stopc: make(chan struct{}), @@ -566,8 +580,12 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri // We want to track the largest segment where we encountered the duplicate ref, so we can ensure // it remains in the checkpoint until we get past that segment. - if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint { - db.deleted[entry.Ref] = currentSegmentOrCheckpoint + if meta := db.deleted[entry.Ref]; meta.lastSegment <= currentSegmentOrCheckpoint { + var lbls labels.Labels + if db.opts.CheckpointFromInMemorySeries { + lbls = entry.Labels + } + db.deleted[entry.Ref] = deletedRefMeta{lastSegment: currentSegmentOrCheckpoint, labels: lbls} } } else { db.metrics.numActiveSeries.Inc() @@ -582,8 +600,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { // We want to track the largest segment where we encountered the duplicate ref, so we can ensure // it remains in the checkpoint until we get past that segment. 
- if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint { - db.deleted[entry.Ref] = currentSegmentOrCheckpoint + if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint { + meta.lastSegment = currentSegmentOrCheckpoint + db.deleted[entry.Ref] = meta } entry.Ref = ref } @@ -605,8 +624,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { // We want to track the largest segment where we encountered the duplicate ref, so we can ensure // it remains in the checkpoint until we get past that segment. - if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint { - db.deleted[entry.Ref] = currentSegmentOrCheckpoint + if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint { + meta.lastSegment = currentSegmentOrCheckpoint + db.deleted[entry.Ref] = meta } entry.Ref = ref } @@ -628,8 +648,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri if ref, ok := duplicateRefToValidRef[entry.Ref]; ok { // We want to track the largest segment where we encountered the duplicate ref, so we can ensure // it remains in the checkpoint until we get past that segment. - if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint { - db.deleted[entry.Ref] = currentSegmentOrCheckpoint + if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint { + meta.lastSegment = currentSegmentOrCheckpoint + db.deleted[entry.Ref] = meta } entry.Ref = ref } @@ -713,8 +734,8 @@ func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef } // Keep the record if the series was recently deleted. 
- seg, ok := db.deleted[id] - return ok && seg > last + meta, ok := db.deleted[id] + return ok && meta.lastSegment > last } } @@ -753,7 +774,13 @@ func (db *DB) truncate(mint int64) error { db.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint, db.opts.EnableSTStorage); err != nil { + if db.opts.CheckpointFromInMemorySeries { + err = Checkpoint(db.logger, db.wal, last, db.opts.CheckpointBatchSize, db.series.allSeries(), deletedSeriesIter(db.deleted, last)) + } else { + _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint, db.opts.EnableSTStorage) + } + + if err != nil { db.metrics.checkpointCreationFail.Inc() var cerr *wlog.CorruptionErr if errors.As(err, &cerr) { @@ -770,8 +797,8 @@ func (db *DB) truncate(mint int64) error { // The checkpoint is written and segments before it are truncated, so we // no longer need to track deleted series that were being kept around. - for ref, segment := range db.deleted { - if segment <= last { + for ref, meta := range db.deleted { + if meta.lastSegment <= last { delete(db.deleted, ref) } } @@ -795,7 +822,7 @@ func (db *DB) truncate(mint int64) error { // gc marks ref IDs that have not received a sample since mint as deleted in // s.deleted, along with the segment where they originally got deleted. func (db *DB) gc(mint int64) { - deleted := db.series.GC(mint) + deleted := db.series.GC(mint, db.opts.CheckpointFromInMemorySeries) db.metrics.numActiveSeries.Sub(float64(len(deleted))) _, last, _ := wlog.Segments(db.wal.Dir()) @@ -803,8 +830,8 @@ func (db *DB) gc(mint int64) { // We want to keep series records for any newly deleted series // until we've passed the last recorded segment. This prevents // the WAL having samples for series records that no longer exist. 
- for ref := range deleted { - db.deleted[ref] = last + for ref, lset := range deleted { + db.deleted[ref] = deletedRefMeta{lastSegment: last, labels: lset} } db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 465748f810..74dc7fd1b9 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -1533,7 +1533,7 @@ func BenchmarkGetOrCreate(b *testing.B) { for i := range b.N { if i%n == 0 && i > 0 { b.StopTimer() - _ = s.series.GC(math.MaxInt64) + _ = s.series.GC(math.MaxInt64, false) b.StartTimer() } app.getOrCreate(0, lbls[i%n]) diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index 9aa3143459..14cc80de9c 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -14,6 +14,7 @@ package agent import ( + "iter" "sync" "github.com/prometheus/prometheus/model/exemplar" @@ -45,6 +46,18 @@ func (m *memSeries) updateTimestamp(newTs int64) bool { return false } +func (m *memSeries) Ref() chunks.HeadSeriesRef { + return m.ref +} + +func (m *memSeries) Labels() labels.Labels { + return m.lset +} + +func (m *memSeries) LastSampleTimestamp() int64 { + return m.lastTs +} + // seriesHashmap lets agent find a memSeries by its label set, via a 64-bit hash. // There is one map for the common case where the hash value is unique, and a // second map for the case that two series have the same hash value. @@ -160,14 +173,15 @@ func newStripeSeries(stripeSize int) *stripeSeries { // GC garbage collects old series that have not received a sample after mint // and will fully delete them. -func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} { +func (s *stripeSeries) GC(mint int64, retainLabels bool) map[chunks.HeadSeriesRef]labels.Labels { // gcMut serializes GC calls. Within a single GC pass, the check function // holds hashLock and then acquires refLock — callers must never hold both // simultaneously, which SetUnlessAlreadySet satisfies. 
s.gcMut.Lock() defer s.gcMut.Unlock() - deleted := map[chunks.HeadSeriesRef]struct{}{} + // Labels of deleted series are used by agent.Checkpoint. + deleted := map[chunks.HeadSeriesRef]labels.Labels{} // For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID. check := func(hashLock int, hash uint64, series *memSeries) { @@ -186,7 +200,12 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} { s.locks[refLock].Lock() } - deleted[series.ref] = struct{}{} + if retainLabels { + deleted[series.ref] = series.lset + } else { + deleted[series.ref] = labels.EmptyLabels() + } + delete(s.series[refLock], series.ref) s.hashes[hashLock].Delete(hash, series.ref) @@ -305,3 +324,87 @@ func (s *stripeSeries) hashLock(hash uint64) uint64 { func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 { return uint64(ref) & uint64(s.size-1) } + +var _ ActiveSeries = (*seriesSnapshot)(nil) + +// seriesSnapshot is a point-in-time copy of a memSeries' fields. +// It is used to avoid holding series locks during checkpoint I/O. +type seriesSnapshot struct { + ref chunks.HeadSeriesRef + lset labels.Labels + lastTs int64 +} + +func (s *seriesSnapshot) Ref() chunks.HeadSeriesRef { + return s.ref +} + +func (s *seriesSnapshot) Labels() labels.Labels { + return s.lset +} + +func (s *seriesSnapshot) LastSampleTimestamp() int64 { + return s.lastTs +} + +func (s *stripeSeries) allSeries() iter.Seq[ActiveSeries] { + return func(yield func(ActiveSeries) bool) { + var buf []*memSeries + for i := 0; i < s.size; i++ { + // Collect pointers under RLock to avoid blocking appenders during I/O. + s.locks[i].RLock() + buf = buf[:0] + for _, series := range s.series[i] { + buf = append(buf, series) + } + s.locks[i].RUnlock() + + // Snapshot and yield outside the stripe lock so that + // slow consumers (e.g. checkpoint disk I/O) do not + // block appends that need a write lock on this stripe.
+ for _, series := range buf { + series.Lock() + snapshot := seriesSnapshot{ + ref: series.ref, + lset: series.lset, + lastTs: series.lastTs, + } + series.Unlock() + + if !yield(&snapshot) { + return + } + } + } + } +} + +// deletedSeriesIter returns an iterator over deleted series from the given map. +// Only series whose lastSegment is greater than last are emitted, matching the +// filtering behaviour of the [wlog.Checkpoint] keep function. +func deletedSeriesIter(m map[chunks.HeadSeriesRef]deletedRefMeta, last int) iter.Seq[DeletedSeries] { + return func(yield func(DeletedSeries) bool) { + for ref, meta := range m { + if meta.lastSegment > last { + if !yield(deletedSeries{ref: ref, labels: meta.labels}) { + return + } + } + } + } +} + +var _ DeletedSeries = deletedSeries{} + +type deletedSeries struct { + ref chunks.HeadSeriesRef + labels labels.Labels +} + +func (series deletedSeries) Ref() chunks.HeadSeriesRef { + return series.ref +} + +func (series deletedSeries) Labels() labels.Labels { + return series.labels +} diff --git a/tsdb/agent/series_test.go b/tsdb/agent/series_test.go index a5a6f04f4b..4039c3fca0 100644 --- a/tsdb/agent/series_test.go +++ b/tsdb/agent/series_test.go @@ -40,7 +40,7 @@ func TestNoDeadlock(t *testing.T) { go func() { defer wg.Done() <-started - _ = stripeSeries.GC(math.MaxInt64) + _ = stripeSeries.GC(math.MaxInt64, false) }() } @@ -224,7 +224,7 @@ func TestSetUnlessAlreadySetConcurrentGC(t *testing.T) { case <-done: return default: - ss.GC(1) // removes series with lastTs < 1, i.e. lastTs==0 + ss.GC(1, false) // removes series with lastTs < 1, i.e. lastTs==0 } } }() @@ -248,7 +248,7 @@ func TestSetUnlessAlreadySetConcurrentGC(t *testing.T) { // A final synchronous GC pass ensures all eligible series are fully removed, // then verify they are unreachable via both lookup paths.
- ss.GC(1) + ss.GC(1, false) for _, s := range eligible { require.Nil(t, ss.GetByID(s.ref)) require.Nil(t, ss.GetByHash(s.lset.Hash(), s.lset)) @@ -259,7 +259,7 @@ func TestStripeSeries_gc(t *testing.T) { s, ms1, ms2 := stripeSeriesWithCollidingSeries(t) hash := ms1.lset.Hash() - s.GC(1) + s.GC(1, false) // Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series got := s.GetByHash(hash, ms1.lset) diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index a41935044d..e1212b280e 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -82,8 +82,8 @@ func DeleteCheckpoints(dir string, maxIndex int) error { return errors.Join(errs...) } -// checkpointTempFileSuffix is the suffix used when creating temporary checkpoint files. -const checkpointTempFileSuffix = ".tmp" +// CheckpointTempFileSuffix is the suffix used when creating temporary checkpoint files. +const CheckpointTempFileSuffix = ".tmp" // DeleteTempCheckpoints deletes all temporary checkpoint directories in the given directory. func DeleteTempCheckpoints(logger *slog.Logger, dir string) error { @@ -137,8 +137,8 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He return nil, err } - cpdir := checkpointDir(w.Dir(), to) - cpdirtmp := cpdir + checkpointTempFileSuffix + cpdir := CheckpointDir(w.Dir(), to) + cpdirtmp := cpdir + CheckpointTempFileSuffix if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return nil, fmt.Errorf("create checkpoint dir: %w", err) @@ -407,7 +407,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // checkpointPrefix is the prefix used for checkpoint files. const checkpointPrefix = "checkpoint." 
-func checkpointDir(dir string, i int) string { +func CheckpointDir(dir string, i int) string { return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i)) } @@ -446,5 +446,5 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { } func isTempDir(fi fs.DirEntry) bool { - return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), checkpointTempFileSuffix) + return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), CheckpointTempFileSuffix) }