From 3cf43337dcb483cfa4cc7348f8e247a156101747 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 12 Mar 2026 08:28:45 +0000 Subject: [PATCH] post merge conflict fixes Signed-off-by: bwplotka --- storage/remote/queue_manager_test.go | 18 +- tsdb/wlog/watcher_test.go | 297 ++++++++++++++------------- util/testwal/records.go | 14 +- 3 files changed, 170 insertions(+), 159 deletions(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 5c97a170f6..b0a5627e2f 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -608,7 +608,7 @@ func TestReshardPartialBatch(t *testing.T) { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { recs := testwal.GenerateRecords(recCase{ - NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. Series: 1, SamplesPerSeries: 10, }) @@ -656,8 +656,8 @@ func TestQueueFilledDeadlock(t *testing.T) { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { recs := testwal.GenerateRecords(recCase{ - NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. - Series: 50, SamplesPerSeries: 1 + NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + Series: 50, SamplesPerSeries: 1, }) c := NewNopWriteClient() @@ -1920,7 +1920,7 @@ func TestDropOldTimeSeries(t *testing.T) { nSamples := config.DefaultQueueConfig.Capacity * size noST := protoMsg == remoteapi.WriteV1MessageType // RW1 pastRecs := testwal.GenerateRecords(recCase{ - NoST: noST, + NoST: noST, Series: nSeries, SamplesPerSeries: (nSamples / nSeries) / 2, // Half data is past. TsFn: func(_, j int) int64 { @@ -1929,7 +1929,7 @@ func TestDropOldTimeSeries(t *testing.T) { }, }) newRecs := testwal.GenerateRecords(recCase{ - NoST: noST, + NoST: noST, Series: nSeries, SamplesPerSeries: (nSamples / nSeries) / 2, // Half data is past. TsFn: func(_, j int) int64 { @@ -2004,7 +2004,7 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) { r := rand.New(rand.NewSource(99)) recs := testwal.GenerateRecords(recCase{ - NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. Series: numberOfSeries, SamplesPerSeries: 1, TsFn: func(_, _ int) int64 { @@ -2031,8 +2031,8 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) { tsID := getSeriesIDFromRef(recs.Series[s.Ref]) c.expectedSamples[tsID] = append(c.expectedSamples[tsID], writev2.Sample{ StartTimestamp: s.ST, - Timestamp: s.T, - Value: s.V, + Timestamp: s.T, + Value: s.V, }) } } @@ -2554,7 +2554,7 @@ func TestHighestTimestampOnAppend(t *testing.T) { nSamples := 11 * config.DefaultQueueConfig.Capacity nSeries := 3 recs := testwal.GenerateRecords(recCase{ - NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. Series: nSeries, SamplesPerSeries: nSamples / nSeries, }) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 5c63fd3d92..6c82ec8dcb 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -194,145 +194,145 @@ func TestWatcher_Tail(t *testing.T) { exemplarsPerSeries = 2 ) for _, enableSTStorage := range []bool{false, true} { - for _, compress := range compression.Types() { - t.Run(fmt.Sprintf("compress=%s/stStorage=%v", compress, enableSTStorage), func(t *testing.T) { - var ( - now = time.Now() - dir = t.TempDir() - wdir = path.Join(dir, "wal") - enc = record.Encoder{EnableSTStorage: enableSTStorage} - ) - require.NoError(t, os.Mkdir(wdir, 0o777)) + for _, compress := range compression.Types() { + t.Run(fmt.Sprintf("compress=%s/stStorage=%v", compress, enableSTStorage), func(t *testing.T) { + var ( + now = time.Now() + dir = t.TempDir() + wdir = path.Join(dir, "wal") + enc = record.Encoder{EnableSTStorage: enableSTStorage} + ) + require.NoError(t, os.Mkdir(wdir, 0o777)) - // Generate test records that represents batches of records data. - // "batch" simulates a single scrape or RW/OTLP receive message. - // Watcher does not inspect the data other than watching start timestamp, so records - // does not need any certain shape. - records := make([]testwal.Records, batches) - cbHistogramRecords := make([]testwal.Records, batches) - for i := range records { - tsFn := func(_, _ int) int64 { - return timestamp.FromTime(now.Add(1 * time.Second)) + // Generate test records that represents batches of records data. + // "batch" simulates a single scrape or RW/OTLP receive message. + // Watcher does not inspect the data other than watching start timestamp, so records + // does not need any certain shape. + records := make([]testwal.Records, batches) + cbHistogramRecords := make([]testwal.Records, batches) + for i := range records { + tsFn := func(_, _ int) int64 { + return timestamp.FromTime(now.Add(1 * time.Second)) + } + records[i] = testwal.GenerateRecords(testwal.RecordsCase{ + NoST: !enableSTStorage, + RefPadding: i * seriesPerBatch, + TsFn: tsFn, + + Series: seriesPerBatch, + SamplesPerSeries: 10, + HistogramsPerSeries: 5, + FloatHistogramsPerSeries: 5, + ExemplarsPerSeries: exemplarsPerSeries, + }) + cbHistogramRecords[i] = testwal.GenerateRecords(testwal.RecordsCase{ + NoST: !enableSTStorage, + RefPadding: i * seriesPerBatch, + TsFn: tsFn, + + Series: seriesPerBatch, + HistogramsPerSeries: 5, + FloatHistogramsPerSeries: 5, + HistogramFn: func(ref int) *histogram.Histogram { + return &histogram.Histogram{ + Schema: -53, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 2, + Sum: 0, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + CustomValues: []float64{float64(ref) + 2}, + } + }, + }) } - records[i] = testwal.GenerateRecords(testwal.RecordsCase{ - NoST: !enableSTStorage, - RefPadding: i * seriesPerBatch, - TsFn: tsFn, - Series: seriesPerBatch, - SamplesPerSeries: 10, - HistogramsPerSeries: 5, - FloatHistogramsPerSeries: 5, - ExemplarsPerSeries: exemplarsPerSeries, + // Create WAL for writing. + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, w.Close()) }) - cbHistogramRecords[i] = testwal.GenerateRecords(testwal.RecordsCase{ - NoST: !enableSTStorage, - RefPadding: i * seriesPerBatch, - TsFn: tsFn, - Series: seriesPerBatch, - HistogramsPerSeries: 5, - FloatHistogramsPerSeries: 5, - HistogramFn: func(ref int) *histogram.Histogram { - return &histogram.Histogram{ - Schema: -53, - ZeroThreshold: 1e-128, - ZeroCount: 0, - Count: 2, - Sum: 0, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, - CustomValues: []float64{float64(ref) + 2}, - } - }, - }) - } + // Start watcher to that reads into a mock. + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true, nil) + // Update the time because we just created samples around "now" time and watcher + // only starts watching after that time. + watcher.SetStartTime(now) + // Start spins up watcher loop in a go-routine. + watcher.Start() + t.Cleanup(watcher.Stop) - // Create WAL for writing. - w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, w.Close()) - }) + // Write to WAL like append commit would do, while watcher is tailing. - // Start watcher to that reads into a mock. - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true, nil) - // Update the time because we just created samples around "now" time and watcher - // only starts watching after that time. - watcher.SetStartTime(now) - // Start spins up watcher loop in a go-routine. - watcher.Start() - t.Cleanup(watcher.Stop) + // Write first a few samples before the start time, we don't expect those to be appended. + require.NoError(t, w.Log(enc.Samples([]record.RefSample{ + {Ref: 1, T: timestamp.FromTime(now), V: 123}, + {Ref: 2, T: timestamp.FromTime(now), V: 123.1}, + }, nil))) - // Write to WAL like append commit would do, while watcher is tailing. + for i := range records { + // Similar order as tsdb/head_appender.go.headAppenderBase.log + // https://github.com/prometheus/prometheus/blob/1751685dd4f6430757ba3078a96cffeffcb2bb47/tsdb/head_append.go#L1053 + require.NoError(t, w.Log(enc.Series(records[i].Series, nil))) + require.NoError(t, w.Log(enc.Metadata(records[i].Metadata, nil))) + require.NoError(t, w.Log(enc.Samples(records[i].Samples, nil))) - // Write first a few samples before the start time, we don't expect those to be appended. - require.NoError(t, w.Log(enc.Samples([]record.RefSample{ - {Ref: 1, T: timestamp.FromTime(now), V: 123}, - {Ref: 2, T: timestamp.FromTime(now), V: 123.1}, - }, nil))) + hs, cbHs := enc.HistogramSamples(records[i].Histograms, nil) + require.Empty(t, cbHs) + require.NoError(t, w.Log(hs)) + fhs, cbFhs := enc.FloatHistogramSamples(records[i].FloatHistograms, nil) + require.Empty(t, cbFhs) + require.NoError(t, w.Log(fhs)) + require.NoError(t, w.Log(enc.CustomBucketsHistogramSamples(cbHistogramRecords[i].Histograms, nil))) + require.NoError(t, w.Log(enc.CustomBucketsFloatHistogramSamples(cbHistogramRecords[i].FloatHistograms, nil))) - for i := range records { - // Similar order as tsdb/head_appender.go.headAppenderBase.log - // https://github.com/prometheus/prometheus/blob/1751685dd4f6430757ba3078a96cffeffcb2bb47/tsdb/head_append.go#L1053 - require.NoError(t, w.Log(enc.Series(records[i].Series, nil))) - require.NoError(t, w.Log(enc.Metadata(records[i].Metadata, nil))) - require.NoError(t, w.Log(enc.Samples(records[i].Samples, nil))) + require.NoError(t, w.Log(enc.Exemplars(records[i].Exemplars, nil))) - hs, cbHs := enc.HistogramSamples(records[i].Histograms, nil) - require.Empty(t, cbHs) - require.NoError(t, w.Log(hs)) - fhs, cbFhs := enc.FloatHistogramSamples(records[i].FloatHistograms, nil) - require.Empty(t, cbFhs) - require.NoError(t, w.Log(fhs)) - require.NoError(t, w.Log(enc.CustomBucketsHistogramSamples(cbHistogramRecords[i].Histograms, nil))) - require.NoError(t, w.Log(enc.CustomBucketsFloatHistogramSamples(cbHistogramRecords[i].FloatHistograms, nil))) + // Ping watcher for faster test. Watcher is checking for segment changes or 15s timeout. + watcher.Notify() + } - require.NoError(t, w.Log(enc.Exemplars(records[i].Exemplars, nil))) + // Wait for watcher to lead all. + require.Eventually(t, func() bool { + wt.mu.Lock() + defer wt.mu.Unlock() - // Ping watcher for faster test. Watcher is checking for segment changes or 15s timeout. - watcher.Notify() - } + // Exemplars are logged as the last one, so assert on those. + return wt.exemplarAppends >= batches + }, 2*time.Minute, 1*time.Second) - // Wait for watcher to lead all. - require.Eventually(t, func() bool { wt.mu.Lock() defer wt.mu.Unlock() - // Exemplars are logged as the last one, so assert on those. - return wt.exemplarAppends >= batches - }, 2*time.Minute, 1*time.Second) + require.Equal(t, batches, wt.seriesStores) + require.Equal(t, batches, wt.metadataStores) + require.Equal(t, batches, wt.sampleAppends) + require.Equal(t, 2*batches, wt.histogramAppends) + require.Equal(t, 2*batches, wt.floatHistogramsAppends) + require.Equal(t, batches, wt.exemplarAppends) - wt.mu.Lock() - defer wt.mu.Unlock() + for i := range batches { + sector := len(records[i].Series) + testutil.RequireEqual(t, records[i].Series, wt.seriesStored[i*sector:(i+1)*sector], i) + sector = len(records[i].Metadata) + require.Equal(t, records[i].Metadata, wt.metadataStored[i*sector:(i+1)*sector], i) + sector = len(records[i].Samples) + require.Equal(t, records[i].Samples, wt.samplesAppended[i*sector:(i+1)*sector], i) - require.Equal(t, batches, wt.seriesStores) - require.Equal(t, batches, wt.metadataStores) - require.Equal(t, batches, wt.sampleAppends) - require.Equal(t, 2*batches, wt.histogramAppends) - require.Equal(t, 2*batches, wt.floatHistogramsAppends) - require.Equal(t, batches, wt.exemplarAppends) + sector = len(records[i].Histograms) + len(cbHistogramRecords[i].Histograms) + require.Equal(t, records[i].Histograms, wt.histogramsAppended[i*sector:i*sector+len(records[i].Histograms)], i) + require.Equal(t, cbHistogramRecords[i].Histograms, wt.histogramsAppended[i*sector+len(records[i].Histograms):(i+1)*sector]) + sector = len(records[i].FloatHistograms) + len(cbHistogramRecords[i].FloatHistograms) + require.Equal(t, records[i].FloatHistograms, wt.floatHistogramsAppended[i*sector:i*sector+len(records[i].FloatHistograms)]) + require.Equal(t, cbHistogramRecords[i].FloatHistograms, wt.floatHistogramsAppended[i*sector+len(records[i].FloatHistograms):(i+1)*sector]) - for i := range batches { - sector := len(records[i].Series) - testutil.RequireEqual(t, records[i].Series, wt.seriesStored[i*sector:(i+1)*sector], i) - sector = len(records[i].Metadata) - require.Equal(t, records[i].Metadata, wt.metadataStored[i*sector:(i+1)*sector], i) - sector = len(records[i].Samples) - require.Equal(t, records[i].Samples, wt.samplesAppended[i*sector:(i+1)*sector], i) - - sector = len(records[i].Histograms) + len(cbHistogramRecords[i].Histograms) - require.Equal(t, records[i].Histograms, wt.histogramsAppended[i*sector:i*sector+len(records[i].Histograms)], i) - require.Equal(t, cbHistogramRecords[i].Histograms, wt.histogramsAppended[i*sector+len(records[i].Histograms):(i+1)*sector]) - sector = len(records[i].FloatHistograms) + len(cbHistogramRecords[i].FloatHistograms) - require.Equal(t, records[i].FloatHistograms, wt.floatHistogramsAppended[i*sector:i*sector+len(records[i].FloatHistograms)]) - require.Equal(t, cbHistogramRecords[i].FloatHistograms, wt.floatHistogramsAppended[i*sector+len(records[i].FloatHistograms):(i+1)*sector]) - - sector = len(records[i].Exemplars) - testutil.RequireEqual(t, records[i].Exemplars, wt.exemplarsAppended[i*sector:(i+1)*sector]) - } - }) - } + sector = len(records[i].Exemplars) + testutil.RequireEqual(t, records[i].Exemplars, wt.exemplarsAppended[i*sector:(i+1)*sector]) + } + }) + } } } @@ -390,9 +390,9 @@ func TestReadToEndNoCheckpoint(t *testing.T) { _, _, err = Segments(w.Dir()) require.NoError(t, err) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) - go watcher.Start() + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) + go watcher.Start() expected := seriesCount require.Eventually(t, func() bool { @@ -478,12 +478,12 @@ func TestReadToEndWithCheckpoint(t *testing.T) { } } - _, _, err = Segments(w.Dir()) - require.NoError(t, err) - overwriteReadTimeout(t, time.Second) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) - go watcher.Start() + _, _, err = Segments(w.Dir()) + require.NoError(t, err) + overwriteReadTimeout(t, time.Second) + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) + go watcher.Start() expected := seriesCount * 2 @@ -554,9 +554,9 @@ func TestReadCheckpoint(t *testing.T) { _, _, err = Segments(w.Dir()) require.NoError(t, err) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) - go watcher.Start() + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) + go watcher.Start() expectedSeries := seriesCount retry(t, defaultRetryInterval, defaultRetries, func() bool { @@ -625,9 +625,9 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { require.NoError(t, err) } - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) - watcher.MaxSegment = -1 + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) + watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. watcher.SetMetrics() @@ -705,7 +705,7 @@ func TestCheckpointSeriesReset(t *testing.T) { overwriteReadTimeout(t, time.Second) wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, subdir, false, false, false, nil) watcher.MaxSegment = -1 go watcher.Start() @@ -784,9 +784,9 @@ func TestRun_StartupTime(t *testing.T) { } require.NoError(t, w.Close()) - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) - watcher.MaxSegment = segments + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) + watcher.MaxSegment = segments watcher.SetMetrics() startTime := time.Now() @@ -856,11 +856,11 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { // Create 00000001, the watcher will tail it once started. w.NextSegment() - // Set up the watcher and run it in the background. - wt := newWriteToMock(time.Millisecond) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) - watcher.SetMetrics() - watcher.MaxSegment = segmentsToRead + // Set up the watcher and run it in the background. + wt := newWriteToMock(time.Millisecond) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil) + watcher.SetMetrics() + watcher.MaxSegment = segmentsToRead var g errgroup.Group g.Go(func() error { @@ -892,10 +892,11 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { // Wait for the watcher. require.NoError(t, g.Wait()) - // All series and samples were read. - require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read. - require.Len(t, wt.samplesAppended, segmentsToRead*seriesCount*samplesCount) - require.NoError(t, w.Close()) - }) + // All series and samples were read. + require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read. + require.Len(t, wt.samplesAppended, segmentsToRead*seriesCount*samplesCount) + require.NoError(t, w.Close()) + }) + } } } diff --git a/util/testwal/records.go b/util/testwal/records.go index 5f85e42c3c..1fe5938461 100644 --- a/util/testwal/records.go +++ b/util/testwal/records.go @@ -48,6 +48,8 @@ type RecordsCase struct { // HistogramFn source histogram for histogram and float histogram records. // By default, newTestHist is used (exponential bucketing) HistogramFn func(ref int) *histogram.Histogram + // NoST controls if ref samples should skip generating Start Timestamps. If true, ST is 0. + NoST bool } // Records represents batches of generated WAL records. @@ -118,10 +120,18 @@ func GenerateRecords(c RecordsCase) (ret Records) { Help: fmt.Sprintf("help text for %d", ref), } for j := range c.SamplesPerSeries { + ts := c.TsFn(ref, j) + // Keep ST simple for now; we don't test the exact semantics. + // We can improve later (e.g. STsFN). + sts := ts - 1 + if c.NoST { + sts = 0 + } + ret.Samples[i*c.SamplesPerSeries+j] = record.RefSample{ Ref: chunks.HeadSeriesRef(ref), - T: c.TsFn(ref, j), - V: float64(ref), + ST: sts, T: ts, + V: float64(ref), } } h := c.HistogramFn(ref)