From 0ad8516ce09dba2367d003f4acdbbf792b692313 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Wed, 25 Feb 2026 19:15:22 +0000 Subject: [PATCH] fixed tests after rebase Signed-off-by: bwplotka --- storage/remote/queue_manager_test.go | 62 ++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 5a572e7deb..e6b933bb78 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -139,7 +139,10 @@ func TestBasicContentNegotiation(t *testing.T) { s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) defer s.Close() - recs := generateRecords(recCase{series: 1, samplesPerSeries: 1}) + recs := generateRecords(recCase{ + noST: tc.senderProtoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + series: 1, samplesPerSeries: 1, + }) conf.RemoteWriteConfigs[0].ProtobufMessage = tc.senderProtoMsg require.NoError(t, s.ApplyConfig(conf)) @@ -221,6 +224,7 @@ func TestSampleDelivery(t *testing.T) { s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) defer s.Close() + rc.noST = protoMsg == remoteapi.WriteV1MessageType // RW1 does not support ST. recs := generateRecords(rc) var ( @@ -374,7 +378,10 @@ func TestWALMetadataDelivery(t *testing.T) { func TestSampleDeliveryTimeout(t *testing.T) { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { - recs := generateRecords(recCase{series: 10, samplesPerSeries: 10}) + recs := generateRecords(recCase{ + noST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + series: 10, samplesPerSeries: 10, + }) cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 @@ -403,7 +410,10 @@ func TestSampleDeliveryOrder(t *testing.T) { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts - recs := generateRecords(recCase{series: n, samplesPerSeries: 1}) + recs := generateRecords(recCase{ + noST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + series: n, samplesPerSeries: 1, + }) c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg) c.expectSamples(recs.samples, recs.series) @@ -432,7 +442,10 @@ func TestShutdown(t *testing.T) { m := newTestQueueManager(t, cfg, mcfg, deadline, c, protoMsg) // Send 2x batch size, so we know it will need at least two sends. n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend - recs := generateRecords(recCase{series: n / 1000, samplesPerSeries: 1000}) + recs := generateRecords(recCase{ + noST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + series: n / 1000, samplesPerSeries: 1000, + }) m.StoreSeries(recs.series, 0) m.Start() @@ -501,7 +514,10 @@ func TestReshard(t *testing.T) { size := 10 // Make bigger to find more races. nSeries := 6 samplesPerSeries := config.DefaultQueueConfig.Capacity * size - recs := generateRecords(recCase{series: nSeries, samplesPerSeries: samplesPerSeries}) + recs := generateRecords(recCase{ + noST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + series: nSeries, samplesPerSeries: samplesPerSeries, + }) t.Logf("about to send %v samples", len(recs.samples)) cfg := config.DefaultQueueConfig @@ -577,7 +593,10 @@ func TestReshardPartialBatch(t *testing.T) { t.Parallel() for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { - recs := generateRecords(recCase{series: 1, samplesPerSeries: 10}) + recs := generateRecords(recCase{ + noST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + series: 1, samplesPerSeries: 10, + }) c := NewTestBlockedWriteClient() @@ -622,7 +641,10 @@ func TestReshardPartialBatch(t *testing.T) { func TestQueueFilledDeadlock(t *testing.T) { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { - recs := generateRecords(recCase{series: 50, samplesPerSeries: 1}) + recs := generateRecords(recCase{ + noST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + series: 50, samplesPerSeries: 1, + }) c := NewNopWriteClient() @@ -803,6 +825,8 @@ type recCase struct { labelsFn func(lb *labels.ScratchBuilder, i int) labels.Labels tsFn func(i, j int) int64 + + noST bool } type records struct { @@ -867,8 +891,12 @@ func generateRecords(c recCase) (ret records) { } for j := range c.samplesPerSeries { ts := c.tsFn(i, j) - st := ts - 1 // Keep ST simple for now; we don't need to - // test exact semantics. + st := int64(0) + if !c.noST { + // Keep ST simple for now; we don't test the exact semantics, just + // if RW passes this data. + st = ts - 1 + } ret.samples[i*c.samplesPerSeries+j] = record.RefSample{ Ref: chunks.HeadSeriesRef(i), ST: st, @@ -2014,7 +2042,9 @@ func TestDropOldTimeSeries(t *testing.T) { size := 10 nSeries := 6 nSamples := config.DefaultQueueConfig.Capacity * size + noST := protoMsg == remoteapi.WriteV1MessageType // RW1 does not support ST. pastRecs := generateRecords(recCase{ + noST: noST, series: nSeries, samplesPerSeries: (nSamples / nSeries) / 2, // Half data is past. tsFn: func(_, j int) int64 { @@ -2023,6 +2053,7 @@ func TestDropOldTimeSeries(t *testing.T) { }, }) newRecs := generateRecords(recCase{ + noST: noST, series: nSeries, samplesPerSeries: (nSamples / nSeries) / 2, // Half data is past. tsFn: func(_, j int) int64 { @@ -2097,6 +2128,7 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) { r := rand.New(rand.NewSource(99)) recs := generateRecords(recCase{ + noST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. series: numberOfSeries, samplesPerSeries: 1, tsFn: func(_, _ int) int64 { @@ -2121,9 +2153,10 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) { if !shouldBeDropped { for _, s := range recs.samples { tsID := getSeriesIDFromRef(recs.series[s.Ref]) - c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{ - Timestamp: s.T, - Value: s.V, + c.expectedSamples[tsID] = append(c.expectedSamples[tsID], writev2.Sample{ + StartTimestamp: s.ST, + Timestamp: s.T, + Value: s.V, }) } } @@ -2644,7 +2677,10 @@ func TestHighestTimestampOnAppend(t *testing.T) { t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { nSamples := 11 * config.DefaultQueueConfig.Capacity nSeries := 3 - recs := generateRecords(recCase{series: nSeries, samplesPerSeries: nSamples / nSeries}) + recs := generateRecords(recCase{ + noST: protoMsg == remoteapi.WriteV1MessageType, // RW1 does not support ST. + series: nSeries, samplesPerSeries: nSamples / nSeries, + }) _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg) m.Start()