diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index ca908c8b29..fc28ccd3f7 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -557,6 +557,12 @@ func main() {
"Maximum age samples may be before being forcibly deleted when the WAL is truncated").
Default(agentDefaultMaxWALTime).SetValue(&cfg.agent.MaxWALTime)
+ agentOnlyFlag(a, "storage.agent.checkpoint-from-in-memory-series", "Use only in-memory series data when building a checkpoint.").
+ Default("false").BoolVar(&cfg.agent.CheckpointFromInMemorySeries)
+
+	agentOnlyFlag(a, "storage.agent.checkpoint-batch-size", "Number of series records to write per WAL checkpoint log entry. Has no effect unless the --storage.agent.checkpoint-from-in-memory-series flag is set.").
+ Default("1000").IntVar(&cfg.agent.CheckpointBatchSize)
+
agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory.").
Default("false").BoolVar(&cfg.agent.NoLockfile)
@@ -662,6 +668,11 @@ func main() {
os.Exit(3)
}
+ if agentMode && cfg.agent.CheckpointBatchSize <= 0 {
+ fmt.Fprintln(os.Stderr, "--storage.agent.checkpoint-batch-size must be greater than 0.")
+ os.Exit(1)
+ }
+
if cfg.memlimitRatio <= 0.0 || cfg.memlimitRatio > 1.0 {
fmt.Fprintf(os.Stderr, "--auto-gomemlimit.ratio must be greater than 0 and less than or equal to 1.")
os.Exit(1)
@@ -2078,15 +2089,17 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
// agentOptions is a version of agent.Options with defined units. This is required
// as agent.Option fields are unit agnostic (time).
type agentOptions struct {
- WALSegmentSize units.Base2Bytes
- WALCompressionType compression.Type
- StripeSize int
- TruncateFrequency model.Duration
- MinWALTime, MaxWALTime model.Duration
- NoLockfile bool
- OutOfOrderTimeWindow int64 // TODO(bwplotka): Unused option, fix it or remove.
- EnableSTAsZeroSample bool
- EnableSTStorage bool
+ WALSegmentSize units.Base2Bytes
+ WALCompressionType compression.Type
+ StripeSize int
+ TruncateFrequency model.Duration
+ MinWALTime, MaxWALTime model.Duration
+ NoLockfile bool
+ OutOfOrderTimeWindow int64 // TODO(bwplotka): Unused option, fix it or remove.
+ EnableSTAsZeroSample bool
+ EnableSTStorage bool
+ CheckpointFromInMemorySeries bool
+ CheckpointBatchSize int
}
func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Options {
@@ -2094,16 +2107,18 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option
outOfOrderTimeWindow = 0
}
return agent.Options{
- WALSegmentSize: int(opts.WALSegmentSize),
- WALCompression: opts.WALCompressionType,
- StripeSize: opts.StripeSize,
- TruncateFrequency: time.Duration(opts.TruncateFrequency),
- MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
- MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
- NoLockfile: opts.NoLockfile,
- OutOfOrderTimeWindow: outOfOrderTimeWindow,
- EnableSTAsZeroSample: opts.EnableSTAsZeroSample,
- EnableSTStorage: opts.EnableSTStorage,
+ WALSegmentSize: int(opts.WALSegmentSize),
+ WALCompression: opts.WALCompressionType,
+ StripeSize: opts.StripeSize,
+ TruncateFrequency: time.Duration(opts.TruncateFrequency),
+ MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
+ MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
+ NoLockfile: opts.NoLockfile,
+ OutOfOrderTimeWindow: outOfOrderTimeWindow,
+ EnableSTAsZeroSample: opts.EnableSTAsZeroSample,
+ EnableSTStorage: opts.EnableSTStorage,
+ CheckpointFromInMemorySeries: opts.CheckpointFromInMemorySeries,
+ CheckpointBatchSize: opts.CheckpointBatchSize,
}
}
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index df0425c98d..e6e5b104f4 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -43,6 +43,8 @@ The Prometheus monitoring server
| --storage.agent.wal-compression | Compress the agent WAL. If false, the --storage.agent.wal-compression-type flag is ignored. Use with agent mode only. | `true` |
| --storage.agent.retention.min-time | Minimum age samples may be before being considered for deletion when the WAL is truncated Use with agent mode only. | `5m` |
| --storage.agent.retention.max-time | Maximum age samples may be before being forcibly deleted when the WAL is truncated Use with agent mode only. | `4h` |
+| --storage.agent.checkpoint-from-in-memory-series | Use only in-memory series data when building a checkpoint. Use with agent mode only. | `false` |
+| --storage.agent.checkpoint-batch-size | Number of series records to write per WAL checkpoint log entry. Has no effect unless the --storage.agent.checkpoint-from-in-memory-series flag is set. Use with agent mode only. | `1000` |
| --storage.agent.no-lockfile | Do not create lockfile in data directory. Use with agent mode only. | `false` |
| --storage.remote.flush-deadline | How long to wait flushing sample on shutdown or config reload. | `1m` |
| --storage.remote.read-sample-limit | Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types. Use with server mode only. | `5e7` |
diff --git a/tsdb/agent/checkpoint.go b/tsdb/agent/checkpoint.go
new file mode 100644
index 0000000000..97d71c33bb
--- /dev/null
+++ b/tsdb/agent/checkpoint.go
@@ -0,0 +1,232 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package agent
+
+import (
+ "errors"
+ "fmt"
+ "iter"
+ "log/slog"
+ "os"
+
+ "github.com/prometheus/prometheus/model/labels"
+ "github.com/prometheus/prometheus/tsdb/chunks"
+ "github.com/prometheus/prometheus/tsdb/fileutil"
+ "github.com/prometheus/prometheus/tsdb/record"
+ "github.com/prometheus/prometheus/tsdb/wlog"
+)
+
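+// defaultBatchSize is the fallback number of series records per checkpoint
+// log entry, used when a caller passes a non-positive batch size.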
+const defaultBatchSize = 1000
+
+// ActiveSeries describes a live series to be written by [Checkpoint].
+//
+// This interface is intentionally exported so downstream users of this package
+// can use [Checkpoint] without depending on Prometheus internal series types.
+type ActiveSeries interface {
+ Ref() chunks.HeadSeriesRef
+ Labels() labels.Labels
+ LastSampleTimestamp() int64
+}
+
+// DeletedSeries describes a deleted series to be written by [Checkpoint].
+//
+// This interface is intentionally exported so downstream users of this package
+// can use [Checkpoint] without depending on Prometheus internal series types.
+type DeletedSeries interface {
+ Ref() chunks.HeadSeriesRef
+ Labels() labels.Labels
+}
+
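+// For illustration, a minimal caller-side type could satisfy both interfaces
+// (a hypothetical sketch, not part of this package):
+//
+//	type mySeries struct {
+//		ref    chunks.HeadSeriesRef
+//		lset   labels.Labels
+//		lastTs int64
+//	}
+//
+//	func (s mySeries) Ref() chunks.HeadSeriesRef  { return s.ref }
+//	func (s mySeries) Labels() labels.Labels      { return s.lset }
+//	func (s mySeries) LastSampleTimestamp() int64 { return s.lastTs }
+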
+// Checkpoint creates an unindexed checkpoint containing a record.RefSeries and
+// the last sample timestamp for each ActiveSeries, plus a record.RefSeries for
+// each DeletedSeries.
+//
+// This API accepts interfaces so downstream users of this package can provide
+// their own series storage while reusing Prometheus checkpoint writing logic.
+//
+// Unlike [wlog.Checkpoint], this implementation skips re-reading the current
+// checkpoint and WAL segments from disk and relies solely on in-memory data.
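+//
+// A minimal usage sketch (logger, wal, and lastSegment are assumed to exist;
+// mySeries is the hypothetical type sketched above):
+//
+//	active := slices.Values([]ActiveSeries{mySeries{ref: 1, lastTs: 42}})
+//	deleted := func(yield func(DeletedSeries) bool) {} // nothing deleted
+//	if err := Checkpoint(logger, wal, lastSegment, 1000, active, deleted); err != nil {
+//		// handle error
+//	}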
+func Checkpoint(logger *slog.Logger, w *wlog.WL, atIndex, batchSize int, activeSeries iter.Seq[ActiveSeries], deletedSeries iter.Seq[DeletedSeries]) error {
+ if batchSize <= 0 {
+ batchSize = defaultBatchSize
+ }
+ logger.Info("Creating checkpoint", "atIndex", atIndex)
+
+ dir, idx, err := wlog.LastCheckpoint(w.Dir())
+ if err != nil && !errors.Is(err, record.ErrNotFound) {
+ return fmt.Errorf("can't find last checkpoint: %w", err)
+ }
+
+ if idx >= atIndex {
+ logger.Info(
+ "checkpoint already exists",
+ "dir", dir,
+ "index", idx,
+ "requested_index", atIndex,
+ )
+ return nil
+ }
+
+ if err := wlog.DeleteTempCheckpoints(logger, w.Dir()); err != nil {
+ return fmt.Errorf("failed to cleanup temporary checkpoints: %w", err)
+ }
+
+ cpDir := wlog.CheckpointDir(w.Dir(), atIndex)
+ cpTmpDir := cpDir + wlog.CheckpointTempFileSuffix
+
+ if err := os.MkdirAll(cpTmpDir, os.ModePerm); err != nil {
+ return fmt.Errorf("create checkpoint dir: %w", err)
+ }
+
+ cp, err := wlog.New(logger, nil, cpTmpDir, w.CompressionType())
+ if err != nil {
+ return fmt.Errorf("open checkpoint: %w", err)
+ }
+
+ success := false
+ defer func() {
+ if !success {
+ cp.Close()
+ }
+ os.RemoveAll(cpTmpDir)
+ }()
+
+	writer := newCheckpointWriter(cp, batchSize)
+	if err := writer.writeSeries(activeSeries); err != nil {
+		return err
+	}
+
+	if err := writer.writeDeletedRecords(deletedSeries); err != nil {
+		return err
+	}
+
+ success = true
+ if err := cp.Close(); err != nil {
+ return fmt.Errorf("close checkpoint: %w", err)
+ }
+
+ df, err := fileutil.OpenDir(cpTmpDir)
+ if err != nil {
+ return fmt.Errorf("open temporary checkpoint directory: %w", err)
+ }
+ if err := df.Sync(); err != nil {
+ df.Close()
+ return fmt.Errorf("sync temporary checkpoint directory: %w", err)
+ }
+ if err = df.Close(); err != nil {
+ return fmt.Errorf("close temporary checkpoint directory: %w", err)
+ }
+
+ if err := fileutil.Replace(cpTmpDir, cpDir); err != nil {
+ return fmt.Errorf("rename checkpoint directory: %w", err)
+ }
+
+ return nil
+}
+
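+// checkpointWriter batches series and sample records and writes each batch as
+// a single WAL log entry, reusing its encoding buffers across flushes.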
+type checkpointWriter struct {
+ enc record.Encoder
+ checkpoint *wlog.WL
+ seriesBuff []byte
+ samplesBuff []byte
+
+ seriesRecords []record.RefSeries
+ sampleRecords []record.RefSample
+ batchSize int
+}
+
+func newCheckpointWriter(checkpoint *wlog.WL, batchSize int) *checkpointWriter {
+ return &checkpointWriter{
+ batchSize: batchSize,
+ checkpoint: checkpoint,
+ seriesRecords: make([]record.RefSeries, 0, batchSize),
+ sampleRecords: make([]record.RefSample, 0, batchSize),
+ }
+}
+
+func (cf *checkpointWriter) flushRecords() error {
+ withSamples := len(cf.sampleRecords) > 0
+ cf.seriesBuff = cf.enc.Series(cf.seriesRecords, cf.seriesBuff)
+
+ var err error
+ if withSamples {
+ cf.samplesBuff = cf.enc.Samples(cf.sampleRecords, cf.samplesBuff)
+ err = cf.checkpoint.Log(cf.seriesBuff, cf.samplesBuff)
+ } else {
+ err = cf.checkpoint.Log(cf.seriesBuff)
+ }
+
+ if err != nil {
+ return fmt.Errorf("flush records: %w", err)
+ }
+
+ cf.seriesBuff = cf.seriesBuff[:0]
+ cf.samplesBuff = cf.samplesBuff[:0]
+ cf.seriesRecords = cf.seriesRecords[:0]
+ cf.sampleRecords = cf.sampleRecords[:0]
+ return nil
+}
+
+func (cf *checkpointWriter) writeSeries(seriesIter iter.Seq[ActiveSeries]) error {
+ for series := range seriesIter {
+ // If we filled the buffers, write them out and reset.
+ if len(cf.seriesRecords) == cf.batchSize {
+ if err := cf.flushRecords(); err != nil {
+ return fmt.Errorf("flush active series: %w", err)
+ }
+ }
+
+ cf.seriesRecords = append(cf.seriesRecords, record.RefSeries{
+ Ref: series.Ref(),
+ Labels: series.Labels(),
+ })
+
+		// The sample value is irrelevant; we only need the timestamp.
+ cf.sampleRecords = append(cf.sampleRecords, record.RefSample{
+ Ref: series.Ref(),
+ T: series.LastSampleTimestamp(),
+ V: 0,
+ })
+ }
+
+	// Flush the final partial batch, if any.
+ if len(cf.seriesRecords) != 0 {
+ return cf.flushRecords()
+ }
+
+ return nil
+}
+
+func (cf *checkpointWriter) writeDeletedRecords(seriesRefIter iter.Seq[DeletedSeries]) error {
+ for series := range seriesRefIter {
+ // If we filled the buffers, write them out and reset.
+ if len(cf.seriesRecords) == cf.batchSize {
+ if err := cf.flushRecords(); err != nil {
+ return fmt.Errorf("flush deleted series: %w", err)
+ }
+ }
+
+ // We don't care about timestamps here, so no samples.
+ cf.seriesRecords = append(cf.seriesRecords, record.RefSeries{
+ Ref: series.Ref(),
+ Labels: series.Labels(),
+ })
+ }
+
+	// Flush the final partial batch, if any.
+ if len(cf.seriesRecords) != 0 {
+ return cf.flushRecords()
+ }
+
+ return nil
+}
diff --git a/tsdb/agent/checkpoint_test.go b/tsdb/agent/checkpoint_test.go
new file mode 100644
index 0000000000..87d41fc41a
--- /dev/null
+++ b/tsdb/agent/checkpoint_test.go
@@ -0,0 +1,466 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package agent
+
+import (
+ "errors"
+ "fmt"
+ "io/fs"
+ "maps"
+ "math"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/prometheus/common/promslog"
+ "github.com/stretchr/testify/require"
+
+ "github.com/prometheus/prometheus/model/exemplar"
+ "github.com/prometheus/prometheus/model/histogram"
+ "github.com/prometheus/prometheus/model/labels"
+ "github.com/prometheus/prometheus/storage/remote"
+ "github.com/prometheus/prometheus/tsdb/chunks"
+ "github.com/prometheus/prometheus/tsdb/record"
+ "github.com/prometheus/prometheus/tsdb/tsdbutil"
+ "github.com/prometheus/prometheus/tsdb/wlog"
+)
+
+const walSegmentSize = 32 << 10 // must be aligned to the page size
+
+func TestCheckpointReplayCompatibility(t *testing.T) {
+ // Test to ensure that WAL replay between wlog.Checkpoint and agent.Checkpoint are the same.
+ var (
+ wlogAfterSeries *stripeSeries
+ agentAfterSeries *stripeSeries
+ )
+
+ openDBAndDo := func(isInMemCheckpoint bool, storageDir string, fn func(db *DB)) {
+ l := promslog.NewNopLogger()
+ rs := remote.NewStorage(
+ promslog.NewNopLogger(), nil,
+ startTime, storageDir,
+ 30*time.Second, nil, false,
+ )
+ defer rs.Close()
+
+ opts := DefaultOptions()
+ opts.CheckpointFromInMemorySeries = isInMemCheckpoint
+ opts.WALSegmentSize = walSegmentSize // Set minimum size to get more segments for checkpoint.
+
+ db, err := Open(l, nil, rs, storageDir, opts)
+ require.NoError(t, err, "Open")
+ fn(db)
+ }
+
+ // Prepare samples and labels that will be written into appender.
+ samples := genCheckpointTestSamples(checkpointTestSamplesParams{
+ labelPrefix: t.Name(),
+ numDatapoints: 3,
+ numHistograms: 3,
+ numSeries: 300,
+ })
+
+ appendData := func(db *DB) {
+ app := db.Appender(t.Context())
+ const flushEvery = 1000
+ n := 0
+ maybeFlush := func() {
+ if n < flushEvery {
+ return
+ }
+ require.NoError(t, app.Commit())
+ app = db.Appender(t.Context())
+ n = 0
+ }
+
+ lbls := samples.datapointLabels
+ for i, l := range lbls {
+ lset := labels.New(l...)
+ for j, sample := range samples.datapointSamples {
+ st := sample[0].T()
+ sf := sample[0].F()
+
+				// WAL replay doesn't restore exemplars, so don't append them; this
+				// keeps the assertion between the two replay paths symmetric.
+ _, err := app.Append(0, lset, st, sf)
+ require.NoErrorf(t, err, "L: %v; S: %v", i, j)
+ n++
+ maybeFlush()
+ }
+ }
+
+ for i, l := range samples.histogramLabels {
+ lset := labels.New(l...)
+ histograms := samples.histogramSamples[i]
+ for j, sample := range histograms {
+ _, err := app.AppendHistogram(0, lset, int64(j), sample, nil)
+ require.NoError(t, err)
+ n++
+ maybeFlush()
+ }
+ }
+
+ require.NoError(t, app.Commit())
+ }
+
+	// Write and replay using the existing wlog.Checkpoint implementation.
+
+	// wlog.Open expects a "wal" subdirectory.
+ wlogStateRoot := filepath.Join(t.TempDir(), "state-wlog")
+ wlogWalDir := filepath.Join(wlogStateRoot, "wal")
+ require.NoError(t, os.MkdirAll(wlogWalDir, os.ModePerm))
+
+ openDBAndDo(false, wlogStateRoot, func(db *DB) {
+ appendData(db)
+
+ // Trigger checkpoint call.
+ err := db.truncate(-1)
+ require.NoError(t, err, "db.truncate")
+ require.NoError(t, db.Close())
+ })
+ assertCheckpointExists(t, wlogWalDir, 1)
+
+ // Restore the database from the checkpoint.
+ openDBAndDo(true, wlogStateRoot, func(db *DB) {
+ defer db.Close()
+ wlogAfterSeries = db.series
+ })
+
+ // Write and replay using agent.Checkpoint:
+ agentStateRoot := filepath.Join(t.TempDir(), "state-agent")
+ agentWalDir := filepath.Join(agentStateRoot, "wal")
+ require.NoError(t, os.MkdirAll(agentWalDir, os.ModePerm))
+
+ openDBAndDo(true, agentStateRoot, func(db *DB) {
+ appendData(db)
+
+ err := db.truncate(-1)
+ require.NoError(t, err, "db.truncate")
+ require.NoError(t, db.Close())
+ })
+
+ assertCheckpointExists(t, agentWalDir, 1)
+ openDBAndDo(true, agentStateRoot, func(db *DB) {
+ defer db.Close()
+ agentAfterSeries = db.series
+ })
+
+ requireStripeSeriesEqual(t, wlogAfterSeries, agentAfterSeries)
+}
+
+// requireStripeSeriesEqual asserts that two stripeSeries are semantically
+// equivalent: same set of refs, each memSeries has matching labels (by
+// content) and lastTs. It avoids reflect.DeepEqual on labels.Labels because
+// under -tags=dedupelabels the struct carries a per-instance nameTable
+// pointer and symbol-table IDs whose layout depends on the order in which
+// records were interned during replay — an implementation detail, not a
+// behavioural property.
+func requireStripeSeriesEqual(t *testing.T, want, got *stripeSeries) {
+ t.Helper()
+
+ require.Equal(t, want.size, got.size, "stripeSeries size mismatch")
+
+ collect := func(s *stripeSeries) map[chunks.HeadSeriesRef]*memSeries {
+ out := map[chunks.HeadSeriesRef]*memSeries{}
+ for _, m := range s.series {
+ maps.Copy(out, m)
+ }
+ return out
+ }
+ wantByRef := collect(want)
+ gotByRef := collect(got)
+
+ require.Len(t, gotByRef, len(wantByRef), "series count mismatch")
+
+ for ref, w := range wantByRef {
+ g, ok := gotByRef[ref]
+ require.Truef(t, ok, "ref %d present in wlog path, missing in agent path", ref)
+ require.Truef(t, labels.Equal(w.lset, g.lset),
+ "ref %d labels mismatch: wlog=%s agent=%s", ref, w.lset.String(), g.lset.String())
+ require.Equalf(t, w.lastTs, g.lastTs, "ref %d lastTs mismatch", ref)
+ }
+}
+
+func assertCheckpointExists(t *testing.T, walDir string, checkpointID int) {
+ d := wlog.CheckpointDir(walDir, checkpointID)
+ v, err := os.Stat(d)
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+			t.Fatalf("checkpoint doesn't exist in WAL dir %q", walDir)
+ return
+ }
+
+		t.Fatalf("can't stat checkpoint dir: %v", err)
+ return
+ }
+
+ require.True(t, v.IsDir(), "checkpoint should be a dir")
+}
+
+// To run the benchmark and display a diff, use the following command:
+//
+// go test -bench="BenchmarkCheckpoint" . -run ^$ -benchmem -count 6 -benchtime 5s | tee benchmarks
+// benchstat -col '/checkpoint' benchmarks
+func BenchmarkCheckpoint(b *testing.B) {
+ // Prepare in advance samples and labels that will be written into appender.
+ samples := genCheckpointTestSamples(checkpointTestSamplesParams{
+ labelPrefix: b.Name(),
+ numDatapoints: 10,
+ numHistograms: 10,
+ numSeries: 3600,
+ })
+
+ // Prepare initial wlog state with segments.
+ testSamplesSrcDir := filepath.Join(b.TempDir(), "samples-src")
+ require.NoError(b, os.Mkdir(testSamplesSrcDir, os.ModePerm))
+ createCheckpointFixtures(b, checkpointFixtureParams{
+ dir: testSamplesSrcDir,
+ numSegments: 512,
+ dtDelta: 10000,
+ segmentSize: walSegmentSize, // must be aligned to the page size
+ seriesLabels: samples.datapointLabels,
+ })
+
+ configs := []struct {
+ name string
+ useAgentCheckpoint bool
+ }{
+ {
+ name: "wlog",
+ useAgentCheckpoint: false,
+ },
+ {
+ name: "agent",
+ useAgentCheckpoint: true,
+ },
+ }
+
+ for _, cfg := range configs {
+ tname := fmt.Sprintf("checkpoint=%s", cfg.name)
+ b.Run(tname, func(b *testing.B) {
+			// Copy the initial wlog state into a scratch directory for this run.
+			// wlog.Open expects a "wal" subdirectory.
+ wlogDir := filepath.Join(b.TempDir(), "testdata", "wal")
+ err := os.CopyFS(wlogDir, os.DirFS(testSamplesSrcDir))
+ require.NoErrorf(b, err, "failed to copy test samples from %q to %q", testSamplesSrcDir, wlogDir)
+ storageDir := filepath.Dir(wlogDir)
+
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ for b.Loop() {
+ benchCheckpoint(b, benchCheckpointParams{
+ storageDir: storageDir,
+ samples: samples,
+ skipCurrentCheckpointReRead: cfg.useAgentCheckpoint,
+ })
+
+ // Get the size of the checkpoint directory
+ checkpointSize := getCheckpointSize(b, wlogDir)
+ b.ReportMetric(float64(checkpointSize), "checkpoint_size")
+ }
+ })
+ }
+}
+
+func getCheckpointSize(b testing.TB, walDir string) int64 {
+ dirName, _, err := wlog.LastCheckpoint(walDir)
+ require.NoError(b, err, "can't find the last checkpoint")
+	// Walk the checkpoint directory and accumulate the total size of all files.
+ var size int64
+ err = filepath.WalkDir(dirName, func(_ string, d fs.DirEntry, err error) error {
+ if err != nil {
+ return err
+ }
+ if !d.IsDir() {
+ info, err := d.Info()
+ if err != nil {
+ return err
+ }
+ size += info.Size()
+ }
+ return nil
+ })
+ require.NoError(b, err, "can't walk through the checkpoint dir")
+ return size
+}
+
+type benchCheckpointParams struct {
+ storageDir string
+ skipCurrentCheckpointReRead bool
+ samples checkpointTestSamples
+}
+
+func benchCheckpoint(b *testing.B, p benchCheckpointParams) {
+ b.StopTimer()
+
+ l := promslog.NewNopLogger()
+ rs := remote.NewStorage(
+ promslog.NewNopLogger(), nil,
+ startTime, p.storageDir,
+ 30*time.Second, nil, false,
+ )
+ defer rs.Close()
+
+ opts := DefaultOptions()
+ opts.OutOfOrderTimeWindow = math.MaxInt64 // Fixes "out of order sample" in benchmarks.
+ opts.CheckpointFromInMemorySeries = p.skipCurrentCheckpointReRead
+ opts.WALSegmentSize = walSegmentSize // Set minimum size to get more segments for checkpoint.
+
+ db, err := Open(l, nil, rs, p.storageDir, opts)
+ require.NoError(b, err, "Open")
+
+ app := db.Appender(b.Context())
+ const flushEvery = 1000
+ n := 0
+ maybeFlush := func() {
+ if n < flushEvery {
+ return
+ }
+ require.NoError(b, app.Commit())
+ app = db.Appender(b.Context())
+ n = 0
+ }
+
+ lbls := p.samples.datapointLabels
+ for i, l := range lbls {
+ lset := labels.New(l...)
+ for j, sample := range p.samples.datapointSamples {
+ st := sample[0].T()
+ sf := sample[0].F()
+ ref, err := app.Append(0, lset, st, sf)
+ require.NoErrorf(b, err, "L: %v; S: %v", i, j)
+
+ e := exemplar.Exemplar{
+ Labels: lset,
+ Ts: sample[0].T() + int64(i),
+ Value: sample[0].F(),
+ HasTs: true,
+ }
+
+ _, err = app.AppendExemplar(ref, lset, e)
+ require.NoError(b, err)
+
+ n += 2
+ maybeFlush()
+ }
+ }
+
+ for i, l := range p.samples.histogramLabels {
+ lset := labels.New(l...)
+ histograms := p.samples.histogramSamples[i]
+ for j, sample := range histograms {
+ _, err := app.AppendHistogram(0, lset, int64(j), sample, nil)
+ require.NoError(b, err)
+ n++
+ maybeFlush()
+ }
+ }
+
+ require.NoError(b, app.Commit())
+
+ // Trigger checkpoint call.
+ b.StartTimer()
+ err = db.truncate(-1)
+ require.NoError(b, err, "db.truncate")
+ require.NoError(b, db.Close())
+}
+
+type checkpointTestSamplesParams struct {
+ labelPrefix string
+ numDatapoints int
+ numHistograms int
+ numSeries int
+}
+
+type checkpointTestSamples struct {
+ datapointLabels [][]labels.Label
+ histogramLabels [][]labels.Label
+ datapointSamples [][]chunks.Sample
+ histogramSamples [][]*histogram.Histogram
+}
+
+func genCheckpointTestSamples(p checkpointTestSamplesParams) checkpointTestSamples {
+ out := checkpointTestSamples{
+ datapointLabels: labelsForTest(p.labelPrefix, p.numSeries),
+ histogramLabels: labelsForTest(p.labelPrefix+"_histogram", p.numSeries),
+ datapointSamples: make([][]chunks.Sample, 0, p.numSeries),
+ histogramSamples: make([][]*histogram.Histogram, 0, p.numSeries),
+ }
+
+ for range p.numDatapoints {
+ sample := chunks.GenerateSamples(0, 1)
+ out.datapointSamples = append(out.datapointSamples, sample)
+ }
+
+ for range out.histogramLabels {
+ histograms := tsdbutil.GenerateTestHistograms(p.numHistograms)
+ out.histogramSamples = append(out.histogramSamples, histograms)
+ }
+
+ return out
+}
+
+type checkpointFixtureParams struct {
+ dir string
+ numSegments int
+ segmentSize int
+ dtDelta int64
+ seriesLabels [][]labels.Label
+}
+
+func createCheckpointFixtures(t testing.TB, p checkpointFixtureParams) {
+	// Build WAL segments pre-populated with fixture series and samples.
+ var enc record.Encoder
+
+	// Create a dummy segment to establish the starting segment number.
+	// The dummy segment index must be zero, or agent.Open() will fail.
+ seg, err := wlog.CreateSegment(p.dir, 0)
+ require.NoError(t, err)
+ require.NoError(t, seg.Close())
+
+ w, err := wlog.NewSize(promslog.NewNopLogger(), nil, p.dir, p.segmentSize, DefaultOptions().WALCompression)
+ require.NoError(t, err)
+
+ series := make([]record.RefSeries, 0, len(p.seriesLabels))
+ for i, lset := range p.seriesLabels {
+ // NOTE: don't append RefMetadata as agent.DB doesn't support it during WAL replay.
+ series = append(series, record.RefSeries{
+ Ref: chunks.HeadSeriesRef(i),
+ Labels: labels.New(lset...),
+ })
+ }
+
+ var dt int64
+ samples := make([]record.RefSample, 0, len(series))
+ for i := range p.numSegments {
+ if i == 0 {
+ // Write series required for samples
+ b := enc.Series(series, nil)
+ require.NoError(t, w.Log(b))
+ }
+
+ samples = samples[:0]
+ for j := range len(series) {
+ samples = append(samples, record.RefSample{
+ Ref: chunks.HeadSeriesRef(j),
+ V: float64(i),
+ T: dt + int64(j+1),
+ })
+ }
+ require.NoError(t, w.Log(enc.Samples(samples, nil)))
+ dt += p.dtDelta
+ }
+ require.NoError(t, w.Close(), "WAL.Close")
+}
diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index 68e871cd00..d908f429a0 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -99,6 +99,15 @@ type Options struct {
// persisted to the WAL for samples that include a non-zero start timestamp in
// supported record types.
EnableSTStorage bool
+
+	// CheckpointFromInMemorySeries switches checkpointing to use only in-memory
+	// series data, avoiding a re-read of the previous checkpoint and WAL
+	// segments from disk.
+ CheckpointFromInMemorySeries bool
+
+	// CheckpointBatchSize is the number of series records written per WAL
+	// checkpoint log entry before it is flushed.
+ //
+ // Has no effect if CheckpointFromInMemorySeries is false.
+ CheckpointBatchSize int
}
// DefaultOptions used for the WAL storage. They are reasonable for setups using
@@ -238,6 +247,11 @@ func (m *dbMetrics) Unregister() {
}
}
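+// deletedRefMeta tracks a series ref pending deletion from the WAL: the last
+// segment the ref must be kept around to, and, when CheckpointFromInMemorySeries
+// is enabled, the labels needed to write the series record into a checkpoint.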
+type deletedRefMeta struct {
+ lastSegment int
+ labels labels.Labels
+}
+
// DB represents a WAL-only storage. It implements storage.DB.
type DB struct {
mtx sync.RWMutex
@@ -263,7 +277,7 @@ type DB struct {
series *stripeSeries
// deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
// must be kept around to).
- deleted map[chunks.HeadSeriesRef]int
+ deleted map[chunks.HeadSeriesRef]deletedRefMeta
donec chan struct{}
stopc chan struct{}
@@ -305,7 +319,7 @@ func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir str
nextRef: atomic.NewUint64(0),
series: newStripeSeries(opts.StripeSize),
- deleted: make(map[chunks.HeadSeriesRef]int),
+ deleted: make(map[chunks.HeadSeriesRef]deletedRefMeta),
donec: make(chan struct{}),
stopc: make(chan struct{}),
@@ -566,8 +580,12 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
- if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
- db.deleted[entry.Ref] = currentSegmentOrCheckpoint
+ if meta := db.deleted[entry.Ref]; meta.lastSegment <= currentSegmentOrCheckpoint {
+ var lbls labels.Labels
+ if db.opts.CheckpointFromInMemorySeries {
+ lbls = entry.Labels
+ }
+ db.deleted[entry.Ref] = deletedRefMeta{lastSegment: currentSegmentOrCheckpoint, labels: lbls}
}
} else {
db.metrics.numActiveSeries.Inc()
@@ -582,8 +600,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
- if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
- db.deleted[entry.Ref] = currentSegmentOrCheckpoint
+ if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint {
+ meta.lastSegment = currentSegmentOrCheckpoint
+ db.deleted[entry.Ref] = meta
}
entry.Ref = ref
}
@@ -605,8 +624,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
- if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
- db.deleted[entry.Ref] = currentSegmentOrCheckpoint
+ if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint {
+ meta.lastSegment = currentSegmentOrCheckpoint
+ db.deleted[entry.Ref] = meta
}
entry.Ref = ref
}
@@ -628,8 +648,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
- if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
- db.deleted[entry.Ref] = currentSegmentOrCheckpoint
+ if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint {
+ meta.lastSegment = currentSegmentOrCheckpoint
+ db.deleted[entry.Ref] = meta
}
entry.Ref = ref
}
@@ -713,8 +734,8 @@ func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef
}
// Keep the record if the series was recently deleted.
- seg, ok := db.deleted[id]
- return ok && seg > last
+ meta, ok := db.deleted[id]
+ return ok && meta.lastSegment > last
}
}
@@ -753,7 +774,13 @@ func (db *DB) truncate(mint int64) error {
db.metrics.checkpointCreationTotal.Inc()
- if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint, db.opts.EnableSTStorage); err != nil {
+ if db.opts.CheckpointFromInMemorySeries {
+ err = Checkpoint(db.logger, db.wal, last, db.opts.CheckpointBatchSize, db.series.allSeries(), deletedSeriesIter(db.deleted, last))
+ } else {
+ _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint, db.opts.EnableSTStorage)
+ }
+
+ if err != nil {
db.metrics.checkpointCreationFail.Inc()
var cerr *wlog.CorruptionErr
if errors.As(err, &cerr) {
@@ -770,8 +797,8 @@ func (db *DB) truncate(mint int64) error {
// The checkpoint is written and segments before it are truncated, so we
// no longer need to track deleted series that were being kept around.
- for ref, segment := range db.deleted {
- if segment <= last {
+ for ref, meta := range db.deleted {
+ if meta.lastSegment <= last {
delete(db.deleted, ref)
}
}
@@ -795,7 +822,7 @@ func (db *DB) truncate(mint int64) error {
// gc marks ref IDs that have not received a sample since mint as deleted in
// s.deleted, along with the segment where they originally got deleted.
func (db *DB) gc(mint int64) {
- deleted := db.series.GC(mint)
+ deleted := db.series.GC(mint, db.opts.CheckpointFromInMemorySeries)
db.metrics.numActiveSeries.Sub(float64(len(deleted)))
_, last, _ := wlog.Segments(db.wal.Dir())
@@ -803,8 +830,8 @@ func (db *DB) gc(mint int64) {
// We want to keep series records for any newly deleted series
// until we've passed the last recorded segment. This prevents
// the WAL having samples for series records that no longer exist.
- for ref := range deleted {
- db.deleted[ref] = last
+ for ref, lset := range deleted {
+ db.deleted[ref] = deletedRefMeta{lastSegment: last, labels: lset}
}
db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted)))
diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go
index 465748f810..74dc7fd1b9 100644
--- a/tsdb/agent/db_test.go
+++ b/tsdb/agent/db_test.go
@@ -1533,7 +1533,7 @@ func BenchmarkGetOrCreate(b *testing.B) {
for i := range b.N {
if i%n == 0 && i > 0 {
b.StopTimer()
- _ = s.series.GC(math.MaxInt64)
+ _ = s.series.GC(math.MaxInt64, false)
b.StartTimer()
}
app.getOrCreate(0, lbls[i%n])
diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go
index 9aa3143459..14cc80de9c 100644
--- a/tsdb/agent/series.go
+++ b/tsdb/agent/series.go
@@ -14,6 +14,7 @@
package agent
import (
+ "iter"
"sync"
"github.com/prometheus/prometheus/model/exemplar"
@@ -45,6 +46,18 @@ func (m *memSeries) updateTimestamp(newTs int64) bool {
return false
}
+func (m *memSeries) Ref() chunks.HeadSeriesRef {
+ return m.ref
+}
+
+func (m *memSeries) Labels() labels.Labels {
+ return m.lset
+}
+
+func (m *memSeries) LastSampleTimestamp() int64 {
+ return m.lastTs
+}
+
// seriesHashmap lets agent find a memSeries by its label set, via a 64-bit hash.
// There is one map for the common case where the hash value is unique, and a
// second map for the case that two series have the same hash value.
@@ -160,14 +173,15 @@ func newStripeSeries(stripeSize int) *stripeSeries {
// GC garbage collects old series that have not received a sample after mint
// and will fully delete them.
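+// When retainLabels is true, the returned map carries each deleted series'
+// label set for use by agent.Checkpoint; otherwise the values are empty labels.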
-func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
+func (s *stripeSeries) GC(mint int64, retainLabels bool) map[chunks.HeadSeriesRef]labels.Labels {
// gcMut serializes GC calls. Within a single GC pass, the check function
// holds hashLock and then acquires refLock — callers must never hold both
// simultaneously, which SetUnlessAlreadySet satisfies.
s.gcMut.Lock()
defer s.gcMut.Unlock()
- deleted := map[chunks.HeadSeriesRef]struct{}{}
+	// Labels of deleted series are retained so they can be written by agent.Checkpoint.
+ deleted := map[chunks.HeadSeriesRef]labels.Labels{}
// For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID.
check := func(hashLock int, hash uint64, series *memSeries) {
@@ -186,7 +200,12 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
s.locks[refLock].Lock()
}
- deleted[series.ref] = struct{}{}
+ if retainLabels {
+ deleted[series.ref] = series.lset
+ } else {
+ deleted[series.ref] = labels.EmptyLabels()
+ }
+
delete(s.series[refLock], series.ref)
s.hashes[hashLock].Delete(hash, series.ref)
@@ -305,3 +324,87 @@ func (s *stripeSeries) hashLock(hash uint64) uint64 {
func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 {
return uint64(ref) & uint64(s.size-1)
}
+
+var _ ActiveSeries = (*seriesSnapshot)(nil)
+
+// seriesSnapshot is a point-in-time copy of a memSeries' fields.
+// It is used to avoid holding series locks during checkpoint I/O.
+type seriesSnapshot struct {
+ ref chunks.HeadSeriesRef
+ lset labels.Labels
+ lastTs int64
+}
+
+func (s *seriesSnapshot) Ref() chunks.HeadSeriesRef {
+ return s.ref
+}
+
+func (s *seriesSnapshot) Labels() labels.Labels {
+ return s.lset
+}
+
+func (s *seriesSnapshot) LastSampleTimestamp() int64 {
+ return s.lastTs
+}
+
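+// allSeries returns an iterator over point-in-time snapshots of every live
+// series. Stripe locks are held only while collecting pointers and each series
+// lock only while copying its fields, so slow consumers (such as checkpoint
+// disk I/O) never block appenders.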
+func (s *stripeSeries) allSeries() iter.Seq[ActiveSeries] {
+ return func(yield func(ActiveSeries) bool) {
+ var buf []*memSeries
+ for i := 0; i < s.size; i++ {
+ // Collect pointers under RLock to avoid blocking appenders during I/O.
+ s.locks[i].RLock()
+ buf = buf[:0]
+ for _, series := range s.series[i] {
+ buf = append(buf, series)
+ }
+ s.locks[i].RUnlock()
+
+ // Snapshot and yield outside the stripe lock so that
+ // slow consumers (e.g. checkpoint disk I/O) do not
+ // block appends that need a write lock on this stripe.
+ for _, series := range buf {
+ series.Lock()
+ snapshot := seriesSnapshot{
+ ref: series.ref,
+ lset: series.lset,
+ lastTs: series.lastTs,
+ }
+ series.Unlock()
+
+ if !yield(&snapshot) {
+ return
+ }
+ }
+ }
+ }
+}
+
+// deletedSeriesIter returns an iterator over deleted series from the given map.
+// Only series whose lastSegment is greater than last are emitted, matching the
+// filtering behaviour of the keep function passed to [wlog.Checkpoint].
+func deletedSeriesIter(m map[chunks.HeadSeriesRef]deletedRefMeta, last int) iter.Seq[DeletedSeries] {
+ return func(yield func(DeletedSeries) bool) {
+ for ref, meta := range m {
+ if meta.lastSegment > last {
+ if !yield(deletedSeries{ref: ref, labels: meta.labels}) {
+ return
+ }
+ }
+ }
+ }
+}
+
+var _ DeletedSeries = deletedSeries{}
+
+type deletedSeries struct {
+ ref chunks.HeadSeriesRef
+ labels labels.Labels
+}
+
+func (series deletedSeries) Ref() chunks.HeadSeriesRef {
+ return series.ref
+}
+
+func (series deletedSeries) Labels() labels.Labels {
+ return series.labels
+}
diff --git a/tsdb/agent/series_test.go b/tsdb/agent/series_test.go
index a5a6f04f4b..4039c3fca0 100644
--- a/tsdb/agent/series_test.go
+++ b/tsdb/agent/series_test.go
@@ -40,7 +40,7 @@ func TestNoDeadlock(t *testing.T) {
go func() {
defer wg.Done()
<-started
- _ = stripeSeries.GC(math.MaxInt64)
+ _ = stripeSeries.GC(math.MaxInt64, false)
}()
}
@@ -224,7 +224,7 @@ func TestSetUnlessAlreadySetConcurrentGC(t *testing.T) {
case <-done:
return
default:
- ss.GC(1) // removes series with lastTs < 1, i.e. lastTs==0
+ ss.GC(1, false) // removes series with lastTs < 1, i.e. lastTs==0
}
}
}()
@@ -248,7 +248,7 @@ func TestSetUnlessAlreadySetConcurrentGC(t *testing.T) {
// A final synchronous GC pass ensures all eligible series are fully removed,
// then verify they are unreachable via both lookup paths.
- ss.GC(1)
+ ss.GC(1, false)
for _, s := range eligible {
require.Nil(t, ss.GetByID(s.ref))
require.Nil(t, ss.GetByHash(s.lset.Hash(), s.lset))
@@ -259,7 +259,7 @@ func TestStripeSeries_gc(t *testing.T) {
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
hash := ms1.lset.Hash()
- s.GC(1)
+ s.GC(1, false)
// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
got := s.GetByHash(hash, ms1.lset)
diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go
index a41935044d..e1212b280e 100644
--- a/tsdb/wlog/checkpoint.go
+++ b/tsdb/wlog/checkpoint.go
@@ -82,8 +82,8 @@ func DeleteCheckpoints(dir string, maxIndex int) error {
return errors.Join(errs...)
}
-// checkpointTempFileSuffix is the suffix used when creating temporary checkpoint files.
-const checkpointTempFileSuffix = ".tmp"
+// CheckpointTempFileSuffix is the suffix used when creating temporary checkpoint files.
+const CheckpointTempFileSuffix = ".tmp"
// DeleteTempCheckpoints deletes all temporary checkpoint directories in the given directory.
func DeleteTempCheckpoints(logger *slog.Logger, dir string) error {
@@ -137,8 +137,8 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
return nil, err
}
- cpdir := checkpointDir(w.Dir(), to)
- cpdirtmp := cpdir + checkpointTempFileSuffix
+ cpdir := CheckpointDir(w.Dir(), to)
+ cpdirtmp := cpdir + CheckpointTempFileSuffix
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
return nil, fmt.Errorf("create checkpoint dir: %w", err)
@@ -407,7 +407,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
// checkpointPrefix is the prefix used for checkpoint files.
const checkpointPrefix = "checkpoint."
-func checkpointDir(dir string, i int) string {
+func CheckpointDir(dir string, i int) string {
return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i))
}
@@ -446,5 +446,5 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) {
}
func isTempDir(fi fs.DirEntry) bool {
- return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), checkpointTempFileSuffix)
+ return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), CheckpointTempFileSuffix)
}