From 596830ee724b4217c4fe8c33a8268c02072f8f5e Mon Sep 17 00:00:00 2001 From: bwplotka Date: Tue, 3 Mar 2026 09:59:52 +0000 Subject: [PATCH] tests(util/testwal): Move WAL record generation to separate package for reuse Signed-off-by: bwplotka tmp Signed-off-by: bwplotka --- storage/remote/queue_manager_test.go | 297 ++++++++------------------- util/testwal/records.go | 138 +++++++++++++ util/testwal/records_test.go | 39 ++++ 3 files changed, 263 insertions(+), 211 deletions(-) create mode 100644 util/testwal/records.go create mode 100644 util/testwal/records_test.go diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 210a61a287..d876327c83 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -51,10 +51,13 @@ import ( "github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil/synctest" + "github.com/prometheus/prometheus/util/testwal" ) const defaultFlushDeadline = 1 * time.Minute +type recCase = testwal.RecordsCase + func newHighestTimestampMetric() *maxTimestamp { return &maxTimestamp{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -139,7 +142,7 @@ func TestBasicContentNegotiation(t *testing.T) { s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) defer s.Close() - recs := generateRecords(recCase{series: 1, samplesPerSeries: 1}) + recs := testwal.GenerateRecords(recCase{Series: 1, SamplesPerSeries: 1}) conf.RemoteWriteConfigs[0].ProtobufMessage = tc.senderProtoMsg require.NoError(t, s.ApplyConfig(conf)) @@ -151,18 +154,18 @@ func TestBasicContentNegotiation(t *testing.T) { c.injectErrors(tc.injectErrs) qm.SetClient(c) - qm.StoreSeries(recs.series, 0) - qm.StoreMetadata(recs.metadata) + qm.StoreSeries(recs.Series, 0) + qm.StoreMetadata(recs.Metadata) // Do we expect some data back? 
if !tc.expectFail { - c.expectSamples(recs.samples, recs.series) + c.expectSamples(recs.Samples, recs.Series) } else { c.expectSamples(nil, nil) } // Schedule send. - qm.Append(recs.samples) + qm.Append(recs.Samples) if !tc.expectFail { // No error expected, so wait for data. @@ -210,26 +213,26 @@ func TestSampleDelivery(t *testing.T) { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { for _, rc := range []recCase{ - {series: n, samplesPerSeries: n, histogramsPerSeries: 0, floatHistogramsPerSeries: 0, exemplarsPerSeries: 0, name: "samples only"}, - {series: n, samplesPerSeries: 0, histogramsPerSeries: n, floatHistogramsPerSeries: 0, exemplarsPerSeries: 0, name: "histograms only"}, - {series: n, samplesPerSeries: 0, histogramsPerSeries: 0, floatHistogramsPerSeries: n, exemplarsPerSeries: 0, name: "float histograms only"}, - {series: n, samplesPerSeries: 0, histogramsPerSeries: 0, floatHistogramsPerSeries: 0, exemplarsPerSeries: n, name: "exemplars only"}, - {series: n, samplesPerSeries: n, histogramsPerSeries: n, floatHistogramsPerSeries: n, exemplarsPerSeries: n, name: "all"}, + {Series: n, SamplesPerSeries: n, HistogramsPerSeries: 0, FloatHistogramsPerSeries: 0, ExemplarsPerSeries: 0, Name: "samples only"}, + {Series: n, SamplesPerSeries: 0, HistogramsPerSeries: n, FloatHistogramsPerSeries: 0, ExemplarsPerSeries: 0, Name: "histograms only"}, + {Series: n, SamplesPerSeries: 0, HistogramsPerSeries: 0, FloatHistogramsPerSeries: n, ExemplarsPerSeries: 0, Name: "float histograms only"}, + {Series: n, SamplesPerSeries: 0, HistogramsPerSeries: 0, FloatHistogramsPerSeries: 0, ExemplarsPerSeries: n, Name: "exemplars only"}, + {Series: n, SamplesPerSeries: n, HistogramsPerSeries: n, FloatHistogramsPerSeries: n, ExemplarsPerSeries: n, Name: "all"}, } { - t.Run(fmt.Sprintf("proto=%s/case=%s", protoMsg, rc.name), func(t *testing.T) { + t.Run(fmt.Sprintf("proto=%s/case=%s", protoMsg, rc.Name), func(t *testing.T) 
{ dir := t.TempDir() s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) defer s.Close() - recs := generateRecords(rc) + recs := testwal.GenerateRecords(rc) var ( - series = recs.series - metadata = recs.metadata - samples = recs.samples - exemplars = recs.exemplars - histograms = recs.histograms - floatHistograms = recs.floatHistograms + series = recs.Series + metadata = recs.Metadata + samples = recs.Samples + exemplars = recs.Exemplars + histograms = recs.Histograms + floatHistograms = recs.FloatHistograms ) // Apply new config. @@ -349,7 +352,7 @@ func TestWALMetadataDelivery(t *testing.T) { } n := 3 - recs := generateRecords(recCase{series: n, samplesPerSeries: n}) + recs := testwal.GenerateRecords(recCase{Series: n, SamplesPerSeries: n}) require.NoError(t, s.ApplyConfig(conf)) hash, err := toHash(writeConfig) @@ -359,15 +362,15 @@ func TestWALMetadataDelivery(t *testing.T) { c := NewTestWriteClient(remoteapi.WriteV2MessageType) qm.SetClient(c) - qm.StoreSeries(recs.series, 0) - qm.StoreMetadata(recs.metadata) + qm.StoreSeries(recs.Series, 0) + qm.StoreMetadata(recs.Metadata) require.Len(t, qm.seriesLabels, n) require.Len(t, qm.seriesMetadata, n) - c.expectSamples(recs.samples, recs.series) - c.expectMetadataForBatch(recs.metadata, recs.series, recs.samples, nil, nil, nil) - qm.Append(recs.samples) + c.expectSamples(recs.Samples, recs.Series) + c.expectMetadataForBatch(recs.Metadata, recs.Series, recs.Samples, nil, nil, nil) + qm.Append(recs.Samples) c.waitForExpectedData(t, 30*time.Second) } @@ -375,24 +378,24 @@ func TestSampleDeliveryTimeout(t *testing.T) { t.Parallel() for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { - recs := generateRecords(recCase{series: 10, samplesPerSeries: 10}) + recs := testwal.GenerateRecords(recCase{Series: 10, SamplesPerSeries: 10}) cfg := testDefaultQueueConfig() mcfg := 
config.DefaultMetadataConfig cfg.MaxShards = 1 c := NewTestWriteClient(protoMsg) m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg) - m.StoreSeries(recs.series, 0) + m.StoreSeries(recs.Series, 0) m.Start() defer m.Stop() // Send the samples twice, waiting for the samples in the meantime. - c.expectSamples(recs.samples, recs.series) - m.Append(recs.samples) + c.expectSamples(recs.Samples, recs.Series) + m.Append(recs.Samples) c.waitForExpectedData(t, 30*time.Second) - c.expectSamples(recs.samples, recs.series) - m.Append(recs.samples) + c.expectSamples(recs.Samples, recs.Series) + m.Append(recs.Samples) c.waitForExpectedData(t, 30*time.Second) }) } @@ -404,16 +407,16 @@ func TestSampleDeliveryOrder(t *testing.T) { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts - recs := generateRecords(recCase{series: n, samplesPerSeries: 1}) + recs := testwal.GenerateRecords(recCase{Series: n, SamplesPerSeries: 1}) c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg) - c.expectSamples(recs.samples, recs.series) - m.StoreSeries(recs.series, 0) + c.expectSamples(recs.Samples, recs.Series) + m.StoreSeries(recs.Series, 0) m.Start() defer m.Stop() // These should be received by the client. - m.Append(recs.samples) + m.Append(recs.Samples) c.waitForExpectedData(t, 30*time.Second) }) } @@ -433,13 +436,13 @@ func TestShutdown(t *testing.T) { m := newTestQueueManager(t, cfg, mcfg, deadline, c, protoMsg) // Send 2x batch size, so we know it will need at least two sends. n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend - recs := generateRecords(recCase{series: n / 1000, samplesPerSeries: 1000}) - m.StoreSeries(recs.series, 0) + recs := testwal.GenerateRecords(recCase{Series: n / 1000, SamplesPerSeries: 1000}) + m.StoreSeries(recs.Series, 0) m.Start() // Append blocks to guarantee delivery, so we do it in the background. 
go func() { - m.Append(recs.samples) + m.Append(recs.Samples) }() synctest.Wait() @@ -502,29 +505,29 @@ func TestReshard(t *testing.T) { size := 10 // Make bigger to find more races. nSeries := 6 samplesPerSeries := config.DefaultQueueConfig.Capacity * size - recs := generateRecords(recCase{series: nSeries, samplesPerSeries: samplesPerSeries}) - t.Logf("about to send %v samples", len(recs.samples)) + recs := testwal.GenerateRecords(recCase{Series: nSeries, SamplesPerSeries: samplesPerSeries}) + t.Logf("about to send %v samples", len(recs.Samples)) cfg := config.DefaultQueueConfig cfg.MaxShards = 1 c := NewTestWriteClient(protoMsg) m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg) - c.expectSamples(recs.samples, recs.series) - m.StoreSeries(recs.series, 0) + c.expectSamples(recs.Samples, recs.Series) + m.StoreSeries(recs.Series, 0) m.Start() defer m.Stop() go func() { - for i := 0; i < len(recs.samples); i += config.DefaultQueueConfig.Capacity { - sent := m.Append(recs.samples[i : i+config.DefaultQueueConfig.Capacity]) + for i := 0; i < len(recs.Samples); i += config.DefaultQueueConfig.Capacity { + sent := m.Append(recs.Samples[i : i+config.DefaultQueueConfig.Capacity]) require.True(t, sent, "samples not sent") time.Sleep(100 * time.Millisecond) } }() - for i := 1; i < len(recs.samples)/config.DefaultQueueConfig.Capacity; i++ { + for i := 1; i < len(recs.Samples)/config.DefaultQueueConfig.Capacity; i++ { m.shards.stop() m.shards.start(i) time.Sleep(100 * time.Millisecond) @@ -578,7 +581,7 @@ func TestReshardPartialBatch(t *testing.T) { t.Parallel() for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { - recs := generateRecords(recCase{series: 1, samplesPerSeries: 10}) + recs := testwal.GenerateRecords(recCase{Series: 1, SamplesPerSeries: 10}) c := NewTestBlockedWriteClient() @@ -590,14 +593,14 @@ func 
TestReshardPartialBatch(t *testing.T) { cfg.BatchSendDeadline = model.Duration(batchSendDeadline) m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg) - m.StoreSeries(recs.series, 0) + m.StoreSeries(recs.Series, 0) m.Start() for range 100 { done := make(chan struct{}) go func() { - m.Append(recs.samples) + m.Append(recs.Samples) time.Sleep(batchSendDeadline) m.shards.stop() m.shards.start(1) @@ -623,7 +626,7 @@ func TestReshardPartialBatch(t *testing.T) { func TestQueueFilledDeadlock(t *testing.T) { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { - recs := generateRecords(recCase{series: 50, samplesPerSeries: 1}) + recs := testwal.GenerateRecords(recCase{Series: 50, SamplesPerSeries: 1}) c := NewNopWriteClient() @@ -637,7 +640,7 @@ func TestQueueFilledDeadlock(t *testing.T) { cfg.BatchSendDeadline = model.Duration(batchSendDeadline) m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg) - m.StoreSeries(recs.series, 0) + m.StoreSeries(recs.Series, 0) m.Start() defer m.Stop() @@ -645,7 +648,7 @@ func TestQueueFilledDeadlock(t *testing.T) { done := make(chan struct{}) go func() { time.Sleep(batchSendDeadline) - m.Append(recs.samples) + m.Append(recs.Samples) done <- struct{}{} }() select { @@ -735,7 +738,7 @@ func TestDisableReshardOnRetry(t *testing.T) { defer onStoreCalled() var ( - recs = generateRecords(recCase{series: 100, samplesPerSeries: 100}) + recs = testwal.GenerateRecords(recCase{Series: 100, SamplesPerSeries: 100}) cfg = config.DefaultQueueConfig mcfg = config.DefaultMetadataConfig @@ -758,14 +761,14 @@ func TestDisableReshardOnRetry(t *testing.T) { ) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType) - m.StoreSeries(recs.series, 
0) + m.StoreSeries(recs.Series, 0) // Attempt to samples while the manager is running. We immediately stop the // manager after the recoverable error is generated to prevent the manager // from resharding itself. m.Start() { - m.Append(recs.samples) + m.Append(recs.Samples) select { case <-onStoredContext.Done(): @@ -791,134 +794,6 @@ func TestDisableReshardOnRetry(t *testing.T) { }, time.Minute, retryAfter, "shouldReshard should have been re-enabled") } -type recCase struct { - name string - - series int - samplesPerSeries int - histogramsPerSeries int - floatHistogramsPerSeries int - exemplarsPerSeries int - - extraLabels []labels.Label - - labelsFn func(lb *labels.ScratchBuilder, i int) labels.Labels - tsFn func(i, j int) int64 -} - -type records struct { - series []record.RefSeries - samples []record.RefSample - histograms []record.RefHistogramSample - floatHistograms []record.RefFloatHistogramSample - exemplars []record.RefExemplar - metadata []record.RefMetadata -} - -func newTestHist(i int) *histogram.Histogram { - return &histogram.Histogram{ - Schema: 2, - ZeroThreshold: 1e-128, - ZeroCount: 0, - Count: 2, - Sum: 0, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, - PositiveBuckets: []int64{int64(i) + 1}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, - NegativeBuckets: []int64{int64(-i) - 1}, - } -} - -func generateRecords(c recCase) (ret records) { - ret.series = make([]record.RefSeries, c.series) - ret.metadata = make([]record.RefMetadata, c.series) - ret.samples = make([]record.RefSample, c.series*c.samplesPerSeries) - ret.histograms = make([]record.RefHistogramSample, c.series*c.histogramsPerSeries) - ret.floatHistograms = make([]record.RefFloatHistogramSample, c.series*c.floatHistogramsPerSeries) - ret.exemplars = make([]record.RefExemplar, c.series*c.exemplarsPerSeries) - - if c.labelsFn == nil { - c.labelsFn = func(lb *labels.ScratchBuilder, i int) labels.Labels { - // Create series with labels that contains name of series 
plus any extra labels supplied. - name := fmt.Sprintf("test_metric_%d", i) - lb.Reset() - lb.Add(model.MetricNameLabel, name) - for _, l := range c.extraLabels { - lb.Add(l.Name, l.Value) - } - lb.Sort() - return lb.Labels() - } - } - if c.tsFn == nil { - c.tsFn = func(_, j int) int64 { return int64(j) } - } - - lb := labels.NewScratchBuilder(1 + len(c.extraLabels)) - for i := range ret.series { - ret.series[i] = record.RefSeries{ - Ref: chunks.HeadSeriesRef(i), - Labels: c.labelsFn(&lb, i), - } - ret.metadata[i] = record.RefMetadata{ - Ref: chunks.HeadSeriesRef(i), - Type: uint8(record.Counter), - Unit: "unit text", - Help: "help text", - } - for j := range c.samplesPerSeries { - ret.samples[i*c.samplesPerSeries+j] = record.RefSample{ - Ref: chunks.HeadSeriesRef(i), - T: c.tsFn(i, j), - V: float64(i), - } - } - h := newTestHist(i) - for j := range c.histogramsPerSeries { - ret.histograms[i*c.histogramsPerSeries+j] = record.RefHistogramSample{ - Ref: chunks.HeadSeriesRef(i), - T: c.tsFn(i, j), - H: h, - } - } - for j := range c.floatHistogramsPerSeries { - ret.floatHistograms[i*c.floatHistogramsPerSeries+j] = record.RefFloatHistogramSample{ - Ref: chunks.HeadSeriesRef(i), - T: c.tsFn(i, j), - FH: h.ToFloat(nil), - } - } - for j := range c.exemplarsPerSeries { - ret.exemplars[i*c.exemplarsPerSeries+j] = record.RefExemplar{ - Ref: chunks.HeadSeriesRef(i), - T: c.tsFn(i, j), - V: float64(i), - Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)), - } - } - } - return ret -} - -// BenchmarkGenerateRecords checks data generator performance. -// Recommended CLI: -/* - export bench=genRecs && go test ./storage/remote/... \ - -run '^$' -bench '^BenchmarkGenerateRecords' \ - -benchtime 1s -count 6 -cpu 2 -timeout 999m -benchmem \ - | tee ${bench}.txt -*/ -func BenchmarkGenerateRecords(b *testing.B) { - n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend - - b.ReportAllocs() - b.ResetTimer() - for b.Loop() { - // This will generate 16M samples and 4k series. 
- generateRecords(recCase{series: n, samplesPerSeries: n}) - } -} - func createProtoTimeseriesWithOld(numSamples, baseTs int64) []prompb.TimeSeries { samples := make([]prompb.TimeSeries, numSamples) // use a fixed rand source so tests are consistent @@ -1385,7 +1260,7 @@ func BenchmarkSampleSend(b *testing.B) { const numSamples = 1 const numSeries = 10000 - recs := generateRecords(recCase{series: numSeries, samplesPerSeries: numSamples, extraLabels: extraLabels}) + recs := testwal.GenerateRecords(recCase{Series: numSeries, SamplesPerSeries: numSamples, ExtraLabels: extraLabels}) c := NewNopWriteClient() @@ -1399,7 +1274,7 @@ func BenchmarkSampleSend(b *testing.B) { for _, format := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { b.Run(string(format), func(b *testing.B) { m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, format) - m.StoreSeries(recs.series, 0) + m.StoreSeries(recs.Series, 0) // These should be received by the client. m.Start() @@ -1407,8 +1282,8 @@ func BenchmarkSampleSend(b *testing.B) { b.ResetTimer() for i := 0; b.Loop(); i++ { - m.Append(recs.samples) - m.UpdateSeriesSegment(recs.series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does + m.Append(recs.Samples) + m.UpdateSeriesSegment(recs.Series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does m.SeriesReset(i + 1) } // Do not include shutdown @@ -1450,7 +1325,7 @@ func BenchmarkStoreSeries(b *testing.B) { // numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager. 
const numSeries = 1000 - recs := generateRecords(recCase{series: numSeries, samplesPerSeries: 0, extraLabels: extraLabels}) + recs := testwal.GenerateRecords(recCase{Series: numSeries, SamplesPerSeries: 0, ExtraLabels: extraLabels}) for _, tc := range testCases { b.Run(tc.name, func(b *testing.B) { @@ -1465,7 +1340,7 @@ func BenchmarkStoreSeries(b *testing.B) { m.externalLabels = tc.externalLabels m.relabelConfigs = tc.relabelConfigs - m.StoreSeries(recs.series, 0) + m.StoreSeries(recs.Series, 0) } }) } @@ -1975,25 +1850,25 @@ func TestDropOldTimeSeries(t *testing.T) { size := 10 nSeries := 6 nSamples := config.DefaultQueueConfig.Capacity * size - pastRecs := generateRecords(recCase{ - series: nSeries, - samplesPerSeries: (nSamples / nSeries) / 2, // Half data is past. - tsFn: func(_, j int) int64 { + pastRecs := testwal.GenerateRecords(recCase{ + Series: nSeries, + SamplesPerSeries: (nSamples / nSeries) / 2, // Half data is past. + TsFn: func(_, j int) int64 { past := timestamp.FromTime(time.Now().Add(-5 * time.Minute)) return past + int64(j) }, }) - newRecs := generateRecords(recCase{ - series: nSeries, - samplesPerSeries: (nSamples / nSeries) / 2, // Half data is past. - tsFn: func(_, j int) int64 { + newRecs := testwal.GenerateRecords(recCase{ + Series: nSeries, + SamplesPerSeries: (nSamples / nSeries) / 2, // Half data is past. + TsFn: func(_, j int) int64 { return time.Now().UnixMilli() + int64(j) }, }) - series := pastRecs.series // Series is the same for both old and new. - newSamples := newRecs.samples - samples := append(pastRecs.samples, newRecs.samples...) + series := pastRecs.Series // Series is the same for both old and new. + newSamples := newRecs.Samples + samples := append(pastRecs.Samples, newRecs.Samples...) c := NewTestWriteClient(protoMsg) c.expectSamples(newSamples, series) @@ -2057,13 +1932,13 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) { // Use a fixed rand source so tests are consistent. 
r := rand.New(rand.NewSource(99)) - recs := generateRecords(recCase{ - series: numberOfSeries, - samplesPerSeries: 1, - tsFn: func(_, _ int) int64 { + recs := testwal.GenerateRecords(recCase{ + Series: numberOfSeries, + SamplesPerSeries: 1, + TsFn: func(_, _ int) int64 { return time.Now().Add(timeAdd).UnixMilli() }, - labelsFn: func(lb *labels.ScratchBuilder, i int) labels.Labels { + LabelsFn: func(lb *labels.ScratchBuilder, i int) labels.Labels { lb.Reset() labelsCount := r.Intn(maxLabels) lb.Add("__name__", "batch_"+strconv.Itoa(batchID)+"_id_"+strconv.Itoa(i)) @@ -2076,12 +1951,12 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) { }, }) - m.StoreSeries(recs.series, batchID) - sent := m.Append(recs.samples) + m.StoreSeries(recs.Series, batchID) + sent := m.Append(recs.Samples) require.True(t, sent, "samples not sent") if !shouldBeDropped { - for _, s := range recs.samples { - tsID := getSeriesIDFromRef(recs.series[s.Ref]) + for _, s := range recs.Samples { + tsID := getSeriesIDFromRef(recs.Series[s.Ref]) c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{ Timestamp: s.T, Value: s.V, @@ -2605,7 +2480,7 @@ func TestHighestTimestampOnAppend(t *testing.T) { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { nSamples := 11 * config.DefaultQueueConfig.Capacity nSeries := 3 - recs := generateRecords(recCase{series: nSeries, samplesPerSeries: nSamples / nSeries}) + recs := testwal.GenerateRecords(recCase{Series: nSeries, SamplesPerSeries: nSamples / nSeries}) _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg) m.Start() @@ -2613,11 +2488,11 @@ func TestHighestTimestampOnAppend(t *testing.T) { require.Equal(t, 0.0, m.metrics.highestTimestamp.Get()) - m.StoreSeries(recs.series, 0) - require.True(t, m.Append(recs.samples)) + m.StoreSeries(recs.Series, 0) + require.True(t, m.Append(recs.Samples)) // Check that Append sets the highest timestamp correctly. 
- // NOTE: generateRecords yields nSamples/nSeries samples (36666), with timestamp. + // NOTE: testwal.GenerateRecords yields nSamples/nSeries samples (36666), with timestamp. // This gives the highest timestamp of 36666/1000 (seconds). const expectedHighestTsSeconds = 36.0 require.Equal(t, expectedHighestTsSeconds, m.metrics.highestTimestamp.Get()) diff --git a/util/testwal/records.go b/util/testwal/records.go new file mode 100644 index 0000000000..9a75efb235 --- /dev/null +++ b/util/testwal/records.go @@ -0,0 +1,138 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testwal + +import ( + "fmt" + + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/record" +) + +// RecordsCase describes record generation options in the form of a test case. +type RecordsCase struct { + Name string + + Series int + SamplesPerSeries int + HistogramsPerSeries int + FloatHistogramsPerSeries int + ExemplarsPerSeries int + + ExtraLabels []labels.Label + + LabelsFn func(lb *labels.ScratchBuilder, i int) labels.Labels + TsFn func(i, j int) int64 +} + +// Records represents batches of generated WAL records.
+type Records struct { + Series []record.RefSeries + Samples []record.RefSample + Histograms []record.RefHistogramSample + FloatHistograms []record.RefFloatHistogramSample + Exemplars []record.RefExemplar + Metadata []record.RefMetadata +} + +func newTestHist(i int) *histogram.Histogram { + return &histogram.Histogram{ + Schema: 2, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 2, + Sum: 0, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + PositiveBuckets: []int64{int64(i) + 1}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{int64(-i) - 1}, + } +} + +// GenerateRecords generates batches of WAL records for a given RecordsCase. +// The batches represent a set of series with the given number of counter samples, histograms, etc. for each series ref. +func GenerateRecords(c RecordsCase) (ret Records) { + ret.Series = make([]record.RefSeries, c.Series) + ret.Metadata = make([]record.RefMetadata, c.Series) + ret.Samples = make([]record.RefSample, c.Series*c.SamplesPerSeries) + ret.Histograms = make([]record.RefHistogramSample, c.Series*c.HistogramsPerSeries) + ret.FloatHistograms = make([]record.RefFloatHistogramSample, c.Series*c.FloatHistogramsPerSeries) + ret.Exemplars = make([]record.RefExemplar, c.Series*c.ExemplarsPerSeries) + + if c.LabelsFn == nil { + c.LabelsFn = func(lb *labels.ScratchBuilder, i int) labels.Labels { + // Create series with labels that contain the name of the series plus any extra labels supplied.
+ name := fmt.Sprintf("test_metric_%d", i) + lb.Reset() + lb.Add(model.MetricNameLabel, name) + for _, l := range c.ExtraLabels { + lb.Add(l.Name, l.Value) + } + lb.Sort() + return lb.Labels() + } + } + if c.TsFn == nil { + c.TsFn = func(_, j int) int64 { return int64(j) } + } + + lb := labels.NewScratchBuilder(1 + len(c.ExtraLabels)) + for i := range ret.Series { + ret.Series[i] = record.RefSeries{ + Ref: chunks.HeadSeriesRef(i), + Labels: c.LabelsFn(&lb, i), + } + ret.Metadata[i] = record.RefMetadata{ + Ref: chunks.HeadSeriesRef(i), + Type: uint8(record.Counter), + Unit: "unit text", + Help: fmt.Sprintf("help text for %d", i), + } + for j := range c.SamplesPerSeries { + ret.Samples[i*c.SamplesPerSeries+j] = record.RefSample{ + Ref: chunks.HeadSeriesRef(i), + T: c.TsFn(i, j), + V: float64(i), + } + } + h := newTestHist(i) + for j := range c.HistogramsPerSeries { + ret.Histograms[i*c.HistogramsPerSeries+j] = record.RefHistogramSample{ + Ref: chunks.HeadSeriesRef(i), + T: c.TsFn(i, j), + H: h, + } + } + for j := range c.FloatHistogramsPerSeries { + ret.FloatHistograms[i*c.FloatHistogramsPerSeries+j] = record.RefFloatHistogramSample{ + Ref: chunks.HeadSeriesRef(i), + T: c.TsFn(i, j), + FH: h.ToFloat(nil), + } + } + for j := range c.ExemplarsPerSeries { + ret.Exemplars[i*c.ExemplarsPerSeries+j] = record.RefExemplar{ + Ref: chunks.HeadSeriesRef(i), + T: c.TsFn(i, j), + V: float64(i), + Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)), + } + } + } + return ret +} diff --git a/util/testwal/records_test.go b/util/testwal/records_test.go new file mode 100644 index 0000000000..4106fed24d --- /dev/null +++ b/util/testwal/records_test.go @@ -0,0 +1,39 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testwal + +import ( + "testing" + + "github.com/prometheus/prometheus/config" +) + +// BenchmarkGenerateRecords checks data generator performance. +// Recommended CLI: +/* + export bench=genRecs && go test ./util/testwal/... \ + -run '^$' -bench '^BenchmarkGenerateRecords' \ + -benchtime 1s -count 6 -cpu 2 -timeout 999m -benchmem \ + | tee ${bench}.txt +*/ +func BenchmarkGenerateRecords(b *testing.B) { + n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + // This will generate 16M samples and 4k series. + GenerateRecords(RecordsCase{Series: n, SamplesPerSeries: n}) + } +}