mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-05 20:36:13 +02:00
tsdb: Find the last series ID on startup from the last series id file and WAL scan (#18333)
* Add logic to Head.Init(...) for fast startup Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Add unit tests Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Empty commit to retrigger CI Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Empty commit to retrigger CI Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Make readSeriesStateFile return a struct directly, fix small nits, remove test Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Fix test for readSeriesStateFile function Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> * Fix some more nits, add extra testcase Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com> --------- Signed-off-by: Rushabh Mehta <mehtarushabh2005@gmail.com>
This commit is contained in:
parent
71be0ff0ba
commit
a2172f91c1
23
tsdb/head.go
23
tsdb/head.go
@ -20,6 +20,7 @@ import (
|
||||
"io"
|
||||
"log/slog"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
@ -804,6 +805,28 @@ func (h *Head) Init(minValidTime int64) error {
|
||||
return fmt.Errorf("finding WAL segments: %w", e)
|
||||
}
|
||||
|
||||
if h.opts.EnableFastStartup {
|
||||
state, err := h.readSeriesStateFile()
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
h.logger.Warn("Failed to read series state file, skipping the fast startup", "err", err)
|
||||
}
|
||||
if err == nil {
|
||||
if state.CleanShutdown {
|
||||
h.lastSeriesID.Store(state.LastSeriesID)
|
||||
h.logger.Info("Fast startup: clean shutdown detected, restored last series ID", "last_series_id", state.LastSeriesID)
|
||||
} else {
|
||||
h.logger.Info("Fast startup: unclean shutdown detected, performing WAL scan", "from_segment", state.LastWALSegment, "to_segment", endAt)
|
||||
id, scanErr := h.findLastSeriesID(state, endAt)
|
||||
if scanErr != nil {
|
||||
h.logger.Error("Fast startup: WAL scan failed, skipping fast startup", "err", scanErr)
|
||||
} else {
|
||||
h.lastSeriesID.Store(id)
|
||||
h.logger.Info("Fast startup: WAL scan completed", "last_series_id", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
h.startWALReplayStatus(startFrom, endAt)
|
||||
|
||||
syms := labels.NewSymbolTable() // One table for the whole WAL.
|
||||
|
||||
@ -7915,3 +7915,86 @@ func TestHead_FastStartupStateFile(t *testing.T) {
|
||||
require.Equal(t, uint64(1), state.LastSeriesID, "LastSeriesID should remain 1")
|
||||
require.Equal(t, 0, state.LastWALSegment, "LastWALSegment should remain 0")
|
||||
}
|
||||
|
||||
func TestHead_ReadSeriesStateFile(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(1000, false)
|
||||
head, w := newTestHeadWithOptions(t, compression.None, opts)
|
||||
|
||||
// Fresh boot case.
|
||||
// Should return 0 valued state and os.ErrNotExist.
|
||||
state, err := head.readSeriesStateFile()
|
||||
require.Error(t, err, "reading non-existent state file should return an error")
|
||||
require.True(t, os.IsNotExist(err), "error should be of type os.ErrNotExist")
|
||||
require.Equal(t, SeriesLifecycleState{}, state, "state should be zero-valued when file does not exist")
|
||||
|
||||
// Valid file case.
|
||||
expectedState := SeriesLifecycleState{
|
||||
LastSeriesID: 42000,
|
||||
LastWALSegment: 5,
|
||||
CleanShutdown: true,
|
||||
}
|
||||
|
||||
b, err := json.Marshal(expectedState)
|
||||
require.NoError(t, err)
|
||||
err = os.WriteFile(filepath.Join(w.Dir(), "series_state.json"), b, 0o666)
|
||||
require.NoError(t, err)
|
||||
|
||||
state, err = head.readSeriesStateFile()
|
||||
require.NoError(t, err, "reading valid state file should not error")
|
||||
require.Equal(t, expectedState, state, "read state should match written state")
|
||||
}
|
||||
|
||||
func TestHead_FindLastSeriesID(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(1000, false)
|
||||
head, w := newTestHeadWithOptions(t, compression.None, opts)
|
||||
|
||||
// Write Series A to first segment.
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Append(0, labels.FromStrings("metric", "A"), 100, 1.0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Force the WAL to cut a new segment file (second segment).
|
||||
_, err = w.NextSegment()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write new sample for series A to second segment.
|
||||
app = head.Appender(context.Background())
|
||||
_, err = app.Append(0, labels.FromStrings("metric", "A"), 200, 2.0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Get the current max segment number.
|
||||
first, last, err := wlog.Segments(w.Dir())
|
||||
require.NoError(t, err)
|
||||
|
||||
mockState := SeriesLifecycleState{
|
||||
LastSeriesID: 1,
|
||||
LastWALSegment: first,
|
||||
CleanShutdown: false,
|
||||
}
|
||||
|
||||
// Should return 1 as there is only 1 series created so far.
|
||||
id, err := head.findLastSeriesID(mockState, last)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(1), id, "Should find ID 1 as no new series were created in segment 2")
|
||||
|
||||
// Write Series B to the second segment
|
||||
app = head.Appender(context.Background())
|
||||
_, err = app.Append(0, labels.FromStrings("metric", "B"), 300, 3.0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Scanning both files should now return 2
|
||||
id, err = head.findLastSeriesID(mockState, last)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(2), id, "Should find ID 2 after new series was created in segment 2")
|
||||
|
||||
// Simulate state file knowing about latest segment.
|
||||
mockState.LastWALSegment = last
|
||||
|
||||
// Should return 2 as it should scan the last file and find series B.
|
||||
id, err = head.findLastSeriesID(mockState, last)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(2), id, "Should find ID 2 even when state file's last segment is the newest segment")
|
||||
}
|
||||
|
||||
@ -1817,6 +1817,27 @@ type SeriesLifecycleState struct {
|
||||
CleanShutdown bool `json:"clean_shutdown"`
|
||||
}
|
||||
|
||||
// readSeriesStateFile reads the series lifecycle state from disk.
|
||||
func (h *Head) readSeriesStateFile() (SeriesLifecycleState, error) {
|
||||
if h.wal == nil {
|
||||
return SeriesLifecycleState{}, os.ErrNotExist
|
||||
}
|
||||
|
||||
path := filepath.Join(h.wal.Dir(), seriesStateFilename)
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return SeriesLifecycleState{}, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var state SeriesLifecycleState
|
||||
if err := json.NewDecoder(f).Decode(&state); err != nil {
|
||||
return SeriesLifecycleState{}, fmt.Errorf("decode series state: %w", err)
|
||||
}
|
||||
|
||||
return state, nil
|
||||
}
|
||||
|
||||
// Atomically writes the current series state to disk.
|
||||
func (h *Head) writeSeriesState(cleanShutdown bool) {
|
||||
if h.wal == nil {
|
||||
@ -1873,3 +1894,62 @@ func (h *Head) runSeriesStateTicker() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// findLastSeriesID performs a bounded reverse scan of WAL segments to find the highest series ID.
|
||||
func (h *Head) findLastSeriesID(state SeriesLifecycleState, endSegment int) (uint64, error) {
|
||||
startSegment := state.LastWALSegment
|
||||
startSegment = max(0, startSegment)
|
||||
|
||||
syms := labels.NewSymbolTable()
|
||||
|
||||
// Iterate backwards from the newest segment to the oldest allowed segment.
|
||||
for i := endSegment; i >= startSegment; i-- {
|
||||
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wal.Dir(), i))
|
||||
if os.IsNotExist(err) {
|
||||
continue // Segment might have been deleted, we skip it.
|
||||
}
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("open WAL segment %d: %w", i, err)
|
||||
}
|
||||
|
||||
sr := wlog.NewSegmentBufReader(s)
|
||||
r := wlog.NewReader(sr)
|
||||
dec := record.NewDecoder(syms, h.logger)
|
||||
|
||||
var highestID chunks.HeadSeriesRef
|
||||
var found bool
|
||||
|
||||
// Read the segment forwards.
|
||||
for r.Next() {
|
||||
rec := r.Record()
|
||||
// We only care about Series records.
|
||||
if dec.Type(rec) != record.Series {
|
||||
continue
|
||||
}
|
||||
|
||||
series, err := dec.Series(rec, nil)
|
||||
if err != nil {
|
||||
s.Close()
|
||||
return 0, fmt.Errorf("decode series in segment %d: %w", i, err)
|
||||
}
|
||||
for _, ws := range series {
|
||||
highestID = max(highestID, ws.Ref)
|
||||
found = true
|
||||
}
|
||||
}
|
||||
|
||||
err = r.Err()
|
||||
s.Close()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read WAL segment %d: %w", i, err)
|
||||
}
|
||||
|
||||
if found {
|
||||
return uint64(highestID), nil
|
||||
}
|
||||
}
|
||||
|
||||
// If we scanned the segments and found no series records,
|
||||
// the ID from our state file has to be used.
|
||||
return state.LastSeriesID, nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user