From 784ec0a792dead2588ebb5b9d0fe09aa84e3cb66 Mon Sep 17 00:00:00 2001 From: Minh Nguyen <148210689+pipiland2612@users.noreply.github.com> Date: Mon, 3 Nov 2025 11:22:46 +0200 Subject: [PATCH] update test to test both v1 and v2 (#17467) Signed-off-by: pipiland2612 --- storage/remote/queue_manager_test.go | 210 +++++++++++++++------------ 1 file changed, 119 insertions(+), 91 deletions(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index bb72b7f998..2f92df0c59 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -236,7 +236,6 @@ func TestSampleDelivery(t *testing.T) { {protoMsg: remoteapi.WriteV1MessageType, samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"}, {protoMsg: remoteapi.WriteV1MessageType, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"}, - // TODO(alexg): update some portion of this test to check for the 2.0 metadata {protoMsg: remoteapi.WriteV2MessageType, samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"}, {protoMsg: remoteapi.WriteV2MessageType, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"}, {protoMsg: remoteapi.WriteV2MessageType, samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"}, @@ -307,6 +306,9 @@ func TestSampleDelivery(t *testing.T) { c.expectExemplars(exemplars[len(exemplars)/2:], series) c.expectHistograms(histograms[len(histograms)/2:], series) c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series) + if tc.protoMsg == remoteapi.WriteV2MessageType && len(metadata) > 0 { + c.expectMetadataForBatch(metadata, series, samples[len(samples)/2:], exemplars[len(exemplars)/2:], histograms[len(histograms)/2:], floatHistograms[len(floatHistograms)/2:]) + } qm.Append(samples[len(samples)/2:]) qm.AppendExemplars(exemplars[len(exemplars)/2:]) qm.AppendHistograms(histograms[len(histograms)/2:]) @@ -471,54 +473,76 @@ func TestSampleDeliveryOrder(t *testing.T) { func TestShutdown(t *testing.T) { t.Parallel() - synctest.Test(t, func(t *testing.T) { - deadline := 15 * time.Second - c := NewTestBlockedWriteClient() + for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { + t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + deadline := 15 * time.Second + c := NewTestBlockedWriteClient() - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig - m := newTestQueueManager(t, cfg, mcfg, deadline, c, remoteapi.WriteV1MessageType) - n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend - samples, series := createTimeseries(n, n) - m.StoreSeries(series, 0) - m.Start() + m := newTestQueueManager(t, cfg, mcfg, deadline, c, protoMsg) + n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend + samples, series := createTimeseries(n, n) + m.StoreSeries(series, 0) + m.Start() - // Append blocks to guarantee delivery, so we do it in the background. - go func() { - m.Append(samples) - }() - synctest.Wait() + // Append blocks to guarantee delivery, so we do it in the background. + go func() { + m.Append(samples) + }() + synctest.Wait() - // Test to ensure that Stop doesn't block. - start := time.Now() - m.Stop() - // The samples will never be delivered, so duration should - // be at least equal to deadline, otherwise the flush deadline - // was not respected. - require.Equal(t, time.Since(start), deadline) - }) + // Test to ensure that Stop doesn't block. + start := time.Now() + m.Stop() + // The samples will never be delivered, so duration should + // be at least equal to deadline, otherwise the flush deadline + // was not respected. + require.Equal(t, time.Since(start), deadline) + }) + }) + } } func TestSeriesReset(t *testing.T) { - c := NewTestBlockedWriteClient() - deadline := 5 * time.Second - numSegments := 4 - numSeries := 25 + for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { + t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { + c := NewTestBlockedWriteClient() + deadline := 5 * time.Second + numSegments := 4 + numSeries := 25 - cfg := config.DefaultQueueConfig - mcfg := config.DefaultMetadataConfig - m := newTestQueueManager(t, cfg, mcfg, deadline, c, remoteapi.WriteV1MessageType) - for i := range numSegments { - series := []record.RefSeries{} - for j := range numSeries { - series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.FromStrings("a", "a")}) - } - m.StoreSeries(series, i) + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig + m := newTestQueueManager(t, cfg, mcfg, deadline, c, protoMsg) + for i := range numSegments { + series := []record.RefSeries{} + metadata := []record.RefMetadata{} + for j := range numSeries { + ref := chunks.HeadSeriesRef((i * 100) + j) + series = append(series, record.RefSeries{Ref: ref, Labels: labels.FromStrings("a", "a")}) + metadata = append(metadata, record.RefMetadata{Ref: ref, Type: 1, Unit: "", Help: "test"}) + } + m.StoreSeries(series, i) + m.StoreMetadata(metadata) + } + require.Len(t, m.seriesLabels, numSegments*numSeries) + // V2 stores metadata in seriesMetadata map for inline sending. + // V1 sends metadata separately via MetadataWatcher, so seriesMetadata is not populated. + if protoMsg == remoteapi.WriteV2MessageType { + require.Len(t, m.seriesMetadata, numSegments*numSeries) + } + + m.SeriesReset(2) + require.Len(t, m.seriesLabels, numSegments*numSeries/2) + // Verify metadata is also reset for V2 + if protoMsg == remoteapi.WriteV2MessageType { + require.Len(t, m.seriesMetadata, numSegments*numSeries/2) + } + }) } - require.Len(t, m.seriesLabels, numSegments*numSeries) - m.SeriesReset(2) - require.Len(t, m.seriesLabels, numSegments*numSeries/2) } func TestReshard(t *testing.T) { @@ -2015,65 +2039,69 @@ func TestIsSampleOld(t *testing.T) { // Simulates scenario in which remote write endpoint is down and a subset of samples is dropped due to age limit while backoffing. func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) { t.Parallel() - maxSamplesPerSend := 10 - sampleAgeLimit := time.Second * 2 + for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { + t.Run(fmt.Sprint(protoMsg), func(t *testing.T) { + maxSamplesPerSend := 10 + sampleAgeLimit := time.Second * 2 - cfg := config.DefaultQueueConfig - cfg.MaxShards = 1 - cfg.SampleAgeLimit = model.Duration(sampleAgeLimit) - // Set the batch send deadline to 5 minutes to effectively disable it. - cfg.BatchSendDeadline = model.Duration(time.Minute * 5) - cfg.Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test - cfg.MaxBackoff = model.Duration(time.Millisecond * 100) - cfg.MinBackoff = model.Duration(time.Millisecond * 100) - cfg.MaxSamplesPerSend = maxSamplesPerSend - metadataCfg := config.DefaultMetadataConfig - metadataCfg.Send = true - metadataCfg.SendInterval = model.Duration(time.Second * 60) - metadataCfg.MaxSamplesPerSend = maxSamplesPerSend - c := NewTestWriteClient(remoteapi.WriteV1MessageType) - m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, remoteapi.WriteV1MessageType) + cfg := config.DefaultQueueConfig + cfg.MaxShards = 1 + cfg.SampleAgeLimit = model.Duration(sampleAgeLimit) + // Set the batch send deadline to 5 minutes to effectively disable it. + cfg.BatchSendDeadline = model.Duration(time.Minute * 5) + cfg.Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test + cfg.MaxBackoff = model.Duration(time.Millisecond * 100) + cfg.MinBackoff = model.Duration(time.Millisecond * 100) + cfg.MaxSamplesPerSend = maxSamplesPerSend + metadataCfg := config.DefaultMetadataConfig + metadataCfg.Send = true + metadataCfg.SendInterval = model.Duration(time.Second * 60) + metadataCfg.MaxSamplesPerSend = maxSamplesPerSend + c := NewTestWriteClient(protoMsg) + m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, protoMsg) - m.Start() + m.Start() - batchID := 0 - expectedSamples := map[string][]prompb.Sample{} + batchID := 0 + expectedSamples := map[string][]prompb.Sample{} - appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) { - t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped) - samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9) - m.StoreSeries(series, batchID) - sent := m.Append(samples) - require.True(t, sent, "samples not sent") - if !shouldBeDropped { - for _, s := range samples { - tsID := getSeriesIDFromRef(series[s.Ref]) - expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{ - Timestamp: s.T, - Value: s.V, - }) + appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) { + t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped) + samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9) + m.StoreSeries(series, batchID) + sent := m.Append(samples) + require.True(t, sent, "samples not sent") + if !shouldBeDropped { + for _, s := range samples { + tsID := getSeriesIDFromRef(series[s.Ref]) + expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{ + Timestamp: s.T, + Value: s.V, + }) + } + } + batchID++ } - } - batchID++ - } - timeShift := -time.Millisecond * 5 + timeShift := -time.Millisecond * 5 - c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff}) + c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff}) - appendData(maxSamplesPerSend/2, timeShift, true) - time.Sleep(sampleAgeLimit) - appendData(maxSamplesPerSend/2, timeShift, true) - time.Sleep(sampleAgeLimit / 10) - appendData(maxSamplesPerSend/2, timeShift, true) - time.Sleep(2 * sampleAgeLimit) - appendData(2*maxSamplesPerSend, timeShift, false) - time.Sleep(sampleAgeLimit / 2) - c.SetReturnError(nil) - appendData(5, timeShift, false) - m.Stop() + appendData(maxSamplesPerSend/2, timeShift, true) + time.Sleep(sampleAgeLimit) + appendData(maxSamplesPerSend/2, timeShift, true) + time.Sleep(sampleAgeLimit / 10) + appendData(maxSamplesPerSend/2, timeShift, true) + time.Sleep(2 * sampleAgeLimit) + appendData(2*maxSamplesPerSend, timeShift, false) + time.Sleep(sampleAgeLimit / 2) + c.SetReturnError(nil) + appendData(5, timeShift, false) + m.Stop() - if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" { - t.Errorf("mismatch (-want +got):\n%s", diff) + if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" { + t.Errorf("mismatch (-want +got):\n%s", diff) + } + }) } }