mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-15 17:46:49 +02:00
fix(tsdb): wal/wbl do pass ST when requested
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
8e2169fc8d
commit
1f0cc810fd
@ -7515,6 +7515,126 @@ func TestHeadAppender_STStorage_Disabled(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestHeadAppender_STStorage_WALReplay verifies that ST values are preserved
|
||||
// across a WAL replay when EnableSTStorage is true. The bug was that Commit()
|
||||
// hardcoded EnableSTStorage=false in the WAL encoder, so ST values were written
|
||||
// as V1 records (without ST) and lost on replay.
|
||||
func TestHeadAppender_STStorage_WALReplay(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
|
||||
opts.EnableSTStorage.Store(true)
|
||||
h, w := newTestHeadWithOptions(t, compression.None, opts)
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
const st = int64(50)
|
||||
|
||||
a := h.AppenderV2(context.Background())
|
||||
for ts := int64(100); ts < 200; ts++ {
|
||||
_, err := a.Append(0, lbls, st, ts, float64(ts), nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, a.Commit())
|
||||
require.NoError(t, h.Close())
|
||||
|
||||
// Reopen the head, triggering WAL replay.
|
||||
w, err := wlog.New(nil, nil, w.Dir(), compression.None)
|
||||
require.NoError(t, err)
|
||||
opts.ChunkDirRoot = h.opts.ChunkDirRoot
|
||||
h2, err := NewHead(nil, nil, w, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = h2.Close() })
|
||||
require.NoError(t, h2.Init(0))
|
||||
|
||||
// Query and verify ST values survived the WAL replay.
|
||||
q, err := NewBlockQuerier(h2, 100, 199)
|
||||
require.NoError(t, err)
|
||||
got := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
|
||||
var expected []chunks.Sample
|
||||
for ts := int64(100); ts < 200; ts++ {
|
||||
expected = append(expected, sample{st, ts, float64(ts), nil, nil})
|
||||
}
|
||||
require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: expected}, got)
|
||||
}
|
||||
|
||||
// TestHeadAppender_STStorage_WBLReplay verifies that ST values are preserved
|
||||
// across a WBL replay for out-of-order samples when EnableSTStorage is true.
|
||||
// The bug was that collectOOORecords() hardcoded EnableSTStorage=false in the
|
||||
// WBL encoder (acc.enc), so OOO sample ST values were written as V1 records
|
||||
// (without ST) and lost on WBL replay.
|
||||
func TestHeadAppender_STStorage_WBLReplay(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
wbl, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = DefaultBlockDuration
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(60 * time.Minute.Milliseconds())
|
||||
opts.EnableSTStorage.Store(true)
|
||||
|
||||
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
const st = int64(50)
|
||||
|
||||
// Append an in-order sample to establish the head's maxt.
|
||||
app := h.AppenderV2(context.Background())
|
||||
_, err = app.Append(0, lbls, st, 200, 200, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Append OOO samples with non-zero ST; these go to the WBL.
|
||||
// Use fewer than DefaultOutOfOrderCapMax (32) samples so they all stay in the
|
||||
// OOO head chunk (not mmap'd) and are exclusively recovered via WBL replay.
|
||||
app = h.AppenderV2(context.Background())
|
||||
for ts := int64(100); ts < 120; ts++ {
|
||||
_, err = app.Append(0, lbls, st, ts, float64(ts), nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
require.NoError(t, h.Close())
|
||||
|
||||
// Reopen the head, triggering WBL replay.
|
||||
wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
wbl, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.None)
|
||||
require.NoError(t, err)
|
||||
h2, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = h2.Close() })
|
||||
require.NoError(t, h2.Init(0))
|
||||
|
||||
// Access the OOO head chunk directly and verify ST values survived WBL replay.
|
||||
ms, created, err := h2.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.NotNil(t, ms.ooo)
|
||||
require.NotNil(t, ms.ooo.oooHeadChunk)
|
||||
|
||||
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(true, math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, chks, 1)
|
||||
|
||||
it := chks[0].chunk.Iterator(nil)
|
||||
var got []chunks.Sample
|
||||
for it.Next() != chunkenc.ValNone {
|
||||
t2, v := it.At()
|
||||
got = append(got, sample{it.AtST(), t2, v, nil, nil})
|
||||
}
|
||||
require.NoError(t, it.Err())
|
||||
|
||||
var expected []chunks.Sample
|
||||
for ts := int64(100); ts < 120; ts++ {
|
||||
expected = append(expected, sample{st, ts, float64(ts), nil, nil})
|
||||
}
|
||||
require.Equal(t, expected, got)
|
||||
}
|
||||
|
||||
// TestHeadAppender_STStorage_ChunkEncoding verifies that the correct chunk encoding
|
||||
// is used based on EnableSTStorage setting.
|
||||
func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user