diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ed447c5d50..56b74b5a36 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -7515,6 +7515,126 @@ func TestHeadAppender_STStorage_Disabled(t *testing.T) { } } +// TestHeadAppender_STStorage_WALReplay verifies that ST values are preserved +// across a WAL replay when EnableSTStorage is true. The bug was that Commit() +// hardcoded EnableSTStorage=false in the WAL encoder, so ST values were written +// as V1 records (without ST) and lost on replay. +func TestHeadAppender_STStorage_WALReplay(t *testing.T) { + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(true) + h, w := newTestHeadWithOptions(t, compression.None, opts) + + lbls := labels.FromStrings("foo", "bar") + const st = int64(50) + + a := h.AppenderV2(context.Background()) + for ts := int64(100); ts < 200; ts++ { + _, err := a.Append(0, lbls, st, ts, float64(ts), nil, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + require.NoError(t, h.Close()) + + // Reopen the head, triggering WAL replay. + w, err := wlog.New(nil, nil, w.Dir(), compression.None) + require.NoError(t, err) + opts.ChunkDirRoot = h.opts.ChunkDirRoot + h2, err := NewHead(nil, nil, w, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { _ = h2.Close() }) + require.NoError(t, h2.Init(0)) + + // Query and verify ST values survived the WAL replay. + q, err := NewBlockQuerier(h2, 100, 199) + require.NoError(t, err) + got := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + + var expected []chunks.Sample + for ts := int64(100); ts < 200; ts++ { + expected = append(expected, sample{st, ts, float64(ts), nil, nil}) + } + require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: expected}, got) +} + +// TestHeadAppender_STStorage_WBLReplay verifies that ST values are preserved +// across a WBL replay for out-of-order samples when EnableSTStorage is true. +// The bug was that collectOOORecords() hardcoded EnableSTStorage=false in the +// WBL encoder (acc.enc), so OOO sample ST values were written as V1 records +// (without ST) and lost on WBL replay. +func TestHeadAppender_STStorage_WBLReplay(t *testing.T) { + dir := t.TempDir() + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) + require.NoError(t, err) + wbl, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.None) + require.NoError(t, err) + + opts := DefaultHeadOptions() + opts.ChunkRange = DefaultBlockDuration + opts.ChunkDirRoot = dir + opts.OutOfOrderTimeWindow.Store(60 * time.Minute.Milliseconds()) + opts.EnableSTStorage.Store(true) + + h, err := NewHead(nil, nil, wal, wbl, opts, nil) + require.NoError(t, err) + require.NoError(t, h.Init(0)) + + lbls := labels.FromStrings("foo", "bar") + const st = int64(50) + + // Append an in-order sample to establish the head's maxt. + app := h.AppenderV2(context.Background()) + _, err = app.Append(0, lbls, st, 200, 200, nil, nil, storage.AOptions{}) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Append OOO samples with non-zero ST; these go to the WBL. + // Use fewer than DefaultOutOfOrderCapMax (32) samples so they all stay in the + // OOO head chunk (not mmap'd) and are exclusively recovered via WBL replay. + app = h.AppenderV2(context.Background()) + for ts := int64(100); ts < 120; ts++ { + _, err = app.Append(0, lbls, st, ts, float64(ts), nil, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + require.NoError(t, h.Close()) + + // Reopen the head, triggering WBL replay. + wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None) + require.NoError(t, err) + wbl, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, compression.None) + require.NoError(t, err) + h2, err := NewHead(nil, nil, wal, wbl, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { _ = h2.Close() }) + require.NoError(t, h2.Init(0)) + + // Access the OOO head chunk directly and verify ST values survived WBL replay. + ms, created, err := h2.getOrCreate(lbls.Hash(), lbls, false) + require.NoError(t, err) + require.False(t, created) + require.NotNil(t, ms.ooo) + require.NotNil(t, ms.ooo.oooHeadChunk) + + chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(true, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + require.Len(t, chks, 1) + + it := chks[0].chunk.Iterator(nil) + var got []chunks.Sample + for it.Next() != chunkenc.ValNone { + t2, v := it.At() + got = append(got, sample{it.AtST(), t2, v, nil, nil}) + } + require.NoError(t, it.Err()) + + var expected []chunks.Sample + for ts := int64(100); ts < 120; ts++ { + expected = append(expected, sample{st, ts, float64(ts), nil, nil}) + } + require.Equal(t, expected, got) +} + // TestHeadAppender_STStorage_ChunkEncoding verifies that the correct chunk encoding // is used based on EnableSTStorage setting. func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) {