post merge conflict fixes

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-03-12 08:28:45 +00:00
parent c133a969af
commit 3cf43337dc
3 changed files with 170 additions and 159 deletions

View File

@ -608,7 +608,7 @@ func TestReshardPartialBatch(t *testing.T) {
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
recs := testwal.GenerateRecords(recCase{
NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST.
NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST.
Series: 1, SamplesPerSeries: 10,
})
@ -656,8 +656,8 @@ func TestQueueFilledDeadlock(t *testing.T) {
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
recs := testwal.GenerateRecords(recCase{
NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST.
Series: 50, SamplesPerSeries: 1
NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST.
Series: 50, SamplesPerSeries: 1,
})
c := NewNopWriteClient()
@ -1920,7 +1920,7 @@ func TestDropOldTimeSeries(t *testing.T) {
nSamples := config.DefaultQueueConfig.Capacity * size
noST := protoMsg == remoteapi.WriteV1MessageType // RW1
pastRecs := testwal.GenerateRecords(recCase{
NoST: noST,
NoST: noST,
Series: nSeries,
SamplesPerSeries: (nSamples / nSeries) / 2, // Half data is past.
TsFn: func(_, j int) int64 {
@ -1929,7 +1929,7 @@ func TestDropOldTimeSeries(t *testing.T) {
},
})
newRecs := testwal.GenerateRecords(recCase{
NoST: noST,
NoST: noST,
Series: nSeries,
SamplesPerSeries: (nSamples / nSeries) / 2, // Half data is past.
TsFn: func(_, j int) int64 {
@ -2004,7 +2004,7 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
r := rand.New(rand.NewSource(99))
recs := testwal.GenerateRecords(recCase{
NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST.
NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST.
Series: numberOfSeries,
SamplesPerSeries: 1,
TsFn: func(_, _ int) int64 {
@ -2031,8 +2031,8 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
tsID := getSeriesIDFromRef(recs.Series[s.Ref])
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], writev2.Sample{
StartTimestamp: s.ST,
Timestamp: s.T,
Value: s.V,
Timestamp: s.T,
Value: s.V,
})
}
}
@ -2554,7 +2554,7 @@ func TestHighestTimestampOnAppend(t *testing.T) {
nSamples := 11 * config.DefaultQueueConfig.Capacity
nSeries := 3
recs := testwal.GenerateRecords(recCase{
NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST.
NoST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST.
Series: nSeries, SamplesPerSeries: nSamples / nSeries,
})

View File

@ -194,145 +194,145 @@ func TestWatcher_Tail(t *testing.T) {
exemplarsPerSeries = 2
)
for _, enableSTStorage := range []bool{false, true} {
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s/stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
var (
now = time.Now()
dir = t.TempDir()
wdir = path.Join(dir, "wal")
enc = record.Encoder{EnableSTStorage: enableSTStorage}
)
require.NoError(t, os.Mkdir(wdir, 0o777))
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s/stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
var (
now = time.Now()
dir = t.TempDir()
wdir = path.Join(dir, "wal")
enc = record.Encoder{EnableSTStorage: enableSTStorage}
)
require.NoError(t, os.Mkdir(wdir, 0o777))
// Generate test records that represents batches of records data.
// "batch" simulates a single scrape or RW/OTLP receive message.
// Watcher does not inspect the data other than watching start timestamp, so records
// does not need any certain shape.
records := make([]testwal.Records, batches)
cbHistogramRecords := make([]testwal.Records, batches)
for i := range records {
tsFn := func(_, _ int) int64 {
return timestamp.FromTime(now.Add(1 * time.Second))
// Generate test records that represents batches of records data.
// "batch" simulates a single scrape or RW/OTLP receive message.
// Watcher does not inspect the data other than watching start timestamp, so records
// does not need any certain shape.
records := make([]testwal.Records, batches)
cbHistogramRecords := make([]testwal.Records, batches)
for i := range records {
tsFn := func(_, _ int) int64 {
return timestamp.FromTime(now.Add(1 * time.Second))
}
records[i] = testwal.GenerateRecords(testwal.RecordsCase{
NoST: !enableSTStorage,
RefPadding: i * seriesPerBatch,
TsFn: tsFn,
Series: seriesPerBatch,
SamplesPerSeries: 10,
HistogramsPerSeries: 5,
FloatHistogramsPerSeries: 5,
ExemplarsPerSeries: exemplarsPerSeries,
})
cbHistogramRecords[i] = testwal.GenerateRecords(testwal.RecordsCase{
NoST: !enableSTStorage,
RefPadding: i * seriesPerBatch,
TsFn: tsFn,
Series: seriesPerBatch,
HistogramsPerSeries: 5,
FloatHistogramsPerSeries: 5,
HistogramFn: func(ref int) *histogram.Histogram {
return &histogram.Histogram{
Schema: -53,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
CustomValues: []float64{float64(ref) + 2},
}
},
})
}
records[i] = testwal.GenerateRecords(testwal.RecordsCase{
NoST: !enableSTStorage,
RefPadding: i * seriesPerBatch,
TsFn: tsFn,
Series: seriesPerBatch,
SamplesPerSeries: 10,
HistogramsPerSeries: 5,
FloatHistogramsPerSeries: 5,
ExemplarsPerSeries: exemplarsPerSeries,
// Create WAL for writing.
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, w.Close())
})
cbHistogramRecords[i] = testwal.GenerateRecords(testwal.RecordsCase{
NoST: !enableSTStorage,
RefPadding: i * seriesPerBatch,
TsFn: tsFn,
Series: seriesPerBatch,
HistogramsPerSeries: 5,
FloatHistogramsPerSeries: 5,
HistogramFn: func(ref int) *histogram.Histogram {
return &histogram.Histogram{
Schema: -53,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
CustomValues: []float64{float64(ref) + 2},
}
},
})
}
// Start watcher to that reads into a mock.
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true, nil)
// Update the time because we just created samples around "now" time and watcher
// only starts watching after that time.
watcher.SetStartTime(now)
// Start spins up watcher loop in a go-routine.
watcher.Start()
t.Cleanup(watcher.Stop)
// Create WAL for writing.
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, w.Close())
})
// Write to WAL like append commit would do, while watcher is tailing.
// Start watcher to that reads into a mock.
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true, nil)
// Update the time because we just created samples around "now" time and watcher
// only starts watching after that time.
watcher.SetStartTime(now)
// Start spins up watcher loop in a go-routine.
watcher.Start()
t.Cleanup(watcher.Stop)
// Write first a few samples before the start time, we don't expect those to be appended.
require.NoError(t, w.Log(enc.Samples([]record.RefSample{
{Ref: 1, T: timestamp.FromTime(now), V: 123},
{Ref: 2, T: timestamp.FromTime(now), V: 123.1},
}, nil)))
// Write to WAL like append commit would do, while watcher is tailing.
for i := range records {
// Similar order as tsdb/head_appender.go.headAppenderBase.log
// https://github.com/prometheus/prometheus/blob/1751685dd4f6430757ba3078a96cffeffcb2bb47/tsdb/head_append.go#L1053
require.NoError(t, w.Log(enc.Series(records[i].Series, nil)))
require.NoError(t, w.Log(enc.Metadata(records[i].Metadata, nil)))
require.NoError(t, w.Log(enc.Samples(records[i].Samples, nil)))
// Write first a few samples before the start time, we don't expect those to be appended.
require.NoError(t, w.Log(enc.Samples([]record.RefSample{
{Ref: 1, T: timestamp.FromTime(now), V: 123},
{Ref: 2, T: timestamp.FromTime(now), V: 123.1},
}, nil)))
hs, cbHs := enc.HistogramSamples(records[i].Histograms, nil)
require.Empty(t, cbHs)
require.NoError(t, w.Log(hs))
fhs, cbFhs := enc.FloatHistogramSamples(records[i].FloatHistograms, nil)
require.Empty(t, cbFhs)
require.NoError(t, w.Log(fhs))
require.NoError(t, w.Log(enc.CustomBucketsHistogramSamples(cbHistogramRecords[i].Histograms, nil)))
require.NoError(t, w.Log(enc.CustomBucketsFloatHistogramSamples(cbHistogramRecords[i].FloatHistograms, nil)))
for i := range records {
// Similar order as tsdb/head_appender.go.headAppenderBase.log
// https://github.com/prometheus/prometheus/blob/1751685dd4f6430757ba3078a96cffeffcb2bb47/tsdb/head_append.go#L1053
require.NoError(t, w.Log(enc.Series(records[i].Series, nil)))
require.NoError(t, w.Log(enc.Metadata(records[i].Metadata, nil)))
require.NoError(t, w.Log(enc.Samples(records[i].Samples, nil)))
require.NoError(t, w.Log(enc.Exemplars(records[i].Exemplars, nil)))
hs, cbHs := enc.HistogramSamples(records[i].Histograms, nil)
require.Empty(t, cbHs)
require.NoError(t, w.Log(hs))
fhs, cbFhs := enc.FloatHistogramSamples(records[i].FloatHistograms, nil)
require.Empty(t, cbFhs)
require.NoError(t, w.Log(fhs))
require.NoError(t, w.Log(enc.CustomBucketsHistogramSamples(cbHistogramRecords[i].Histograms, nil)))
require.NoError(t, w.Log(enc.CustomBucketsFloatHistogramSamples(cbHistogramRecords[i].FloatHistograms, nil)))
// Ping watcher for faster test. Watcher is checking for segment changes or 15s timeout.
watcher.Notify()
}
require.NoError(t, w.Log(enc.Exemplars(records[i].Exemplars, nil)))
// Wait for watcher to lead all.
require.Eventually(t, func() bool {
wt.mu.Lock()
defer wt.mu.Unlock()
// Ping watcher for faster test. Watcher is checking for segment changes or 15s timeout.
watcher.Notify()
}
// Exemplars are logged as the last one, so assert on those.
return wt.exemplarAppends >= batches
}, 2*time.Minute, 1*time.Second)
// Wait for watcher to lead all.
require.Eventually(t, func() bool {
wt.mu.Lock()
defer wt.mu.Unlock()
// Exemplars are logged as the last one, so assert on those.
return wt.exemplarAppends >= batches
}, 2*time.Minute, 1*time.Second)
require.Equal(t, batches, wt.seriesStores)
require.Equal(t, batches, wt.metadataStores)
require.Equal(t, batches, wt.sampleAppends)
require.Equal(t, 2*batches, wt.histogramAppends)
require.Equal(t, 2*batches, wt.floatHistogramsAppends)
require.Equal(t, batches, wt.exemplarAppends)
wt.mu.Lock()
defer wt.mu.Unlock()
for i := range batches {
sector := len(records[i].Series)
testutil.RequireEqual(t, records[i].Series, wt.seriesStored[i*sector:(i+1)*sector], i)
sector = len(records[i].Metadata)
require.Equal(t, records[i].Metadata, wt.metadataStored[i*sector:(i+1)*sector], i)
sector = len(records[i].Samples)
require.Equal(t, records[i].Samples, wt.samplesAppended[i*sector:(i+1)*sector], i)
require.Equal(t, batches, wt.seriesStores)
require.Equal(t, batches, wt.metadataStores)
require.Equal(t, batches, wt.sampleAppends)
require.Equal(t, 2*batches, wt.histogramAppends)
require.Equal(t, 2*batches, wt.floatHistogramsAppends)
require.Equal(t, batches, wt.exemplarAppends)
sector = len(records[i].Histograms) + len(cbHistogramRecords[i].Histograms)
require.Equal(t, records[i].Histograms, wt.histogramsAppended[i*sector:i*sector+len(records[i].Histograms)], i)
require.Equal(t, cbHistogramRecords[i].Histograms, wt.histogramsAppended[i*sector+len(records[i].Histograms):(i+1)*sector])
sector = len(records[i].FloatHistograms) + len(cbHistogramRecords[i].FloatHistograms)
require.Equal(t, records[i].FloatHistograms, wt.floatHistogramsAppended[i*sector:i*sector+len(records[i].FloatHistograms)])
require.Equal(t, cbHistogramRecords[i].FloatHistograms, wt.floatHistogramsAppended[i*sector+len(records[i].FloatHistograms):(i+1)*sector])
for i := range batches {
sector := len(records[i].Series)
testutil.RequireEqual(t, records[i].Series, wt.seriesStored[i*sector:(i+1)*sector], i)
sector = len(records[i].Metadata)
require.Equal(t, records[i].Metadata, wt.metadataStored[i*sector:(i+1)*sector], i)
sector = len(records[i].Samples)
require.Equal(t, records[i].Samples, wt.samplesAppended[i*sector:(i+1)*sector], i)
sector = len(records[i].Histograms) + len(cbHistogramRecords[i].Histograms)
require.Equal(t, records[i].Histograms, wt.histogramsAppended[i*sector:i*sector+len(records[i].Histograms)], i)
require.Equal(t, cbHistogramRecords[i].Histograms, wt.histogramsAppended[i*sector+len(records[i].Histograms):(i+1)*sector])
sector = len(records[i].FloatHistograms) + len(cbHistogramRecords[i].FloatHistograms)
require.Equal(t, records[i].FloatHistograms, wt.floatHistogramsAppended[i*sector:i*sector+len(records[i].FloatHistograms)])
require.Equal(t, cbHistogramRecords[i].FloatHistograms, wt.floatHistogramsAppended[i*sector+len(records[i].FloatHistograms):(i+1)*sector])
sector = len(records[i].Exemplars)
testutil.RequireEqual(t, records[i].Exemplars, wt.exemplarsAppended[i*sector:(i+1)*sector])
}
})
}
sector = len(records[i].Exemplars)
testutil.RequireEqual(t, records[i].Exemplars, wt.exemplarsAppended[i*sector:(i+1)*sector])
}
})
}
}
}
@ -390,9 +390,9 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
expected := seriesCount
require.Eventually(t, func() bool {
@ -478,12 +478,12 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
}
}
_, _, err = Segments(w.Dir())
require.NoError(t, err)
overwriteReadTimeout(t, time.Second)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
_, _, err = Segments(w.Dir())
require.NoError(t, err)
overwriteReadTimeout(t, time.Second)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
expected := seriesCount * 2
@ -554,9 +554,9 @@ func TestReadCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
go watcher.Start()
expectedSeries := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
@ -625,9 +625,9 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
require.NoError(t, err)
}
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.MaxSegment = -1
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.MaxSegment = -1
// Set the Watcher's metrics so they're not nil pointers.
watcher.SetMetrics()
@ -705,7 +705,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
overwriteReadTimeout(t, time.Second)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, subdir, false, false, false, nil)
watcher.MaxSegment = -1
go watcher.Start()
@ -784,9 +784,9 @@ func TestRun_StartupTime(t *testing.T) {
}
require.NoError(t, w.Close())
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.MaxSegment = segments
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.MaxSegment = segments
watcher.SetMetrics()
startTime := time.Now()
@ -856,11 +856,11 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
// Create 00000001, the watcher will tail it once started.
w.NextSegment()
// Set up the watcher and run it in the background.
wt := newWriteToMock(time.Millisecond)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.SetMetrics()
watcher.MaxSegment = segmentsToRead
// Set up the watcher and run it in the background.
wt := newWriteToMock(time.Millisecond)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
watcher.SetMetrics()
watcher.MaxSegment = segmentsToRead
var g errgroup.Group
g.Go(func() error {
@ -892,10 +892,11 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
// Wait for the watcher.
require.NoError(t, g.Wait())
// All series and samples were read.
require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read.
require.Len(t, wt.samplesAppended, segmentsToRead*seriesCount*samplesCount)
require.NoError(t, w.Close())
})
// All series and samples were read.
require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read.
require.Len(t, wt.samplesAppended, segmentsToRead*seriesCount*samplesCount)
require.NoError(t, w.Close())
})
}
}
}

View File

@ -48,6 +48,8 @@ type RecordsCase struct {
// HistogramFn source histogram for histogram and float histogram records.
// By default, newTestHist is used (exponential bucketing)
HistogramFn func(ref int) *histogram.Histogram
// NoST controls if ref samples should skip generating Start Timestamps. If true, ST is 0.
NoST bool
}
// Records represents batches of generated WAL records.
@ -118,10 +120,18 @@ func GenerateRecords(c RecordsCase) (ret Records) {
Help: fmt.Sprintf("help text for %d", ref),
}
for j := range c.SamplesPerSeries {
ts := c.TsFn(ref, j)
// Keep ST simple for now; we don't test the exact semantics.
// We can improve later (e.g. STsFN).
sts := ts - 1
if c.NoST {
sts = 0
}
ret.Samples[i*c.SamplesPerSeries+j] = record.RefSample{
Ref: chunks.HeadSeriesRef(ref),
T: c.TsFn(ref, j),
V: float64(ref),
ST: sts, T: ts,
V: float64(ref),
}
}
h := c.HistogramFn(ref)