mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-13 00:28:42 +02:00
parent
83962c35a4
commit
db852f7ff9
@ -15,8 +15,10 @@ package tsdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
@ -2014,6 +2016,161 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
|
||||
require.Equal(t, 0, metadata)
|
||||
}
|
||||
|
||||
// TestHead_CrossSegmentWALCorruption_Replay_vs_Checkpoint reproduces the
|
||||
// asymmetry described in https://github.com/prometheus/prometheus/issues/18552:
|
||||
// a WAL with a record that was torn across a segment boundary (segment N ends
|
||||
// mid-record with a recFirst fragment followed by zero padding, segment N+1
|
||||
// starts cleanly with a recFull) is silently accepted by the per-segment WAL
|
||||
// replay path in Head.Init, but causes wlog.Checkpoint to fail with
|
||||
// "unexpected full record" because it reads all segments through a single
|
||||
// streaming Reader that preserves the partial-record counter across segment
|
||||
// boundaries.
|
||||
//
|
||||
// This test currently codifies the buggy behavior. Once the bug is fixed,
|
||||
// the replay-path assertion below must be inverted (it should then report
|
||||
// the corruption so that wal.Repair gets invoked) and the checkpoint-path
|
||||
// assertion should be updated to reflect the post-repair state.
|
||||
func TestHead_CrossSegmentWALCorruption_Replay_vs_Checkpoint(t *testing.T) {
|
||||
// WAL record framing constants. These are duplicated from the internal
|
||||
// constants in tsdb/wlog (which are unexported) and must be kept in sync.
|
||||
const (
|
||||
pageSize = 32 * 1024
|
||||
recordHeaderSize = 7
|
||||
segmentSize = 3 * pageSize
|
||||
recFull = byte(1) // wlog.recFull
|
||||
recFirst = byte(2) // wlog.recFirst
|
||||
)
|
||||
castagnoliTable := crc32.MakeTable(crc32.Castagnoli)
|
||||
|
||||
dir := t.TempDir()
|
||||
walDir := filepath.Join(dir, "wal")
|
||||
|
||||
// Create a multi-segment WAL whose records each exactly fill one page
|
||||
// so that segments 0 and 1 are both completely full of recFull records.
|
||||
w, err := wlog.NewSize(nil, nil, walDir, segmentSize, compression.None)
|
||||
require.NoError(t, err)
|
||||
|
||||
recPayloadSize := pageSize - recordHeaderSize
|
||||
// 7 records: 3 fill segment 0, 3 fill segment 1, 1 goes into segment 2.
|
||||
for i := range 7 {
|
||||
payload := make([]byte, recPayloadSize)
|
||||
// Use a non-Prometheus record-type marker so that if the data is ever
|
||||
// passed to record.Decoder.Type, it decodes to record.Unknown.
|
||||
payload[0] = 0xAA
|
||||
payload[1] = byte(i)
|
||||
require.NoError(t, w.Log(payload))
|
||||
}
|
||||
require.NoError(t, w.Close())
|
||||
|
||||
first, last, err := wlog.Segments(walDir)
|
||||
require.NoError(t, err)
|
||||
require.GreaterOrEqual(t, last-first, 2, "need at least 3 segments for this test")
|
||||
|
||||
// Sanity-check: segment 0 is full (segmentSize bytes).
|
||||
fi, err := os.Stat(wlog.SegmentName(walDir, 0))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(segmentSize), fi.Size(), "segment 0 must be a full %d-byte segment", segmentSize)
|
||||
|
||||
// Craft the cross-segment torn record: overwrite the last page of
|
||||
// segment 0 with a valid recFirst header + small payload, and zero-pad
|
||||
// the rest of the page. This simulates what a restart leaves behind
|
||||
// after a disk-full write: the torn last page is zero-padded by
|
||||
// OpenWriteSegment, but the recFirst fragment still sits at the start
|
||||
// of that page, while segment 1 starts cleanly with a recFull.
|
||||
const tornSeg = 0
|
||||
fragmentPayload := []byte("tornFragment")
|
||||
|
||||
lastPage := make([]byte, pageSize) // All zero-initialized.
|
||||
lastPage[0] = recFirst
|
||||
binary.BigEndian.PutUint16(lastPage[1:3], uint16(len(fragmentPayload)))
|
||||
binary.BigEndian.PutUint32(lastPage[3:7], crc32.Checksum(fragmentPayload, castagnoliTable))
|
||||
copy(lastPage[recordHeaderSize:recordHeaderSize+len(fragmentPayload)], fragmentPayload)
|
||||
|
||||
segFile, err := os.OpenFile(wlog.SegmentName(walDir, tornSeg), os.O_RDWR, 0o666)
|
||||
require.NoError(t, err)
|
||||
// Write the crafted page at the last page offset (page 2 of a 3-page segment).
|
||||
_, err = segFile.WriteAt(lastPage, int64(segmentSize-pageSize))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, segFile.Close())
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Assertion 1 (currently buggy): per-segment WAL replay misses the
|
||||
// cross-segment torn record. This mirrors the loop in Head.Init
|
||||
// (tsdb/head.go, the "Backfill segments from the most recent
|
||||
// checkpoint onwards" block), where a fresh wlog.Reader is created
|
||||
// for every segment and therefore the partial-record counter is
|
||||
// reset at each boundary.
|
||||
// ---------------------------------------------------------------
|
||||
for i := first; i <= last; i++ {
|
||||
s, err := wlog.OpenReadSegment(wlog.SegmentName(walDir, i))
|
||||
require.NoError(t, err)
|
||||
|
||||
sr, err := wlog.NewSegmentBufReaderWithOffset(0, s)
|
||||
require.NoError(t, err)
|
||||
|
||||
r := wlog.NewReader(sr)
|
||||
for r.Next() { //nolint:revive // Drain the reader.
|
||||
}
|
||||
// Today this assertion holds: per-segment replay silently accepts
|
||||
// the corruption. See https://github.com/prometheus/prometheus/issues/18552.
|
||||
// Once the fix lands, segment tornSeg should report a CorruptionErr
|
||||
// and this assertion will need to be inverted.
|
||||
require.NoErrorf(t, r.Err(),
|
||||
"bug #18552: per-segment WAL replay unexpectedly detected the "+
|
||||
"cross-segment torn record in segment %d; if this assertion "+
|
||||
"starts failing after a fix, invert it to assert the "+
|
||||
"CorruptionErr is returned for segment %d", i, tornSeg)
|
||||
|
||||
require.NoError(t, sr.Close())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Assertion 2: reading the same on-disk WAL through a single
|
||||
// continuous Reader (which is what wlog.Checkpoint does) surfaces
|
||||
// the corruption with exactly the error text reported in the issue.
|
||||
// ---------------------------------------------------------------
|
||||
sr, err := wlog.NewSegmentsRangeReader(wlog.SegmentRange{Dir: walDir, First: first, Last: last})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = sr.Close() })
|
||||
|
||||
r := wlog.NewReader(sr)
|
||||
for r.Next() { //nolint:revive // Drain until corruption is hit.
|
||||
}
|
||||
readerErr := r.Err()
|
||||
require.Error(t, readerErr, "continuous Reader must detect the cross-segment torn record")
|
||||
|
||||
var cerr *wlog.CorruptionErr
|
||||
require.ErrorAs(t, readerErr, &cerr)
|
||||
require.Contains(t, readerErr.Error(), "unexpected full record",
|
||||
"continuous Reader must report the same error as checkpointing in the wild")
|
||||
// The Reader reports the segment that held the unexpected recFull
|
||||
// (segment tornSeg+1), not the segment that started the torn record.
|
||||
// Either is acceptable; just make sure we're not pointing before tornSeg.
|
||||
require.GreaterOrEqual(t, cerr.Segment, tornSeg)
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Assertion 3: exercising wlog.Checkpoint end-to-end to make this
|
||||
// test as close as possible to the code path that surfaces the bug
|
||||
// in production (the "WAL truncation in Compact" log line).
|
||||
// ---------------------------------------------------------------
|
||||
wForCheckpoint, err := wlog.NewSize(nil, nil, walDir, segmentSize, compression.None)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = wForCheckpoint.Close() })
|
||||
|
||||
_, err = wlog.Checkpoint(
|
||||
promslog.NewNopLogger(),
|
||||
wForCheckpoint,
|
||||
first,
|
||||
last-1, // Don't include the active segment in the checkpoint.
|
||||
func(chunks.HeadSeriesRef) bool { return true },
|
||||
math.MinInt64,
|
||||
false,
|
||||
)
|
||||
require.Error(t, err, "wlog.Checkpoint must fail on the cross-segment torn WAL")
|
||||
require.Contains(t, err.Error(), "unexpected full record",
|
||||
"wlog.Checkpoint must fail with the same error observed in issue #18552")
|
||||
}
|
||||
|
||||
func TestDelete_e2e(t *testing.T) {
|
||||
numDatapoints := 1000
|
||||
numRanges := 1000
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user