Merge pull request #18218 from prometheus/utilrecord

tests(util/testwal): Move WAL record generation to separate package for reuse
This commit is contained in:
Bartlomiej Plotka 2026-03-05 10:07:57 +01:00 committed by GitHub
commit d448f3f970
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 263 additions and 211 deletions

View File

@ -52,10 +52,13 @@ import (
"github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/testutil"
"github.com/prometheus/prometheus/util/testutil/synctest"
"github.com/prometheus/prometheus/util/testwal"
)
const defaultFlushDeadline = 1 * time.Minute
type recCase = testwal.RecordsCase
func newHighestTimestampMetric() *maxTimestamp {
return &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
@ -140,7 +143,7 @@ func TestBasicContentNegotiation(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
defer s.Close()
recs := generateRecords(recCase{series: 1, samplesPerSeries: 1})
recs := testwal.GenerateRecords(recCase{Series: 1, SamplesPerSeries: 1})
conf.RemoteWriteConfigs[0].ProtobufMessage = tc.senderProtoMsg
require.NoError(t, s.ApplyConfig(conf))
@ -152,18 +155,18 @@ func TestBasicContentNegotiation(t *testing.T) {
c.injectErrors(tc.injectErrs)
qm.SetClient(c)
qm.StoreSeries(recs.series, 0)
qm.StoreMetadata(recs.metadata)
qm.StoreSeries(recs.Series, 0)
qm.StoreMetadata(recs.Metadata)
// Do we expect some data back?
if !tc.expectFail {
c.expectSamples(recs.samples, recs.series)
c.expectSamples(recs.Samples, recs.Series)
} else {
c.expectSamples(nil, nil)
}
// Schedule send.
qm.Append(recs.samples)
qm.Append(recs.Samples)
if !tc.expectFail {
// No error expected, so wait for data.
@ -211,26 +214,26 @@ func TestSampleDelivery(t *testing.T) {
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
for _, rc := range []recCase{
{series: n, samplesPerSeries: n, histogramsPerSeries: 0, floatHistogramsPerSeries: 0, exemplarsPerSeries: 0, name: "samples only"},
{series: n, samplesPerSeries: 0, histogramsPerSeries: n, floatHistogramsPerSeries: 0, exemplarsPerSeries: 0, name: "histograms only"},
{series: n, samplesPerSeries: 0, histogramsPerSeries: 0, floatHistogramsPerSeries: n, exemplarsPerSeries: 0, name: "float histograms only"},
{series: n, samplesPerSeries: 0, histogramsPerSeries: 0, floatHistogramsPerSeries: 0, exemplarsPerSeries: n, name: "exemplars only"},
{series: n, samplesPerSeries: n, histogramsPerSeries: n, floatHistogramsPerSeries: n, exemplarsPerSeries: n, name: "all"},
{Series: n, SamplesPerSeries: n, HistogramsPerSeries: 0, FloatHistogramsPerSeries: 0, ExemplarsPerSeries: 0, Name: "samples only"},
{Series: n, SamplesPerSeries: 0, HistogramsPerSeries: n, FloatHistogramsPerSeries: 0, ExemplarsPerSeries: 0, Name: "histograms only"},
{Series: n, SamplesPerSeries: 0, HistogramsPerSeries: 0, FloatHistogramsPerSeries: n, ExemplarsPerSeries: 0, Name: "float histograms only"},
{Series: n, SamplesPerSeries: 0, HistogramsPerSeries: 0, FloatHistogramsPerSeries: 0, ExemplarsPerSeries: n, Name: "exemplars only"},
{Series: n, SamplesPerSeries: n, HistogramsPerSeries: n, FloatHistogramsPerSeries: n, ExemplarsPerSeries: n, Name: "all"},
} {
t.Run(fmt.Sprintf("proto=%s/case=%s", protoMsg, rc.name), func(t *testing.T) {
t.Run(fmt.Sprintf("proto=%s/case=%s", protoMsg, rc.Name), func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
defer s.Close()
recs := generateRecords(rc)
recs := testwal.GenerateRecords(rc)
var (
series = recs.series
metadata = recs.metadata
samples = recs.samples
exemplars = recs.exemplars
histograms = recs.histograms
floatHistograms = recs.floatHistograms
series = recs.Series
metadata = recs.Metadata
samples = recs.Samples
exemplars = recs.Exemplars
histograms = recs.Histograms
floatHistograms = recs.FloatHistograms
)
// Apply new config.
@ -359,7 +362,7 @@ func TestWALMetadataDelivery(t *testing.T) {
}
n := 3
recs := generateRecords(recCase{series: n, samplesPerSeries: n})
recs := testwal.GenerateRecords(recCase{Series: n, SamplesPerSeries: n})
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
@ -369,15 +372,15 @@ func TestWALMetadataDelivery(t *testing.T) {
c := NewTestWriteClient(remoteapi.WriteV2MessageType)
qm.SetClient(c)
qm.StoreSeries(recs.series, 0)
qm.StoreMetadata(recs.metadata)
qm.StoreSeries(recs.Series, 0)
qm.StoreMetadata(recs.Metadata)
require.Len(t, qm.seriesLabels, n)
require.Len(t, qm.seriesMetadata, n)
c.expectSamples(recs.samples, recs.series)
c.expectMetadataForBatch(recs.metadata, recs.series, recs.samples, nil, nil, nil)
qm.Append(recs.samples)
c.expectSamples(recs.Samples, recs.Series)
c.expectMetadataForBatch(recs.Metadata, recs.Series, recs.Samples, nil, nil, nil)
qm.Append(recs.Samples)
c.waitForExpectedData(t, 30*time.Second)
}
@ -385,24 +388,24 @@ func TestSampleDeliveryTimeout(t *testing.T) {
t.Parallel()
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
recs := generateRecords(recCase{series: 10, samplesPerSeries: 10})
recs := testwal.GenerateRecords(recCase{Series: 10, SamplesPerSeries: 10})
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
c := NewTestWriteClient(protoMsg)
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
m.StoreSeries(recs.series, 0)
m.StoreSeries(recs.Series, 0)
m.Start()
defer m.Stop()
// Send the samples twice, waiting for the samples in the meantime.
c.expectSamples(recs.samples, recs.series)
m.Append(recs.samples)
c.expectSamples(recs.Samples, recs.Series)
m.Append(recs.Samples)
c.waitForExpectedData(t, 30*time.Second)
c.expectSamples(recs.samples, recs.series)
m.Append(recs.samples)
c.expectSamples(recs.Samples, recs.Series)
m.Append(recs.Samples)
c.waitForExpectedData(t, 30*time.Second)
})
}
@ -414,16 +417,16 @@ func TestSampleDeliveryOrder(t *testing.T) {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
recs := generateRecords(recCase{series: n, samplesPerSeries: 1})
recs := testwal.GenerateRecords(recCase{Series: n, SamplesPerSeries: 1})
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
c.expectSamples(recs.samples, recs.series)
m.StoreSeries(recs.series, 0)
c.expectSamples(recs.Samples, recs.Series)
m.StoreSeries(recs.Series, 0)
m.Start()
defer m.Stop()
// These should be received by the client.
m.Append(recs.samples)
m.Append(recs.Samples)
c.waitForExpectedData(t, 30*time.Second)
})
}
@ -443,13 +446,13 @@ func TestShutdown(t *testing.T) {
m := newTestQueueManager(t, cfg, mcfg, deadline, c, protoMsg)
// Send 2x batch size, so we know it will need at least two sends.
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
recs := generateRecords(recCase{series: n / 1000, samplesPerSeries: 1000})
m.StoreSeries(recs.series, 0)
recs := testwal.GenerateRecords(recCase{Series: n / 1000, SamplesPerSeries: 1000})
m.StoreSeries(recs.Series, 0)
m.Start()
// Append blocks to guarantee delivery, so we do it in the background.
go func() {
m.Append(recs.samples)
m.Append(recs.Samples)
}()
synctest.Wait()
@ -512,29 +515,29 @@ func TestReshard(t *testing.T) {
size := 10 // Make bigger to find more races.
nSeries := 6
samplesPerSeries := config.DefaultQueueConfig.Capacity * size
recs := generateRecords(recCase{series: nSeries, samplesPerSeries: samplesPerSeries})
t.Logf("about to send %v samples", len(recs.samples))
recs := testwal.GenerateRecords(recCase{Series: nSeries, SamplesPerSeries: samplesPerSeries})
t.Logf("about to send %v samples", len(recs.Samples))
cfg := config.DefaultQueueConfig
cfg.MaxShards = 1
c := NewTestWriteClient(protoMsg)
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg)
c.expectSamples(recs.samples, recs.series)
m.StoreSeries(recs.series, 0)
c.expectSamples(recs.Samples, recs.Series)
m.StoreSeries(recs.Series, 0)
m.Start()
defer m.Stop()
go func() {
for i := 0; i < len(recs.samples); i += config.DefaultQueueConfig.Capacity {
sent := m.Append(recs.samples[i : i+config.DefaultQueueConfig.Capacity])
for i := 0; i < len(recs.Samples); i += config.DefaultQueueConfig.Capacity {
sent := m.Append(recs.Samples[i : i+config.DefaultQueueConfig.Capacity])
require.True(t, sent, "samples not sent")
time.Sleep(100 * time.Millisecond)
}
}()
for i := 1; i < len(recs.samples)/config.DefaultQueueConfig.Capacity; i++ {
for i := 1; i < len(recs.Samples)/config.DefaultQueueConfig.Capacity; i++ {
m.shards.stop()
m.shards.start(i)
time.Sleep(100 * time.Millisecond)
@ -588,7 +591,7 @@ func TestReshardPartialBatch(t *testing.T) {
t.Parallel()
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
recs := generateRecords(recCase{series: 1, samplesPerSeries: 10})
recs := testwal.GenerateRecords(recCase{Series: 1, SamplesPerSeries: 10})
c := NewTestBlockedWriteClient()
@ -600,14 +603,14 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
m.StoreSeries(recs.series, 0)
m.StoreSeries(recs.Series, 0)
m.Start()
for range 100 {
done := make(chan struct{})
go func() {
m.Append(recs.samples)
m.Append(recs.Samples)
time.Sleep(batchSendDeadline)
m.shards.stop()
m.shards.start(1)
@ -633,7 +636,7 @@ func TestReshardPartialBatch(t *testing.T) {
func TestQueueFilledDeadlock(t *testing.T) {
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
recs := generateRecords(recCase{series: 50, samplesPerSeries: 1})
recs := testwal.GenerateRecords(recCase{Series: 50, SamplesPerSeries: 1})
c := NewNopWriteClient()
@ -647,7 +650,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
m.StoreSeries(recs.series, 0)
m.StoreSeries(recs.Series, 0)
m.Start()
defer m.Stop()
@ -655,7 +658,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
done := make(chan struct{})
go func() {
time.Sleep(batchSendDeadline)
m.Append(recs.samples)
m.Append(recs.Samples)
done <- struct{}{}
}()
select {
@ -745,7 +748,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
defer onStoreCalled()
var (
recs = generateRecords(recCase{series: 100, samplesPerSeries: 100})
recs = testwal.GenerateRecords(recCase{Series: 100, SamplesPerSeries: 100})
cfg = config.DefaultQueueConfig
mcfg = config.DefaultMetadataConfig
@ -768,14 +771,14 @@ func TestDisableReshardOnRetry(t *testing.T) {
)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType)
m.StoreSeries(recs.series, 0)
m.StoreSeries(recs.Series, 0)
// Attempt to samples while the manager is running. We immediately stop the
// manager after the recoverable error is generated to prevent the manager
// from resharding itself.
m.Start()
{
m.Append(recs.samples)
m.Append(recs.Samples)
select {
case <-onStoredContext.Done():
@ -801,134 +804,6 @@ func TestDisableReshardOnRetry(t *testing.T) {
}, time.Minute, retryAfter, "shouldReshard should have been re-enabled")
}
type recCase struct {
name string
series int
samplesPerSeries int
histogramsPerSeries int
floatHistogramsPerSeries int
exemplarsPerSeries int
extraLabels []labels.Label
labelsFn func(lb *labels.ScratchBuilder, i int) labels.Labels
tsFn func(i, j int) int64
}
type records struct {
series []record.RefSeries
samples []record.RefSample
histograms []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
exemplars []record.RefExemplar
metadata []record.RefMetadata
}
func newTestHist(i int) *histogram.Histogram {
return &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
}
}
func generateRecords(c recCase) (ret records) {
ret.series = make([]record.RefSeries, c.series)
ret.metadata = make([]record.RefMetadata, c.series)
ret.samples = make([]record.RefSample, c.series*c.samplesPerSeries)
ret.histograms = make([]record.RefHistogramSample, c.series*c.histogramsPerSeries)
ret.floatHistograms = make([]record.RefFloatHistogramSample, c.series*c.floatHistogramsPerSeries)
ret.exemplars = make([]record.RefExemplar, c.series*c.exemplarsPerSeries)
if c.labelsFn == nil {
c.labelsFn = func(lb *labels.ScratchBuilder, i int) labels.Labels {
// Create series with labels that contains name of series plus any extra labels supplied.
name := fmt.Sprintf("test_metric_%d", i)
lb.Reset()
lb.Add(model.MetricNameLabel, name)
for _, l := range c.extraLabels {
lb.Add(l.Name, l.Value)
}
lb.Sort()
return lb.Labels()
}
}
if c.tsFn == nil {
c.tsFn = func(_, j int) int64 { return int64(j) }
}
lb := labels.NewScratchBuilder(1 + len(c.extraLabels))
for i := range ret.series {
ret.series[i] = record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: c.labelsFn(&lb, i),
}
ret.metadata[i] = record.RefMetadata{
Ref: chunks.HeadSeriesRef(i),
Type: uint8(record.Counter),
Unit: "unit text",
Help: "help text",
}
for j := range c.samplesPerSeries {
ret.samples[i*c.samplesPerSeries+j] = record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: c.tsFn(i, j),
V: float64(i),
}
}
h := newTestHist(i)
for j := range c.histogramsPerSeries {
ret.histograms[i*c.histogramsPerSeries+j] = record.RefHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: c.tsFn(i, j),
H: h,
}
}
for j := range c.floatHistogramsPerSeries {
ret.floatHistograms[i*c.floatHistogramsPerSeries+j] = record.RefFloatHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: c.tsFn(i, j),
FH: h.ToFloat(nil),
}
}
for j := range c.exemplarsPerSeries {
ret.exemplars[i*c.exemplarsPerSeries+j] = record.RefExemplar{
Ref: chunks.HeadSeriesRef(i),
T: c.tsFn(i, j),
V: float64(i),
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
}
}
}
return ret
}
// BenchmarkGenerateRecords checks data generator performance.
// Recommended CLI:
/*
export bench=genRecs && go test ./storage/remote/... \
-run '^$' -bench '^BenchmarkGenerateRecords' \
-benchtime 1s -count 6 -cpu 2 -timeout 999m -benchmem \
| tee ${bench}.txt
*/
func BenchmarkGenerateRecords(b *testing.B) {
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
// This will generate 16M samples and 4k series.
generateRecords(recCase{series: n, samplesPerSeries: n})
}
}
func createProtoTimeseriesWithOld(numSamples, baseTs int64) []prompb.TimeSeries {
samples := make([]prompb.TimeSeries, numSamples)
// use a fixed rand source so tests are consistent
@ -1395,7 +1270,7 @@ func BenchmarkSampleSend(b *testing.B) {
const numSamples = 1
const numSeries = 10000
recs := generateRecords(recCase{series: numSeries, samplesPerSeries: numSamples, extraLabels: extraLabels})
recs := testwal.GenerateRecords(recCase{Series: numSeries, SamplesPerSeries: numSamples, ExtraLabels: extraLabels})
c := NewNopWriteClient()
@ -1409,7 +1284,7 @@ func BenchmarkSampleSend(b *testing.B) {
for _, format := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
b.Run(string(format), func(b *testing.B) {
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, format)
m.StoreSeries(recs.series, 0)
m.StoreSeries(recs.Series, 0)
// These should be received by the client.
m.Start()
@ -1417,8 +1292,8 @@ func BenchmarkSampleSend(b *testing.B) {
b.ResetTimer()
for i := 0; b.Loop(); i++ {
m.Append(recs.samples)
m.UpdateSeriesSegment(recs.series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
m.Append(recs.Samples)
m.UpdateSeriesSegment(recs.Series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
m.SeriesReset(i + 1)
}
// Do not include shutdown
@ -1460,7 +1335,7 @@ func BenchmarkStoreSeries(b *testing.B) {
// numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager.
const numSeries = 1000
recs := generateRecords(recCase{series: numSeries, samplesPerSeries: 0, extraLabels: extraLabels})
recs := testwal.GenerateRecords(recCase{Series: numSeries, SamplesPerSeries: 0, ExtraLabels: extraLabels})
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
@ -1475,7 +1350,7 @@ func BenchmarkStoreSeries(b *testing.B) {
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs
m.StoreSeries(recs.series, 0)
m.StoreSeries(recs.Series, 0)
}
})
}
@ -1985,25 +1860,25 @@ func TestDropOldTimeSeries(t *testing.T) {
size := 10
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
pastRecs := generateRecords(recCase{
series: nSeries,
samplesPerSeries: (nSamples / nSeries) / 2, // Half data is past.
tsFn: func(_, j int) int64 {
pastRecs := testwal.GenerateRecords(recCase{
Series: nSeries,
SamplesPerSeries: (nSamples / nSeries) / 2, // Half data is past.
TsFn: func(_, j int) int64 {
past := timestamp.FromTime(time.Now().Add(-5 * time.Minute))
return past + int64(j)
},
})
newRecs := generateRecords(recCase{
series: nSeries,
samplesPerSeries: (nSamples / nSeries) / 2, // Half data is past.
tsFn: func(_, j int) int64 {
newRecs := testwal.GenerateRecords(recCase{
Series: nSeries,
SamplesPerSeries: (nSamples / nSeries) / 2, // Half data is past.
TsFn: func(_, j int) int64 {
return time.Now().UnixMilli() + int64(j)
},
})
series := pastRecs.series // Series is the same for both old and new.
newSamples := newRecs.samples
samples := append(pastRecs.samples, newRecs.samples...)
series := pastRecs.Series // Series is the same for both old and new.
newSamples := newRecs.Samples
samples := append(pastRecs.Samples, newRecs.Samples...)
c := NewTestWriteClient(protoMsg)
c.expectSamples(newSamples, series)
@ -2067,13 +1942,13 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
// Use a fixed rand source so tests are consistent.
r := rand.New(rand.NewSource(99))
recs := generateRecords(recCase{
series: numberOfSeries,
samplesPerSeries: 1,
tsFn: func(_, _ int) int64 {
recs := testwal.GenerateRecords(recCase{
Series: numberOfSeries,
SamplesPerSeries: 1,
TsFn: func(_, _ int) int64 {
return time.Now().Add(timeAdd).UnixMilli()
},
labelsFn: func(lb *labels.ScratchBuilder, i int) labels.Labels {
LabelsFn: func(lb *labels.ScratchBuilder, i int) labels.Labels {
lb.Reset()
labelsCount := r.Intn(maxLabels)
lb.Add("__name__", "batch_"+strconv.Itoa(batchID)+"_id_"+strconv.Itoa(i))
@ -2086,12 +1961,12 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
},
})
m.StoreSeries(recs.series, batchID)
sent := m.Append(recs.samples)
m.StoreSeries(recs.Series, batchID)
sent := m.Append(recs.Samples)
require.True(t, sent, "samples not sent")
if !shouldBeDropped {
for _, s := range recs.samples {
tsID := getSeriesIDFromRef(recs.series[s.Ref])
for _, s := range recs.Samples {
tsID := getSeriesIDFromRef(recs.Series[s.Ref])
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
Timestamp: s.T,
Value: s.V,
@ -2615,7 +2490,7 @@ func TestHighestTimestampOnAppend(t *testing.T) {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
nSamples := 11 * config.DefaultQueueConfig.Capacity
nSeries := 3
recs := generateRecords(recCase{series: nSeries, samplesPerSeries: nSamples / nSeries})
recs := testwal.GenerateRecords(recCase{Series: nSeries, SamplesPerSeries: nSamples / nSeries})
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
m.Start()
@ -2623,11 +2498,11 @@ func TestHighestTimestampOnAppend(t *testing.T) {
require.Equal(t, 0.0, m.metrics.highestTimestamp.Get())
m.StoreSeries(recs.series, 0)
require.True(t, m.Append(recs.samples))
m.StoreSeries(recs.Series, 0)
require.True(t, m.Append(recs.Samples))
// Check that Append sets the highest timestamp correctly.
// NOTE: generateRecords yields nSamples/nSeries samples (36666), with <i for samplesPerSeries> timestamp.
// NOTE: testwal.GenerateRecords yields nSamples/nSeries samples (36666), with <i for samplesPerSeries> timestamp.
// This gives the highest timestamp of 36666/1000 (seconds).
const expectedHighestTsSeconds = 36.0
require.Equal(t, expectedHighestTsSeconds, m.metrics.highestTimestamp.Get())

138
util/testwal/records.go Normal file
View File

@ -0,0 +1,138 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testwal
import (
"fmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)
// RecordsCase describes how to generate a batch of WAL records, in the form
// of a test case (Name is intended as the sub-test name).
type RecordsCase struct {
	// Name identifies the case, e.g. for t.Run sub-tests.
	Name string
	// Series is the number of series to generate; one series record and one
	// metadata record is produced per series.
	Series int
	// Per-series counts for each sample record type; zero means none.
	SamplesPerSeries int
	HistogramsPerSeries int
	FloatHistogramsPerSeries int
	ExemplarsPerSeries int
	// ExtraLabels are appended to every series by the default label builder
	// (ignored when LabelsFn is set).
	ExtraLabels []labels.Label
	// LabelsFn, if non-nil, builds the label set for series i; otherwise a
	// default "test_metric_<i>" name plus ExtraLabels is used.
	LabelsFn func(lb *labels.ScratchBuilder, i int) labels.Labels
	// TsFn, if non-nil, returns the timestamp for sample j of series i;
	// otherwise timestamps are simply int64(j).
	TsFn func(i, j int) int64
}
// Records represents batches of generated WAL records, grouped by record
// type. As produced by GenerateRecords, series i uses the reference
// chunks.HeadSeriesRef(i) across all slices.
type Records struct {
	Series []record.RefSeries
	Samples []record.RefSample
	Histograms []record.RefHistogramSample
	FloatHistograms []record.RefFloatHistogramSample
	Exemplars []record.RefExemplar
	Metadata []record.RefMetadata
}
func newTestHist(i int) *histogram.Histogram {
return &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
}
}
// GenerateRecords generates batches of WAL records for the given RecordsCase.
// Each batch is a set of series, with the configured number of counter
// samples, histograms, float histograms and exemplars emitted per series ref.
func GenerateRecords(c RecordsCase) Records {
	// Resolve the optional hooks up front so the loops below stay simple.
	labelsFn := c.LabelsFn
	if labelsFn == nil {
		labelsFn = func(lb *labels.ScratchBuilder, i int) labels.Labels {
			// Default: a per-series metric name plus any extra labels supplied.
			lb.Reset()
			lb.Add(model.MetricNameLabel, fmt.Sprintf("test_metric_%d", i))
			for _, l := range c.ExtraLabels {
				lb.Add(l.Name, l.Value)
			}
			lb.Sort()
			return lb.Labels()
		}
	}
	tsFn := c.TsFn
	if tsFn == nil {
		tsFn = func(_, j int) int64 { return int64(j) }
	}

	out := Records{
		Series:          make([]record.RefSeries, c.Series),
		Metadata:        make([]record.RefMetadata, c.Series),
		Samples:         make([]record.RefSample, c.Series*c.SamplesPerSeries),
		Histograms:      make([]record.RefHistogramSample, c.Series*c.HistogramsPerSeries),
		FloatHistograms: make([]record.RefFloatHistogramSample, c.Series*c.FloatHistogramsPerSeries),
		Exemplars:       make([]record.RefExemplar, c.Series*c.ExemplarsPerSeries),
	}

	builder := labels.NewScratchBuilder(1 + len(c.ExtraLabels))
	for i := 0; i < c.Series; i++ {
		ref := chunks.HeadSeriesRef(i)
		out.Series[i] = record.RefSeries{Ref: ref, Labels: labelsFn(&builder, i)}
		out.Metadata[i] = record.RefMetadata{
			Ref:  ref,
			Type: uint8(record.Counter),
			Unit: "unit text",
			Help: fmt.Sprintf("help text for %d", i),
		}
		for j := 0; j < c.SamplesPerSeries; j++ {
			out.Samples[i*c.SamplesPerSeries+j] = record.RefSample{
				Ref: ref,
				T:   tsFn(i, j),
				V:   float64(i),
			}
		}
		// The same integer histogram backs all histogram samples of a series;
		// float histograms are converted from it per sample.
		h := newTestHist(i)
		for j := 0; j < c.HistogramsPerSeries; j++ {
			out.Histograms[i*c.HistogramsPerSeries+j] = record.RefHistogramSample{
				Ref: ref,
				T:   tsFn(i, j),
				H:   h,
			}
		}
		for j := 0; j < c.FloatHistogramsPerSeries; j++ {
			out.FloatHistograms[i*c.FloatHistogramsPerSeries+j] = record.RefFloatHistogramSample{
				Ref: ref,
				T:   tsFn(i, j),
				FH:  h.ToFloat(nil),
			}
		}
		for j := 0; j < c.ExemplarsPerSeries; j++ {
			out.Exemplars[i*c.ExemplarsPerSeries+j] = record.RefExemplar{
				Ref:    ref,
				T:      tsFn(i, j),
				V:      float64(i),
				Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
			}
		}
	}
	return out
}

View File

@ -0,0 +1,39 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testwal
import (
"testing"
"github.com/prometheus/prometheus/config"
)
// BenchmarkGenerateRecords checks data generator performance.
// Recommended CLI:
/*
export bench=genRecs && go test ./util/testwal/... \
 -run '^$' -bench '^BenchmarkGenerateRecords' \
 -benchtime 1s -count 6 -cpu 2 -timeout 999m -benchmem \
 | tee ${bench}.txt
*/
func BenchmarkGenerateRecords(b *testing.B) {
	n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
	b.ReportAllocs()
	// NOTE: b.Loop resets and manages the benchmark timer itself, so an
	// explicit b.ResetTimer call before the loop is redundant and was removed.
	for b.Loop() {
		// This will generate 16M samples and 4k series.
		GenerateRecords(RecordsCase{Series: n, SamplesPerSeries: n})
	}
}