From a2172f91c198aed8d2f645021a90942ffbcaf706 Mon Sep 17 00:00:00 2001
From: Rushabh Mehta <139112780+RushabhMehta2005@users.noreply.github.com>
Date: Wed, 1 Apr 2026 10:15:53 +0530
Subject: [PATCH] tsdb: Find the last series ID on startup from the last series id file and WAL scan (#18333)

* Add logic to Head.Init(...) for fast startup

Signed-off-by: Rushabh Mehta

* Add unit tests

Signed-off-by: Rushabh Mehta

* Empty commit to retrigger CI

Signed-off-by: Rushabh Mehta

* Empty commit to retrigger CI

Signed-off-by: Rushabh Mehta

* Make readSeriesStateFile return a struct directly, fix small nits, remove test

Signed-off-by: Rushabh Mehta

* Fix test for readSeriesStateFile function

Signed-off-by: Rushabh Mehta

* Fix some more nits, add extra testcase

Signed-off-by: Rushabh Mehta

---------

Signed-off-by: Rushabh Mehta
---
 tsdb/head.go      | 23 +++++++++++++
 tsdb/head_test.go | 83 +++++++++++++++++++++++++++++++++++++++++++++++
 tsdb/head_wal.go  | 80 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 186 insertions(+)

diff --git a/tsdb/head.go b/tsdb/head.go
index fb85691638..cf8ed8a199 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -20,6 +20,7 @@ import (
 	"io"
 	"log/slog"
 	"math"
+	"os"
 	"path/filepath"
 	"runtime"
 	"strconv"
@@ -804,6 +805,28 @@ func (h *Head) Init(minValidTime int64) error {
 		return fmt.Errorf("finding WAL segments: %w", e)
 	}
 
+	if h.opts.EnableFastStartup {
+		state, err := h.readSeriesStateFile()
+		if err != nil && !os.IsNotExist(err) {
+			h.logger.Warn("Failed to read series state file, skipping the fast startup", "err", err)
+		}
+		if err == nil {
+			if state.CleanShutdown {
+				h.lastSeriesID.Store(state.LastSeriesID)
+				h.logger.Info("Fast startup: clean shutdown detected, restored last series ID", "last_series_id", state.LastSeriesID)
+			} else {
+				h.logger.Info("Fast startup: unclean shutdown detected, performing WAL scan", "from_segment", state.LastWALSegment, "to_segment", endAt)
+				id, scanErr := h.findLastSeriesID(state, endAt)
+				if scanErr != nil {
+					h.logger.Error("Fast startup: WAL scan failed, skipping fast startup", "err", scanErr)
+				} else {
+					h.lastSeriesID.Store(id)
+					h.logger.Info("Fast startup: WAL scan completed", "last_series_id", id)
+				}
+			}
+		}
+	}
+
 	h.startWALReplayStatus(startFrom, endAt)
 
 	syms := labels.NewSymbolTable() // One table for the whole WAL.
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 8aaad13c0e..f4e19d791a 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -7915,3 +7915,86 @@ func TestHead_FastStartupStateFile(t *testing.T) {
 	require.Equal(t, uint64(1), state.LastSeriesID, "LastSeriesID should remain 1")
 	require.Equal(t, 0, state.LastWALSegment, "LastWALSegment should remain 0")
 }
+
+func TestHead_ReadSeriesStateFile(t *testing.T) {
+	opts := newTestHeadDefaultOptions(1000, false)
+	head, w := newTestHeadWithOptions(t, compression.None, opts)
+
+	// Fresh boot case.
+	// Should return 0 valued state and os.ErrNotExist.
+	state, err := head.readSeriesStateFile()
+	require.Error(t, err, "reading non-existent state file should return an error")
+	require.True(t, os.IsNotExist(err), "error should be of type os.ErrNotExist")
+	require.Equal(t, SeriesLifecycleState{}, state, "state should be zero-valued when file does not exist")
+
+	// Valid file case.
+	expectedState := SeriesLifecycleState{
+		LastSeriesID:   42000,
+		LastWALSegment: 5,
+		CleanShutdown:  true,
+	}
+
+	b, err := json.Marshal(expectedState)
+	require.NoError(t, err)
+	err = os.WriteFile(filepath.Join(w.Dir(), "series_state.json"), b, 0o666)
+	require.NoError(t, err)
+
+	state, err = head.readSeriesStateFile()
+	require.NoError(t, err, "reading valid state file should not error")
+	require.Equal(t, expectedState, state, "read state should match written state")
+}
+
+func TestHead_FindLastSeriesID(t *testing.T) {
+	opts := newTestHeadDefaultOptions(1000, false)
+	head, w := newTestHeadWithOptions(t, compression.None, opts)
+
+	// Write Series A to first segment.
+	app := head.Appender(context.Background())
+	_, err := app.Append(0, labels.FromStrings("metric", "A"), 100, 1.0)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// Force the WAL to cut a new segment file (second segment).
+	_, err = w.NextSegment()
+	require.NoError(t, err)
+
+	// Write new sample for series A to second segment.
+	app = head.Appender(context.Background())
+	_, err = app.Append(0, labels.FromStrings("metric", "A"), 200, 2.0)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// Get the current max segment number.
+	first, last, err := wlog.Segments(w.Dir())
+	require.NoError(t, err)
+
+	mockState := SeriesLifecycleState{
+		LastSeriesID:   1,
+		LastWALSegment: first,
+		CleanShutdown:  false,
+	}
+
+	// Should return 1 as there is only 1 series created so far.
+	id, err := head.findLastSeriesID(mockState, last)
+	require.NoError(t, err)
+	require.Equal(t, uint64(1), id, "Should find ID 1 as no new series were created in segment 2")
+
+	// Write Series B to the second segment
+	app = head.Appender(context.Background())
+	_, err = app.Append(0, labels.FromStrings("metric", "B"), 300, 3.0)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// Scanning both files should now return 2
+	id, err = head.findLastSeriesID(mockState, last)
+	require.NoError(t, err)
+	require.Equal(t, uint64(2), id, "Should find ID 2 after new series was created in segment 2")
+
+	// Simulate state file knowing about latest segment.
+	mockState.LastWALSegment = last
+
+	// Should return 2 as it should scan the last file and find series B.
+	id, err = head.findLastSeriesID(mockState, last)
+	require.NoError(t, err)
+	require.Equal(t, uint64(2), id, "Should find ID 2 even when state file's last segment is the newest segment")
+}
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index de8b19b6c1..e9700dd82e 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -1817,6 +1817,27 @@ type SeriesLifecycleState struct {
 	CleanShutdown  bool   `json:"clean_shutdown"`
 }
 
+// readSeriesStateFile reads the series lifecycle state from disk.
+func (h *Head) readSeriesStateFile() (SeriesLifecycleState, error) {
+	if h.wal == nil {
+		return SeriesLifecycleState{}, os.ErrNotExist
+	}
+
+	path := filepath.Join(h.wal.Dir(), seriesStateFilename)
+	f, err := os.Open(path)
+	if err != nil {
+		return SeriesLifecycleState{}, err
+	}
+	defer f.Close()
+
+	var state SeriesLifecycleState
+	if err := json.NewDecoder(f).Decode(&state); err != nil {
+		return SeriesLifecycleState{}, fmt.Errorf("decode series state: %w", err)
+	}
+
+	return state, nil
+}
+
 // Atomically writes the current series state to disk.
 func (h *Head) writeSeriesState(cleanShutdown bool) {
 	if h.wal == nil {
@@ -1873,3 +1894,62 @@ func (h *Head) runSeriesStateTicker() {
 		}
 	}
 }
+
+// findLastSeriesID reverse-scans WAL segments [state.LastWALSegment, endSegment] for the highest series ID, never returning less than state.LastSeriesID.
+func (h *Head) findLastSeriesID(state SeriesLifecycleState, endSegment int) (uint64, error) {
+	startSegment := state.LastWALSegment
+	startSegment = max(0, startSegment)
+
+	syms := labels.NewSymbolTable()
+
+	// Iterate backwards from the newest segment to the oldest allowed segment.
+	for i := endSegment; i >= startSegment; i-- {
+		s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wal.Dir(), i))
+		if os.IsNotExist(err) {
+			continue // Segment might have been deleted, we skip it.
+		}
+		if err != nil {
+			return 0, fmt.Errorf("open WAL segment %d: %w", i, err)
+		}
+
+		sr := wlog.NewSegmentBufReader(s)
+		r := wlog.NewReader(sr)
+		dec := record.NewDecoder(syms, h.logger)
+
+		var highestID chunks.HeadSeriesRef
+		var found bool
+
+		// Read the segment forwards.
+		for r.Next() {
+			rec := r.Record()
+			// We only care about Series records.
+			if dec.Type(rec) != record.Series {
+				continue
+			}
+
+			series, err := dec.Series(rec, nil)
+			if err != nil {
+				s.Close()
+				return 0, fmt.Errorf("decode series in segment %d: %w", i, err)
+			}
+			for _, ws := range series {
+				highestID = max(highestID, ws.Ref)
+				found = true
+			}
+		}
+
+		err = r.Err()
+		s.Close()
+		if err != nil {
+			return 0, fmt.Errorf("read WAL segment %d: %w", i, err)
+		}
+
+		if found {
+			return max(uint64(highestID), state.LastSeriesID), nil // Never go below the persisted ID.
+		}
+	}
+
+	// If we scanned the segments and found no series records,
+	// the ID from our state file has to be used.
+	return state.LastSeriesID, nil
+}