feat(tsdb/agent): Implement checkpoint based on series in memory (#17948)

Adds a CheckpointFromInMemorySeries option to agent.Options that enables a faster checkpoint implementation: instead of re-reading the previous checkpoint and WAL segments from disk, it builds the checkpoint from the series already held in memory.
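
For downstream users of the tsdb/agent package, enabling it looks roughly like the sketch below (logger, reg, remoteStorage, and dataDir are assumed to come from the caller's setup; they are not part of this commit):

	opts := agent.DefaultOptions()
	opts.CheckpointFromInMemorySeries = true // build checkpoints from in-memory series only
	opts.CheckpointBatchSize = 1000          // max series records per checkpoint WAL entry
	db, err := agent.Open(logger, reg, remoteStorage, dataDir, opts)
	if err != nil {
		return err
	}
	defer db.Close()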

* feat: impl agent-specific checkpoint dir
* feat: impl ActiveSeries interface
* feat: use new checkpoint impl
* feat: hide new checkpoint impl behind a feature flag
* feat: add benchmark
* feat: add benchstat case
* feat: use feature flag in bench
* feat: use same labels for persisted state and append
* feat: set WAL segment size
* feat: add checkpoint size metric and bump series size
* feat: wal replay test
* feat: expose new checkpoint opts in cmd flags
* feat: update cli doc
* add ActiveSeries and DeletedSeries doc

Signed-off-by: x1unix <9203548+x1unix@users.noreply.github.com>
Signed-off-by: Denys Sedchenko <9203548+x1unix@users.noreply.github.com>
Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
9 changed files with 896 additions and 51 deletions

@ -557,6 +557,12 @@ func main() {
"Maximum age samples may be before being forcibly deleted when the WAL is truncated").
Default(agentDefaultMaxWALTime).SetValue(&cfg.agent.MaxWALTime)
agentOnlyFlag(a, "storage.agent.checkpoint-from-in-memory-series", "Use only in-memory series data when building a checkpoint.").
Default("false").BoolVar(&cfg.agent.CheckpointFromInMemorySeries)
agentOnlyFlag(a, "storage.agent.checkpoint-batch-size", "Size of a single WAL log entry chunk to be flushed. Has no effect without --storage.agent.checkpoint-from-in-memory-series flag.").
Default("1000").IntVar(&cfg.agent.CheckpointBatchSize)
agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory.").
Default("false").BoolVar(&cfg.agent.NoLockfile)
@ -662,6 +668,11 @@ func main() {
os.Exit(3)
}
if agentMode && cfg.agent.CheckpointBatchSize <= 0 {
fmt.Fprintln(os.Stderr, "--storage.agent.checkpoint-batch-size must be greater than 0.")
os.Exit(1)
}
if cfg.memlimitRatio <= 0.0 || cfg.memlimitRatio > 1.0 {
fmt.Fprintf(os.Stderr, "--auto-gomemlimit.ratio must be greater than 0 and less than or equal to 1.")
os.Exit(1)
@ -2078,15 +2089,17 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
// agentOptions is a version of agent.Options with defined units. This is required
// as agent.Option fields are unit agnostic (time).
type agentOptions struct {
WALSegmentSize units.Base2Bytes
WALCompressionType compression.Type
StripeSize int
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
NoLockfile bool
OutOfOrderTimeWindow int64 // TODO(bwplotka): Unused option, fix it or remove.
EnableSTAsZeroSample bool
EnableSTStorage bool
WALSegmentSize units.Base2Bytes
WALCompressionType compression.Type
StripeSize int
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
NoLockfile bool
OutOfOrderTimeWindow int64 // TODO(bwplotka): Unused option, fix it or remove.
EnableSTAsZeroSample bool
EnableSTStorage bool
CheckpointFromInMemorySeries bool
CheckpointBatchSize int
}
func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Options {
@ -2094,16 +2107,18 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option
outOfOrderTimeWindow = 0
}
return agent.Options{
WALSegmentSize: int(opts.WALSegmentSize),
WALCompression: opts.WALCompressionType,
StripeSize: opts.StripeSize,
TruncateFrequency: time.Duration(opts.TruncateFrequency),
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
OutOfOrderTimeWindow: outOfOrderTimeWindow,
EnableSTAsZeroSample: opts.EnableSTAsZeroSample,
EnableSTStorage: opts.EnableSTStorage,
WALSegmentSize: int(opts.WALSegmentSize),
WALCompression: opts.WALCompressionType,
StripeSize: opts.StripeSize,
TruncateFrequency: time.Duration(opts.TruncateFrequency),
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
OutOfOrderTimeWindow: outOfOrderTimeWindow,
EnableSTAsZeroSample: opts.EnableSTAsZeroSample,
EnableSTStorage: opts.EnableSTStorage,
CheckpointFromInMemorySeries: opts.CheckpointFromInMemorySeries,
CheckpointBatchSize: opts.CheckpointBatchSize,
}
}

@ -43,6 +43,8 @@ The Prometheus monitoring server
| <code class="text-nowrap">--storage.agent.wal-compression</code> | Compress the agent WAL. If false, the --storage.agent.wal-compression-type flag is ignored. Use with agent mode only. | `true` |
| <code class="text-nowrap">--storage.agent.retention.min-time</code> | Minimum age samples may be before being considered for deletion when the WAL is truncated Use with agent mode only. | `5m` |
| <code class="text-nowrap">--storage.agent.retention.max-time</code> | Maximum age samples may be before being forcibly deleted when the WAL is truncated Use with agent mode only. | `4h` |
| <code class="text-nowrap">--storage.agent.checkpoint-from-in-memory-series</code> | Use only in-memory series data when building a checkpoint. Use with agent mode only. | `false` |
| <code class="text-nowrap">--storage.agent.checkpoint-batch-size</code> | Size of a single WAL log entry chunk to be flushed. Has no effect without --storage.agent.checkpoint-from-in-memory-series flag. Use with agent mode only. | `1000` |
| <code class="text-nowrap">--storage.agent.no-lockfile</code> | Do not create lockfile in data directory. Use with agent mode only. | `false` |
| <code class="text-nowrap">--storage.remote.flush-deadline</code> | How long to wait flushing sample on shutdown or config reload. | `1m` |
| <code class="text-nowrap">--storage.remote.read-sample-limit</code> | Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types. Use with server mode only. | `5e7` |

tsdb/agent/checkpoint.go (new file, 232 lines)

@ -0,0 +1,232 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"errors"
"fmt"
"iter"
"log/slog"
"os"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wlog"
)
const defaultBatchSize = 1000
// ActiveSeries describes a live series to be written by [Checkpoint].
//
// This interface is intentionally exported so downstream users of this package
// can use [Checkpoint] without depending on Prometheus internal series types.
type ActiveSeries interface {
Ref() chunks.HeadSeriesRef
Labels() labels.Labels
LastSampleTimestamp() int64
}
// DeletedSeries describes a deleted series to be written by [Checkpoint].
//
// This interface is intentionally exported so downstream users of this package
// can use [Checkpoint] without depending on Prometheus internal series types.
type DeletedSeries interface {
Ref() chunks.HeadSeriesRef
Labels() labels.Labels
}
// Checkpoint creates an unindexed checkpoint containing record.RefSeries plus the last
// sample timestamp for each ActiveSeries, and record.RefSeries for each DeletedSeries.
//
// This API accepts interfaces so downstream users of this package can provide
// their own series storage while reusing Prometheus checkpoint writing logic.
//
// The difference between this implementation and [wlog.Checkpoint] is that it skips re-reading
// the current checkpoint and segments, and instead relies on data already in memory.
func Checkpoint(logger *slog.Logger, w *wlog.WL, atIndex, batchSize int, activeSeries iter.Seq[ActiveSeries], deletedSeries iter.Seq[DeletedSeries]) error {
if batchSize <= 0 {
batchSize = defaultBatchSize
}
logger.Info("Creating checkpoint", "atIndex", atIndex)
dir, idx, err := wlog.LastCheckpoint(w.Dir())
if err != nil && !errors.Is(err, record.ErrNotFound) {
return fmt.Errorf("can't find last checkpoint: %w", err)
}
if idx >= atIndex {
logger.Info(
"checkpoint already exists",
"dir", dir,
"index", idx,
"requested_index", atIndex,
)
return nil
}
if err := wlog.DeleteTempCheckpoints(logger, w.Dir()); err != nil {
return fmt.Errorf("failed to cleanup temporary checkpoints: %w", err)
}
cpDir := wlog.CheckpointDir(w.Dir(), atIndex)
cpTmpDir := cpDir + wlog.CheckpointTempFileSuffix
if err := os.MkdirAll(cpTmpDir, os.ModePerm); err != nil {
return fmt.Errorf("create checkpoint dir: %w", err)
}
cp, err := wlog.New(logger, nil, cpTmpDir, w.CompressionType())
if err != nil {
return fmt.Errorf("open checkpoint: %w", err)
}
success := false
defer func() {
if !success {
cp.Close()
}
os.RemoveAll(cpTmpDir)
}()
flusher := newCheckpointFlusher(cp, batchSize)
if err := flusher.writeSeries(activeSeries); err != nil {
return err
}
if err := flusher.writeDeletedRecords(deletedSeries); err != nil {
return err
}
success = true
if err := cp.Close(); err != nil {
return fmt.Errorf("close checkpoint: %w", err)
}
df, err := fileutil.OpenDir(cpTmpDir)
if err != nil {
return fmt.Errorf("open temporary checkpoint directory: %w", err)
}
if err := df.Sync(); err != nil {
df.Close()
return fmt.Errorf("sync temporary checkpoint directory: %w", err)
}
if err = df.Close(); err != nil {
return fmt.Errorf("close temporary checkpoint directory: %w", err)
}
if err := fileutil.Replace(cpTmpDir, cpDir); err != nil {
return fmt.Errorf("rename checkpoint directory: %w", err)
}
return nil
}
type checkpointWriter struct {
enc record.Encoder
checkpoint *wlog.WL
seriesBuff []byte
samplesBuff []byte
seriesRecords []record.RefSeries
sampleRecords []record.RefSample
batchSize int
}
func newCheckpointFlusher(checkpoint *wlog.WL, batchSize int) *checkpointWriter {
return &checkpointWriter{
batchSize: batchSize,
checkpoint: checkpoint,
seriesRecords: make([]record.RefSeries, 0, batchSize),
sampleRecords: make([]record.RefSample, 0, batchSize),
}
}
func (cf *checkpointWriter) flushRecords() error {
withSamples := len(cf.sampleRecords) > 0
cf.seriesBuff = cf.enc.Series(cf.seriesRecords, cf.seriesBuff)
var err error
if withSamples {
cf.samplesBuff = cf.enc.Samples(cf.sampleRecords, cf.samplesBuff)
err = cf.checkpoint.Log(cf.seriesBuff, cf.samplesBuff)
} else {
err = cf.checkpoint.Log(cf.seriesBuff)
}
if err != nil {
return fmt.Errorf("flush records: %w", err)
}
cf.seriesBuff = cf.seriesBuff[:0]
cf.samplesBuff = cf.samplesBuff[:0]
cf.seriesRecords = cf.seriesRecords[:0]
cf.sampleRecords = cf.sampleRecords[:0]
return nil
}
func (cf *checkpointWriter) writeSeries(seriesIter iter.Seq[ActiveSeries]) error {
for series := range seriesIter {
// If we filled the buffers, write them out and reset.
if len(cf.seriesRecords) == cf.batchSize {
if err := cf.flushRecords(); err != nil {
return fmt.Errorf("flush active series: %w", err)
}
}
cf.seriesRecords = append(cf.seriesRecords, record.RefSeries{
Ref: series.Ref(),
Labels: series.Labels(),
})
// Sample value is irrelevant; we only need the timestamp.
cf.sampleRecords = append(cf.sampleRecords, record.RefSample{
Ref: series.Ref(),
T: series.LastSampleTimestamp(),
V: 0,
})
}
// Flush the last batch if we have one.
if len(cf.seriesRecords) != 0 {
return cf.flushRecords()
}
return nil
}
func (cf *checkpointWriter) writeDeletedRecords(seriesRefIter iter.Seq[DeletedSeries]) error {
for series := range seriesRefIter {
// If we filled the buffers, write them out and reset.
if len(cf.seriesRecords) == cf.batchSize {
if err := cf.flushRecords(); err != nil {
return fmt.Errorf("flush deleted series: %w", err)
}
}
// We don't care about timestamps here, so no samples.
cf.seriesRecords = append(cf.seriesRecords, record.RefSeries{
Ref: series.Ref(),
Labels: series.Labels(),
})
}
// Flush the last batch if we have one.
if len(cf.seriesRecords) != 0 {
return cf.flushRecords()
}
return nil
}
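
As a usage sketch for downstream consumers: the mySeries type and checkpointAll helper below are hypothetical illustrations, not part of this commit; only the exported ActiveSeries, DeletedSeries, and Checkpoint identifiers come from this package (imports of log/slog and the labels, chunks, wlog, and agent packages are assumed).

	type mySeries struct {
		ref    chunks.HeadSeriesRef
		lset   labels.Labels
		lastTs int64
	}

	func (s mySeries) Ref() chunks.HeadSeriesRef  { return s.ref }
	func (s mySeries) Labels() labels.Labels      { return s.lset }
	func (s mySeries) LastSampleTimestamp() int64 { return s.lastTs }

	// checkpointAll writes a checkpoint at atIndex from a caller-owned slice.
	func checkpointAll(logger *slog.Logger, w *wlog.WL, atIndex int, all []mySeries) error {
		active := func(yield func(agent.ActiveSeries) bool) {
			for _, s := range all {
				if !yield(s) {
					return
				}
			}
		}
		// No deleted series in this sketch.
		deleted := func(yield func(agent.DeletedSeries) bool) {}
		// batchSize 0 falls back to the package default (1000).
		return agent.Checkpoint(logger, w, atIndex, 0, active, deleted)
	}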

@ -0,0 +1,466 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"errors"
"fmt"
"io/fs"
"maps"
"math"
"os"
"path/filepath"
"testing"
"time"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
)
const walSegmentSize = 32 << 10 // must be aligned to the page size
func TestCheckpointReplayCompatibility(t *testing.T) {
// Test to ensure that WAL replay produces the same in-memory state whether the checkpoint
// was written by wlog.Checkpoint or by agent.Checkpoint.
var (
wlogAfterSeries *stripeSeries
agentAfterSeries *stripeSeries
)
openDBAndDo := func(isInMemCheckpoint bool, storageDir string, fn func(db *DB)) {
l := promslog.NewNopLogger()
rs := remote.NewStorage(
promslog.NewNopLogger(), nil,
startTime, storageDir,
30*time.Second, nil, false,
)
defer rs.Close()
opts := DefaultOptions()
opts.CheckpointFromInMemorySeries = isInMemCheckpoint
opts.WALSegmentSize = walSegmentSize // Set minimum size to get more segments for checkpoint.
db, err := Open(l, nil, rs, storageDir, opts)
require.NoError(t, err, "Open")
fn(db)
}
// Prepare samples and labels that will be written into the appender.
samples := genCheckpointTestSamples(checkpointTestSamplesParams{
labelPrefix: t.Name(),
numDatapoints: 3,
numHistograms: 3,
numSeries: 300,
})
appendData := func(db *DB) {
app := db.Appender(t.Context())
const flushEvery = 1000
n := 0
maybeFlush := func() {
if n < flushEvery {
return
}
require.NoError(t, app.Commit())
app = db.Appender(t.Context())
n = 0
}
lbls := samples.datapointLabels
for i, l := range lbls {
lset := labels.New(l...)
for j, sample := range samples.datapointSamples {
st := sample[0].T()
sf := sample[0].F()
// Replay doesn't include exemplars, so don't append them here; this keeps the assertion symmetric.
_, err := app.Append(0, lset, st, sf)
require.NoErrorf(t, err, "L: %v; S: %v", i, j)
n++
maybeFlush()
}
}
for i, l := range samples.histogramLabels {
lset := labels.New(l...)
histograms := samples.histogramSamples[i]
for j, sample := range histograms {
_, err := app.AppendHistogram(0, lset, int64(j), sample, nil)
require.NoError(t, err)
n++
maybeFlush()
}
}
require.NoError(t, app.Commit())
}
// Write and replay using the old wlog.Checkpoint.
// wlog.Open expects a "wal" subdirectory.
wlogStateRoot := filepath.Join(t.TempDir(), "state-wlog")
wlogWalDir := filepath.Join(wlogStateRoot, "wal")
require.NoError(t, os.MkdirAll(wlogWalDir, os.ModePerm))
openDBAndDo(false, wlogStateRoot, func(db *DB) {
appendData(db)
// Trigger checkpoint call.
err := db.truncate(-1)
require.NoError(t, err, "db.truncate")
require.NoError(t, db.Close())
})
assertCheckpointExists(t, wlogWalDir, 1)
// Restore the database from the checkpoint.
openDBAndDo(true, wlogStateRoot, func(db *DB) {
defer db.Close()
wlogAfterSeries = db.series
})
// Write and replay using agent.Checkpoint:
agentStateRoot := filepath.Join(t.TempDir(), "state-agent")
agentWalDir := filepath.Join(agentStateRoot, "wal")
require.NoError(t, os.MkdirAll(agentWalDir, os.ModePerm))
openDBAndDo(true, agentStateRoot, func(db *DB) {
appendData(db)
err := db.truncate(-1)
require.NoError(t, err, "db.truncate")
require.NoError(t, db.Close())
})
assertCheckpointExists(t, agentWalDir, 1)
openDBAndDo(true, agentStateRoot, func(db *DB) {
defer db.Close()
agentAfterSeries = db.series
})
requireStripeSeriesEqual(t, wlogAfterSeries, agentAfterSeries)
}
// requireStripeSeriesEqual asserts that two stripeSeries are semantically
// equivalent: same set of refs, each memSeries has matching labels (by
// content) and lastTs. It avoids reflect.DeepEqual on labels.Labels because
// under -tags=dedupelabels the struct carries a per-instance nameTable
// pointer and symbol-table IDs whose layout depends on the order in which
// records were interned during replay — an implementation detail, not a
// behavioural property.
func requireStripeSeriesEqual(t *testing.T, want, got *stripeSeries) {
t.Helper()
require.Equal(t, want.size, got.size, "stripeSeries size mismatch")
collect := func(s *stripeSeries) map[chunks.HeadSeriesRef]*memSeries {
out := map[chunks.HeadSeriesRef]*memSeries{}
for _, m := range s.series {
maps.Copy(out, m)
}
return out
}
wantByRef := collect(want)
gotByRef := collect(got)
require.Len(t, gotByRef, len(wantByRef), "series count mismatch")
for ref, w := range wantByRef {
g, ok := gotByRef[ref]
require.Truef(t, ok, "ref %d present in wlog path, missing in agent path", ref)
require.Truef(t, labels.Equal(w.lset, g.lset),
"ref %d labels mismatch: wlog=%s agent=%s", ref, w.lset.String(), g.lset.String())
require.Equalf(t, w.lastTs, g.lastTs, "ref %d lastTs mismatch", ref)
}
}
func assertCheckpointExists(t *testing.T, walDir string, checkpointID int) {
d := wlog.CheckpointDir(walDir, checkpointID)
v, err := os.Stat(d)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
t.Fatalf("checkpoint doesn't exists in WAL dir %q", walDir)
return
}
t.Fatalf("can't stat checkpoint dir %q", err)
return
}
require.True(t, v.IsDir(), "checkpoint should be a dir")
}
// To run the benchmark and display a diff, use the following command:
//
// go test -bench="BenchmarkCheckpoint" . -run ^$ -benchmem -count 6 -benchtime 5s | tee benchmarks
// benchstat -col '/checkpoint' benchmarks
func BenchmarkCheckpoint(b *testing.B) {
// Prepare samples and labels in advance that will be written into the appender.
samples := genCheckpointTestSamples(checkpointTestSamplesParams{
labelPrefix: b.Name(),
numDatapoints: 10,
numHistograms: 10,
numSeries: 3600,
})
// Prepare initial wlog state with segments.
testSamplesSrcDir := filepath.Join(b.TempDir(), "samples-src")
require.NoError(b, os.Mkdir(testSamplesSrcDir, os.ModePerm))
createCheckpointFixtures(b, checkpointFixtureParams{
dir: testSamplesSrcDir,
numSegments: 512,
dtDelta: 10000,
segmentSize: walSegmentSize, // must be aligned to the page size
seriesLabels: samples.datapointLabels,
})
configs := []struct {
name string
useAgentCheckpoint bool
}{
{
name: "wlog",
useAgentCheckpoint: false,
},
{
name: "agent",
useAgentCheckpoint: true,
},
}
for _, cfg := range configs {
tname := fmt.Sprintf("checkpoint=%s", cfg.name)
b.Run(tname, func(b *testing.B) {
// Copy the initial wlog state into a scratch directory for the test.
// wlog.Open expects a "wal" subdirectory.
wlogDir := filepath.Join(b.TempDir(), "testdata", "wal")
err := os.CopyFS(wlogDir, os.DirFS(testSamplesSrcDir))
require.NoErrorf(b, err, "failed to copy test samples from %q to %q", testSamplesSrcDir, wlogDir)
storageDir := filepath.Dir(wlogDir)
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
benchCheckpoint(b, benchCheckpointParams{
storageDir: storageDir,
samples: samples,
skipCurrentCheckpointReRead: cfg.useAgentCheckpoint,
})
// Get the size of the checkpoint directory
checkpointSize := getCheckpointSize(b, wlogDir)
b.ReportMetric(float64(checkpointSize), "checkpoint_size")
}
})
}
}
func getCheckpointSize(b testing.TB, walDir string) int64 {
dirName, _, err := wlog.LastCheckpoint(walDir)
require.NoError(b, err, "can't find the last checkpoint")
// Walk the directory and accumulate the total size of all files.
var size int64
err = filepath.WalkDir(dirName, func(_ string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
info, err := d.Info()
if err != nil {
return err
}
size += info.Size()
}
return nil
})
require.NoError(b, err, "can't walk through the checkpoint dir")
return size
}
type benchCheckpointParams struct {
storageDir string
skipCurrentCheckpointReRead bool
samples checkpointTestSamples
}
func benchCheckpoint(b *testing.B, p benchCheckpointParams) {
b.StopTimer()
l := promslog.NewNopLogger()
rs := remote.NewStorage(
promslog.NewNopLogger(), nil,
startTime, p.storageDir,
30*time.Second, nil, false,
)
defer rs.Close()
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = math.MaxInt64 // Fixes "out of order sample" in benchmarks.
opts.CheckpointFromInMemorySeries = p.skipCurrentCheckpointReRead
opts.WALSegmentSize = walSegmentSize // Set minimum size to get more segments for checkpoint.
db, err := Open(l, nil, rs, p.storageDir, opts)
require.NoError(b, err, "Open")
app := db.Appender(b.Context())
const flushEvery = 1000
n := 0
maybeFlush := func() {
if n < flushEvery {
return
}
require.NoError(b, app.Commit())
app = db.Appender(b.Context())
n = 0
}
lbls := p.samples.datapointLabels
for i, l := range lbls {
lset := labels.New(l...)
for j, sample := range p.samples.datapointSamples {
st := sample[0].T()
sf := sample[0].F()
ref, err := app.Append(0, lset, st, sf)
require.NoErrorf(b, err, "L: %v; S: %v", i, j)
e := exemplar.Exemplar{
Labels: lset,
Ts: sample[0].T() + int64(i),
Value: sample[0].F(),
HasTs: true,
}
_, err = app.AppendExemplar(ref, lset, e)
require.NoError(b, err)
n += 2
maybeFlush()
}
}
for i, l := range p.samples.histogramLabels {
lset := labels.New(l...)
histograms := p.samples.histogramSamples[i]
for j, sample := range histograms {
_, err := app.AppendHistogram(0, lset, int64(j), sample, nil)
require.NoError(b, err)
n++
maybeFlush()
}
}
require.NoError(b, app.Commit())
// Trigger checkpoint call.
b.StartTimer()
err = db.truncate(-1)
require.NoError(b, err, "db.truncate")
require.NoError(b, db.Close())
}
type checkpointTestSamplesParams struct {
labelPrefix string
numDatapoints int
numHistograms int
numSeries int
}
type checkpointTestSamples struct {
datapointLabels [][]labels.Label
histogramLabels [][]labels.Label
datapointSamples [][]chunks.Sample
histogramSamples [][]*histogram.Histogram
}
func genCheckpointTestSamples(p checkpointTestSamplesParams) checkpointTestSamples {
out := checkpointTestSamples{
datapointLabels: labelsForTest(p.labelPrefix, p.numSeries),
histogramLabels: labelsForTest(p.labelPrefix+"_histogram", p.numSeries),
datapointSamples: make([][]chunks.Sample, 0, p.numSeries),
histogramSamples: make([][]*histogram.Histogram, 0, p.numSeries),
}
for range p.numDatapoints {
sample := chunks.GenerateSamples(0, 1)
out.datapointSamples = append(out.datapointSamples, sample)
}
for range out.histogramLabels {
histograms := tsdbutil.GenerateTestHistograms(p.numHistograms)
out.histogramSamples = append(out.histogramSamples, histograms)
}
return out
}
type checkpointFixtureParams struct {
dir string
numSegments int
segmentSize int
dtDelta int64
seriesLabels [][]labels.Label
}
func createCheckpointFixtures(t testing.TB, p checkpointFixtureParams) {
// Make a segment to hold the initial data.
var enc record.Encoder
// Create a dummy segment to bump the start segment number.
// The dummy segment must be segment zero, or agent.Open() will fail.
seg, err := wlog.CreateSegment(p.dir, 0)
require.NoError(t, err)
require.NoError(t, seg.Close())
w, err := wlog.NewSize(promslog.NewNopLogger(), nil, p.dir, p.segmentSize, DefaultOptions().WALCompression)
require.NoError(t, err)
series := make([]record.RefSeries, 0, len(p.seriesLabels))
for i, lset := range p.seriesLabels {
// NOTE: don't append RefMetadata as agent.DB doesn't support it during WAL replay.
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.New(lset...),
})
}
var dt int64
samples := make([]record.RefSample, 0, len(series))
for i := range p.numSegments {
if i == 0 {
// Write series required for samples
b := enc.Series(series, nil)
require.NoError(t, w.Log(b))
}
samples = samples[:0]
for j := range len(series) {
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(j),
V: float64(i),
T: dt + int64(j+1),
})
}
require.NoError(t, w.Log(enc.Samples(samples, nil)))
dt += p.dtDelta
}
require.NoError(t, w.Close(), "WAL.Close")
}

@ -99,6 +99,15 @@ type Options struct {
// persisted to the WAL for samples that include a non-zero start timestamp in
// supported record types.
EnableSTStorage bool
// CheckpointFromInMemorySeries changes the checkpoint implementation to use only in-memory
// series data when building a checkpoint. This avoids re-reading the previous checkpoint
// and segments from disk.
CheckpointFromInMemorySeries bool
// CheckpointBatchSize specifies the maximum number of series records per WAL log entry
// flushed while building a checkpoint.
//
// Has no effect if CheckpointFromInMemorySeries is false.
CheckpointBatchSize int
}
// DefaultOptions used for the WAL storage. They are reasonable for setups using
@ -238,6 +247,11 @@ func (m *dbMetrics) Unregister() {
}
}
type deletedRefMeta struct {
lastSegment int
labels labels.Labels
}
// DB represents a WAL-only storage. It implements storage.DB.
type DB struct {
mtx sync.RWMutex
@ -263,7 +277,7 @@ type DB struct {
series *stripeSeries
// deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
// must be kept around to).
deleted map[chunks.HeadSeriesRef]int
deleted map[chunks.HeadSeriesRef]deletedRefMeta
donec chan struct{}
stopc chan struct{}
@ -305,7 +319,7 @@ func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir str
nextRef: atomic.NewUint64(0),
series: newStripeSeries(opts.StripeSize),
deleted: make(map[chunks.HeadSeriesRef]int),
deleted: make(map[chunks.HeadSeriesRef]deletedRefMeta),
donec: make(chan struct{}),
stopc: make(chan struct{}),
@ -566,8 +580,12 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
if meta := db.deleted[entry.Ref]; meta.lastSegment <= currentSegmentOrCheckpoint {
var lbls labels.Labels
if db.opts.CheckpointFromInMemorySeries {
lbls = entry.Labels
}
db.deleted[entry.Ref] = deletedRefMeta{lastSegment: currentSegmentOrCheckpoint, labels: lbls}
}
} else {
db.metrics.numActiveSeries.Inc()
@ -582,8 +600,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint {
meta.lastSegment = currentSegmentOrCheckpoint
db.deleted[entry.Ref] = meta
}
entry.Ref = ref
}
@ -605,8 +624,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint {
meta.lastSegment = currentSegmentOrCheckpoint
db.deleted[entry.Ref] = meta
}
entry.Ref = ref
}
@ -628,8 +648,9 @@ func (db *DB) loadWAL(r *wlog.Reader, duplicateRefToValidRef map[chunks.HeadSeri
if ref, ok := duplicateRefToValidRef[entry.Ref]; ok {
// We want to track the largest segment where we encountered the duplicate ref, so we can ensure
// it remains in the checkpoint until we get past that segment.
if db.deleted[entry.Ref] <= currentSegmentOrCheckpoint {
db.deleted[entry.Ref] = currentSegmentOrCheckpoint
if meta, ok := db.deleted[entry.Ref]; ok && meta.lastSegment <= currentSegmentOrCheckpoint {
meta.lastSegment = currentSegmentOrCheckpoint
db.deleted[entry.Ref] = meta
}
entry.Ref = ref
}
@ -713,8 +734,8 @@ func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef
}
// Keep the record if the series was recently deleted.
seg, ok := db.deleted[id]
return ok && seg > last
meta, ok := db.deleted[id]
return ok && meta.lastSegment > last
}
}
@ -753,7 +774,13 @@ func (db *DB) truncate(mint int64) error {
db.metrics.checkpointCreationTotal.Inc()
if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint, db.opts.EnableSTStorage); err != nil {
if db.opts.CheckpointFromInMemorySeries {
err = Checkpoint(db.logger, db.wal, last, db.opts.CheckpointBatchSize, db.series.allSeries(), deletedSeriesIter(db.deleted, last))
} else {
_, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint, db.opts.EnableSTStorage)
}
if err != nil {
db.metrics.checkpointCreationFail.Inc()
var cerr *wlog.CorruptionErr
if errors.As(err, &cerr) {
@ -770,8 +797,8 @@ func (db *DB) truncate(mint int64) error {
// The checkpoint is written and segments before it are truncated, so we
// no longer need to track deleted series that were being kept around.
for ref, segment := range db.deleted {
if segment <= last {
for ref, meta := range db.deleted {
if meta.lastSegment <= last {
delete(db.deleted, ref)
}
}
@ -795,7 +822,7 @@ func (db *DB) truncate(mint int64) error {
// gc marks ref IDs that have not received a sample since mint as deleted in
// s.deleted, along with the segment where they originally got deleted.
func (db *DB) gc(mint int64) {
deleted := db.series.GC(mint)
deleted := db.series.GC(mint, db.opts.CheckpointFromInMemorySeries)
db.metrics.numActiveSeries.Sub(float64(len(deleted)))
_, last, _ := wlog.Segments(db.wal.Dir())
@ -803,8 +830,8 @@ func (db *DB) gc(mint int64) {
// We want to keep series records for any newly deleted series
// until we've passed the last recorded segment. This prevents
// the WAL having samples for series records that no longer exist.
for ref := range deleted {
db.deleted[ref] = last
for ref, lset := range deleted {
db.deleted[ref] = deletedRefMeta{lastSegment: last, labels: lset}
}
db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted)))

@ -1533,7 +1533,7 @@ func BenchmarkGetOrCreate(b *testing.B) {
for i := range b.N {
if i%n == 0 && i > 0 {
b.StopTimer()
_ = s.series.GC(math.MaxInt64)
_ = s.series.GC(math.MaxInt64, false)
b.StartTimer()
}
app.getOrCreate(0, lbls[i%n])

@ -14,6 +14,7 @@
package agent
import (
"iter"
"sync"
"github.com/prometheus/prometheus/model/exemplar"
@ -45,6 +46,18 @@ func (m *memSeries) updateTimestamp(newTs int64) bool {
return false
}
func (m *memSeries) Ref() chunks.HeadSeriesRef {
return m.ref
}
func (m *memSeries) Labels() labels.Labels {
return m.lset
}
func (m *memSeries) LastSampleTimestamp() int64 {
return m.lastTs
}
// seriesHashmap lets agent find a memSeries by its label set, via a 64-bit hash.
// There is one map for the common case where the hash value is unique, and a
// second map for the case that two series have the same hash value.
@ -160,14 +173,15 @@ func newStripeSeries(stripeSize int) *stripeSeries {
// GC garbage collects old series that have not received a sample after mint
// and will fully delete them.
func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
func (s *stripeSeries) GC(mint int64, retainLabels bool) map[chunks.HeadSeriesRef]labels.Labels {
// gcMut serializes GC calls. Within a single GC pass, the check function
// holds hashLock and then acquires refLock — callers must never hold both
// simultaneously, which SetUnlessAlreadySet satisfies.
s.gcMut.Lock()
defer s.gcMut.Unlock()
deleted := map[chunks.HeadSeriesRef]struct{}{}
// Labels of deleted series are used by agent.Checkpoint.
deleted := map[chunks.HeadSeriesRef]labels.Labels{}
// For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID.
check := func(hashLock int, hash uint64, series *memSeries) {
@ -186,7 +200,12 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
s.locks[refLock].Lock()
}
deleted[series.ref] = struct{}{}
if retainLabels {
deleted[series.ref] = series.lset
} else {
deleted[series.ref] = labels.EmptyLabels()
}
delete(s.series[refLock], series.ref)
s.hashes[hashLock].Delete(hash, series.ref)
@ -305,3 +324,87 @@ func (s *stripeSeries) hashLock(hash uint64) uint64 {
func (s *stripeSeries) refLock(ref chunks.HeadSeriesRef) uint64 {
return uint64(ref) & uint64(s.size-1)
}
var _ ActiveSeries = (*seriesSnapshot)(nil)
// seriesSnapshot is a point-in-time copy of a memSeries' fields.
// It is used to avoid holding series locks during checkpoint I/O.
type seriesSnapshot struct {
ref chunks.HeadSeriesRef
lset labels.Labels
lastTs int64
}
func (s *seriesSnapshot) Ref() chunks.HeadSeriesRef {
return s.ref
}
func (s *seriesSnapshot) Labels() labels.Labels {
return s.lset
}
func (s *seriesSnapshot) LastSampleTimestamp() int64 {
return s.lastTs
}
func (s *stripeSeries) allSeries() iter.Seq[ActiveSeries] {
return func(yield func(ActiveSeries) bool) {
var buf []*memSeries
for i := 0; i < s.size; i++ {
// Collect pointers under RLock to avoid blocking appenders during I/O.
s.locks[i].RLock()
buf = buf[:0]
for _, series := range s.series[i] {
buf = append(buf, series)
}
s.locks[i].RUnlock()
// Snapshot and yield outside the stripe lock so that
// slow consumers (e.g. checkpoint disk I/O) do not
// block appends that need a write lock on this stripe.
for _, series := range buf {
series.Lock()
snapshot := seriesSnapshot{
ref: series.ref,
lset: series.lset,
lastTs: series.lastTs,
}
series.Unlock()
if !yield(&snapshot) {
return
}
}
}
}
}
// deletedSeriesIter returns an iterator over deleted series from the given map.
// Only series whose lastSegment is greater than last are emitted, matching the
// filtering behaviour of the [wlog.Checkpoint] keep function.
func deletedSeriesIter(m map[chunks.HeadSeriesRef]deletedRefMeta, last int) iter.Seq[DeletedSeries] {
return func(yield func(DeletedSeries) bool) {
for ref, meta := range m {
if meta.lastSegment > last {
if !yield(deletedSeries{ref: ref, labels: meta.labels}) {
return
}
}
}
}
}
var _ DeletedSeries = deletedSeries{}
type deletedSeries struct {
ref chunks.HeadSeriesRef
labels labels.Labels
}
func (series deletedSeries) Ref() chunks.HeadSeriesRef {
return series.ref
}
func (series deletedSeries) Labels() labels.Labels {
return series.labels
}

@ -40,7 +40,7 @@ func TestNoDeadlock(t *testing.T) {
go func() {
defer wg.Done()
<-started
_ = stripeSeries.GC(math.MaxInt64)
_ = stripeSeries.GC(math.MaxInt64, false)
}()
}
@ -224,7 +224,7 @@ func TestSetUnlessAlreadySetConcurrentGC(t *testing.T) {
case <-done:
return
default:
ss.GC(1) // removes series with lastTs < 1, i.e. lastTs==0
ss.GC(1, false) // removes series with lastTs < 1, i.e. lastTs==0
}
}
}()
@ -248,7 +248,7 @@ func TestSetUnlessAlreadySetConcurrentGC(t *testing.T) {
// A final synchronous GC pass ensures all eligible series are fully removed,
// then verify they are unreachable via both lookup paths.
ss.GC(1)
ss.GC(1, false)
for _, s := range eligible {
require.Nil(t, ss.GetByID(s.ref))
require.Nil(t, ss.GetByHash(s.lset.Hash(), s.lset))
@ -259,7 +259,7 @@ func TestStripeSeries_gc(t *testing.T) {
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
hash := ms1.lset.Hash()
s.GC(1)
s.GC(1, false)
// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
got := s.GetByHash(hash, ms1.lset)

@ -82,8 +82,8 @@ func DeleteCheckpoints(dir string, maxIndex int) error {
return errors.Join(errs...)
}
// checkpointTempFileSuffix is the suffix used when creating temporary checkpoint files.
const checkpointTempFileSuffix = ".tmp"
// CheckpointTempFileSuffix is the suffix used when creating temporary checkpoint files.
const CheckpointTempFileSuffix = ".tmp"
// DeleteTempCheckpoints deletes all temporary checkpoint directories in the given directory.
func DeleteTempCheckpoints(logger *slog.Logger, dir string) error {
@ -137,8 +137,8 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
return nil, err
}
cpdir := checkpointDir(w.Dir(), to)
cpdirtmp := cpdir + checkpointTempFileSuffix
cpdir := CheckpointDir(w.Dir(), to)
cpdirtmp := cpdir + CheckpointTempFileSuffix
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
return nil, fmt.Errorf("create checkpoint dir: %w", err)
@ -407,7 +407,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
// checkpointPrefix is the prefix used for checkpoint files.
const checkpointPrefix = "checkpoint."
func checkpointDir(dir string, i int) string {
func CheckpointDir(dir string, i int) string {
return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i))
}
@ -446,5 +446,5 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) {
}
func isTempDir(fi fs.DirEntry) bool {
return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), checkpointTempFileSuffix)
return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), CheckpointTempFileSuffix)
}