Merge pull request #18220 from prometheus/extended-test

tests(tsdb/wlog): Tighten watcher tail tests
This commit is contained in:
Bartlomiej Plotka 2026-03-05 11:31:04 +01:00 committed by GitHub
commit 9dc782bf2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 239 additions and 173 deletions

View File

@ -781,6 +781,8 @@ func (*Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte {
return buf.Get()
}
// HistogramSamples encodes exponential histograms while returning all the excluded custom bucket histograms.
// Callers can encode the returned custom bucket histograms via CustomBucketsHistogramSamples.
func (*Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, []RefHistogramSample) {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(HistogramSamples))
@ -815,6 +817,7 @@ func (*Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]b
return buf.Get(), customBucketHistograms
}
// CustomBucketsHistogramSamples encodes given histograms as custom bucket histograms.
func (*Encoder) CustomBucketsHistogramSamples(histograms []RefHistogramSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(CustomBucketsHistogramSamples))
@ -880,6 +883,8 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) {
}
}
// FloatHistogramSamples encodes exponential float histograms while returning all the excluded custom bucket float histograms.
// Callers can encode the returned custom bucket float histograms via CustomBucketsFloatHistogramSamples.
func (*Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, []RefFloatHistogramSample) {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(FloatHistogramSamples))
@ -915,6 +920,7 @@ func (*Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []
return buf.Get(), customBucketsFloatHistograms
}
// CustomBucketsFloatHistogramSamples encodes given float histograms as custom bucket float histograms.
func (*Encoder) CustomBucketsFloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(CustomBucketsFloatHistogramSamples))

View File

@ -29,9 +29,12 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/util/testutil"
"github.com/prometheus/prometheus/util/testwal"
)
var (
@ -41,6 +44,7 @@ var (
)
// retry executes f() n times at each interval until it returns true.
// TODO(bwplotka): Replace with require.Eventually.
func retry(t *testing.T, interval time.Duration, n int, f func() bool) {
t.Helper()
ticker := time.NewTicker(interval)
@ -62,51 +66,93 @@ func overwriteReadTimeout(t *testing.T, val time.Duration) {
}
type writeToMock struct {
samplesAppended int
exemplarsAppended int
histogramsAppended int
floatHistogramsAppended int
seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
mu sync.Mutex
seriesStored []record.RefSeries
metadataStored []record.RefMetadata
samplesAppended []record.RefSample
exemplarsAppended []record.RefExemplar
histogramsAppended []record.RefHistogramSample
floatHistogramsAppended []record.RefFloatHistogramSample
seriesStores int
metadataStores int
sampleAppends int
exemplarAppends int
histogramAppends int
floatHistogramsAppends int
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
// If nonzero, delay reads with a short sleep.
delay time.Duration
}
func (wtm *writeToMock) Append(s []record.RefSample) bool {
wtm.mu.Lock()
defer wtm.mu.Unlock()
wtm.sampleAppends++
wtm.samplesAppended = append(wtm.samplesAppended, s...)
time.Sleep(wtm.delay)
wtm.samplesAppended += len(s)
return true
}
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
wtm.mu.Lock()
defer wtm.mu.Unlock()
time.Sleep(wtm.delay)
wtm.exemplarsAppended += len(e)
wtm.exemplarAppends++
wtm.exemplarsAppended = append(wtm.exemplarsAppended, e...)
return true
}
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
wtm.mu.Lock()
defer wtm.mu.Unlock()
time.Sleep(wtm.delay)
wtm.histogramsAppended += len(h)
wtm.histogramAppends++
wtm.histogramsAppended = append(wtm.histogramsAppended, h...)
return true
}
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
wtm.mu.Lock()
defer wtm.mu.Unlock()
time.Sleep(wtm.delay)
wtm.floatHistogramsAppended += len(fh)
wtm.floatHistogramsAppends++
wtm.floatHistogramsAppended = append(wtm.floatHistogramsAppended, fh...)
return true
}
func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
wtm.mu.Lock()
defer wtm.mu.Unlock()
wtm.seriesStores++
wtm.seriesStored = append(wtm.seriesStored, series...)
for _, s := range series {
wtm.seriesSegmentIndexes[s.Ref] = index
}
time.Sleep(wtm.delay)
wtm.UpdateSeriesSegment(series, index)
}
func (*writeToMock) StoreMetadata([]record.RefMetadata) { /* no-op */ }
func (wtm *writeToMock) StoreMetadata(meta []record.RefMetadata) {
wtm.mu.Lock()
defer wtm.mu.Unlock()
wtm.metadataStores++
wtm.metadataStored = append(wtm.metadataStored, meta...)
time.Sleep(wtm.delay)
}
func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) {
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
wtm.mu.Lock()
defer wtm.mu.Unlock()
for _, s := range series {
wtm.seriesSegmentIndexes[s.Ref] = index
}
@ -115,8 +161,9 @@ func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int
func (wtm *writeToMock) SeriesReset(index int) {
// Check for series that are in segments older than the checkpoint
// that were not also present in the checkpoint.
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
wtm.mu.Lock()
defer wtm.mu.Unlock()
for k, v := range wtm.seriesSegmentIndexes {
if v < index {
delete(wtm.seriesSegmentIndexes, k)
@ -125,8 +172,9 @@ func (wtm *writeToMock) SeriesReset(index int) {
}
func (wtm *writeToMock) checkNumSeries() int {
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
wtm.mu.Lock()
defer wtm.mu.Unlock()
return len(wtm.seriesSegmentIndexes)
}
@ -137,151 +185,149 @@ func newWriteToMock(delay time.Duration) *writeToMock {
}
}
func TestTailSamples(t *testing.T) {
pageSize := 32 * 1024
const seriesCount = 10
const samplesCount = 250
const exemplarsCount = 25
const histogramsCount = 50
func TestWatcher_Tail(t *testing.T) {
const (
pageSize = 32 * 1024
batches = 3
seriesPerBatch = 100
exemplarsPerSeries = 2
)
for _, compress := range compression.Types() {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
now := time.Now()
var (
now = time.Now()
dir = t.TempDir()
wdir = path.Join(dir, "wal")
enc record.Encoder
)
require.NoError(t, os.Mkdir(wdir, 0o777))
dir := t.TempDir()
// Generate test records that represent batches of record data.
// A "batch" simulates a single scrape or RW/OTLP receive message.
// Watcher does not inspect the data other than checking the start timestamp, so the records
// do not need any particular shape.
records := make([]testwal.Records, batches)
cbHistogramRecords := make([]testwal.Records, batches)
for i := range records {
tsFn := func(_, _ int) int64 {
return timestamp.FromTime(now.Add(1 * time.Second))
}
records[i] = testwal.GenerateRecords(testwal.RecordsCase{
RefPadding: i * seriesPerBatch,
TsFn: tsFn,
wdir := path.Join(dir, "wal")
err := os.Mkdir(wdir, 0o777)
require.NoError(t, err)
Series: seriesPerBatch,
SamplesPerSeries: 10,
HistogramsPerSeries: 5,
FloatHistogramsPerSeries: 5,
ExemplarsPerSeries: exemplarsPerSeries,
})
cbHistogramRecords[i] = testwal.GenerateRecords(testwal.RecordsCase{
RefPadding: i * seriesPerBatch,
TsFn: tsFn,
enc := record.Encoder{}
Series: seriesPerBatch,
HistogramsPerSeries: 5,
FloatHistogramsPerSeries: 5,
HistogramFn: func(ref int) *histogram.Histogram {
return &histogram.Histogram{
Schema: -53,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
CustomValues: []float64{float64(ref) + 2},
}
},
})
}
// Create WAL for writing.
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
require.NoError(t, err)
defer func() {
t.Cleanup(func() {
require.NoError(t, w.Close())
}()
// Write to the initial segment then checkpoint.
for i := range seriesCount {
ref := i + 100
series := enc.Series([]record.RefSeries{
{
Ref: chunks.HeadSeriesRef(ref),
Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
},
}, nil)
require.NoError(t, w.Log(series))
for range samplesCount {
inner := rand.Intn(ref + 1)
sample := enc.Samples([]record.RefSample{
{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
V: float64(i),
},
}, nil)
require.NoError(t, w.Log(sample))
}
for range exemplarsCount {
inner := rand.Intn(ref + 1)
exemplar := enc.Exemplars([]record.RefExemplar{
{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
V: float64(i),
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", inner)),
},
}, nil)
require.NoError(t, w.Log(exemplar))
}
for range histogramsCount {
inner := rand.Intn(ref + 1)
hist := &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
}
histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
H: hist,
}}, nil)
require.NoError(t, w.Log(histograms))
customBucketHist := &histogram.Histogram{
Schema: -53,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
CustomValues: []float64{float64(i) + 2},
}
customBucketHistograms := enc.CustomBucketsHistogramSamples([]record.RefHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
H: customBucketHist,
}}, nil)
require.NoError(t, w.Log(customBucketHistograms))
floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
FH: hist.ToFloat(nil),
}}, nil)
require.NoError(t, w.Log(floatHistograms))
customBucketFloatHistograms := enc.CustomBucketsFloatHistogramSamples([]record.RefFloatHistogramSample{{
Ref: chunks.HeadSeriesRef(inner),
T: now.UnixNano() + 1,
FH: customBucketHist.ToFloat(nil),
}}, nil)
require.NoError(t, w.Log(customBucketFloatHistograms))
}
}
// Start read after checkpoint, no more data written.
first, last, err := Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true, true)
watcher.SetStartTime(now)
// Set the Watcher's metrics so they're not nil pointers.
watcher.SetMetrics()
for i := first; i <= last; i++ {
segment, err := OpenReadSegment(SegmentName(watcher.walDir, i))
require.NoError(t, err)
reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment)
// Use tail true so we can ensure we got the right number of samples.
watcher.readSegment(reader, i, true)
require.NoError(t, segment.Close())
}
expectedSeries := seriesCount
expectedSamples := seriesCount * samplesCount
expectedExemplars := seriesCount * exemplarsCount
expectedHistograms := seriesCount * histogramsCount * 2
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() >= expectedSeries
})
require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series")
require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
require.Equal(t, expectedHistograms, wt.floatHistogramsAppended, "did not receive the expected number of float histograms")
// Start a watcher that reads into a mock.
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true)
// Update the time because we just created samples around "now" time and watcher
// only starts watching after that time.
watcher.SetStartTime(now)
// Start spins up watcher loop in a go-routine.
watcher.Start()
t.Cleanup(watcher.Stop)
// Write to WAL like append commit would do, while watcher is tailing.
// First write a few samples stamped exactly at the start time; we don't expect those to be appended.
require.NoError(t, w.Log(enc.Samples([]record.RefSample{
{Ref: 1, T: timestamp.FromTime(now), V: 123},
{Ref: 2, T: timestamp.FromTime(now), V: 123.1},
}, nil)))
for i := range records {
// Similar order as headAppenderBase.log in tsdb/head_append.go:
// https://github.com/prometheus/prometheus/blob/1751685dd4f6430757ba3078a96cffeffcb2bb47/tsdb/head_append.go#L1053
require.NoError(t, w.Log(enc.Series(records[i].Series, nil)))
require.NoError(t, w.Log(enc.Metadata(records[i].Metadata, nil)))
require.NoError(t, w.Log(enc.Samples(records[i].Samples, nil)))
hs, cbHs := enc.HistogramSamples(records[i].Histograms, nil)
require.Empty(t, cbHs)
require.NoError(t, w.Log(hs))
fhs, cbFhs := enc.FloatHistogramSamples(records[i].FloatHistograms, nil)
require.Empty(t, cbFhs)
require.NoError(t, w.Log(fhs))
require.NoError(t, w.Log(enc.CustomBucketsHistogramSamples(cbHistogramRecords[i].Histograms, nil)))
require.NoError(t, w.Log(enc.CustomBucketsFloatHistogramSamples(cbHistogramRecords[i].FloatHistograms, nil)))
require.NoError(t, w.Log(enc.Exemplars(records[i].Exemplars, nil)))
// Ping watcher for faster test. Watcher is checking for segment changes or 15s timeout.
watcher.Notify()
}
// Wait for the watcher to read everything.
require.Eventually(t, func() bool {
wt.mu.Lock()
defer wt.mu.Unlock()
// Exemplars are logged last in each batch, so wait on those.
return wt.exemplarAppends >= batches
}, 2*time.Minute, 1*time.Second)
wt.mu.Lock()
defer wt.mu.Unlock()
require.Equal(t, batches, wt.seriesStores)
require.Equal(t, batches, wt.metadataStores)
require.Equal(t, batches, wt.sampleAppends)
require.Equal(t, 2*batches, wt.histogramAppends)
require.Equal(t, 2*batches, wt.floatHistogramsAppends)
require.Equal(t, batches, wt.exemplarAppends)
for i := range batches {
sector := len(records[i].Series)
testutil.RequireEqual(t, records[i].Series, wt.seriesStored[i*sector:(i+1)*sector], i)
sector = len(records[i].Metadata)
require.Equal(t, records[i].Metadata, wt.metadataStored[i*sector:(i+1)*sector], i)
sector = len(records[i].Samples)
require.Equal(t, records[i].Samples, wt.samplesAppended[i*sector:(i+1)*sector], i)
sector = len(records[i].Histograms) + len(cbHistogramRecords[i].Histograms)
require.Equal(t, records[i].Histograms, wt.histogramsAppended[i*sector:i*sector+len(records[i].Histograms)], i)
require.Equal(t, cbHistogramRecords[i].Histograms, wt.histogramsAppended[i*sector+len(records[i].Histograms):(i+1)*sector])
sector = len(records[i].FloatHistograms) + len(cbHistogramRecords[i].FloatHistograms)
require.Equal(t, records[i].FloatHistograms, wt.floatHistogramsAppended[i*sector:i*sector+len(records[i].FloatHistograms)])
require.Equal(t, cbHistogramRecords[i].FloatHistograms, wt.floatHistogramsAppended[i*sector+len(records[i].FloatHistograms):(i+1)*sector])
sector = len(records[i].Exemplars)
testutil.RequireEqual(t, records[i].Exemplars, wt.exemplarsAppended[i*sector:(i+1)*sector])
}
})
}
}
@ -830,7 +876,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
// All series and samples were read.
require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read.
require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended)
require.Len(t, wt.samplesAppended, segmentsToRead*seriesCount*samplesCount)
require.NoError(t, w.Close())
})
}

View File

@ -25,6 +25,8 @@ import (
)
// RecordsCase represents record generation option in a form of a test case.
//
// Generated Series will have refs that are monotonic and deterministic, in the range [RefPadding, RefPadding+Series).
type RecordsCase struct {
Name string
@ -36,8 +38,16 @@ type RecordsCase struct {
ExtraLabels []labels.Label
LabelsFn func(lb *labels.ScratchBuilder, i int) labels.Labels
TsFn func(i, j int) int64
// RefPadding represents a padding to add to Series refs.
RefPadding int
// LabelsFn allows injecting custom labels, by default it's a test_metric_%d with ExtraLabels.
LabelsFn func(lb *labels.ScratchBuilder, ref int) labels.Labels
// TsFn allows injecting custom sample timestamps. j represents the sample index within the series.
// By default, it injects j.
TsFn func(ref, j int) int64
// HistogramFn provides the source histogram for histogram and float histogram records.
// By default, newTestHist is used (exponential bucketing).
HistogramFn func(ref int) *histogram.Histogram
}
// Records represents batches of generated WAL records.
@ -90,47 +100,51 @@ func GenerateRecords(c RecordsCase) (ret Records) {
if c.TsFn == nil {
c.TsFn = func(_, j int) int64 { return int64(j) }
}
if c.HistogramFn == nil {
c.HistogramFn = newTestHist
}
lb := labels.NewScratchBuilder(1 + len(c.ExtraLabels))
for i := range ret.Series {
ref := c.RefPadding + i
ret.Series[i] = record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: c.LabelsFn(&lb, i),
Ref: chunks.HeadSeriesRef(ref),
Labels: c.LabelsFn(&lb, ref),
}
ret.Metadata[i] = record.RefMetadata{
Ref: chunks.HeadSeriesRef(i),
Ref: chunks.HeadSeriesRef(ref),
Type: uint8(record.Counter),
Unit: "unit text",
Help: fmt.Sprintf("help text for %d", i),
Help: fmt.Sprintf("help text for %d", ref),
}
for j := range c.SamplesPerSeries {
ret.Samples[i*c.SamplesPerSeries+j] = record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: c.TsFn(i, j),
V: float64(i),
Ref: chunks.HeadSeriesRef(ref),
T: c.TsFn(ref, j),
V: float64(ref),
}
}
h := newTestHist(i)
h := c.HistogramFn(ref)
for j := range c.HistogramsPerSeries {
ret.Histograms[i*c.HistogramsPerSeries+j] = record.RefHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: c.TsFn(i, j),
Ref: chunks.HeadSeriesRef(ref),
T: c.TsFn(ref, j),
H: h,
}
}
for j := range c.FloatHistogramsPerSeries {
ret.FloatHistograms[i*c.FloatHistogramsPerSeries+j] = record.RefFloatHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: c.TsFn(i, j),
Ref: chunks.HeadSeriesRef(ref),
T: c.TsFn(ref, j),
FH: h.ToFloat(nil),
}
}
for j := range c.ExemplarsPerSeries {
ret.Exemplars[i*c.ExemplarsPerSeries+j] = record.RefExemplar{
Ref: chunks.HeadSeriesRef(i),
T: c.TsFn(i, j),
V: float64(i),
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
Ref: chunks.HeadSeriesRef(ref),
T: c.TsFn(ref, j),
V: float64(ref),
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", ref)),
}
}
}