mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-04 12:01:06 +02:00
update test to test both v1 and v2 (#17467)
Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
parent
0093e2159e
commit
784ec0a792
@ -236,7 +236,6 @@ func TestSampleDelivery(t *testing.T) {
|
||||
{protoMsg: remoteapi.WriteV1MessageType, samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
|
||||
{protoMsg: remoteapi.WriteV1MessageType, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
|
||||
|
||||
// TODO(alexg): update some portion of this test to check for the 2.0 metadata
|
||||
{protoMsg: remoteapi.WriteV2MessageType, samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
|
||||
{protoMsg: remoteapi.WriteV2MessageType, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
|
||||
{protoMsg: remoteapi.WriteV2MessageType, samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
|
||||
@ -307,6 +306,9 @@ func TestSampleDelivery(t *testing.T) {
|
||||
c.expectExemplars(exemplars[len(exemplars)/2:], series)
|
||||
c.expectHistograms(histograms[len(histograms)/2:], series)
|
||||
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
|
||||
if tc.protoMsg == remoteapi.WriteV2MessageType && len(metadata) > 0 {
|
||||
c.expectMetadataForBatch(metadata, series, samples[len(samples)/2:], exemplars[len(exemplars)/2:], histograms[len(histograms)/2:], floatHistograms[len(floatHistograms)/2:])
|
||||
}
|
||||
qm.Append(samples[len(samples)/2:])
|
||||
qm.AppendExemplars(exemplars[len(exemplars)/2:])
|
||||
qm.AppendHistograms(histograms[len(histograms)/2:])
|
||||
@ -471,54 +473,76 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||
|
||||
func TestShutdown(t *testing.T) {
|
||||
t.Parallel()
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
deadline := 15 * time.Second
|
||||
c := NewTestBlockedWriteClient()
|
||||
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
|
||||
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
deadline := 15 * time.Second
|
||||
c := NewTestBlockedWriteClient()
|
||||
|
||||
cfg := config.DefaultQueueConfig
|
||||
mcfg := config.DefaultMetadataConfig
|
||||
cfg := config.DefaultQueueConfig
|
||||
mcfg := config.DefaultMetadataConfig
|
||||
|
||||
m := newTestQueueManager(t, cfg, mcfg, deadline, c, remoteapi.WriteV1MessageType)
|
||||
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
||||
samples, series := createTimeseries(n, n)
|
||||
m.StoreSeries(series, 0)
|
||||
m.Start()
|
||||
m := newTestQueueManager(t, cfg, mcfg, deadline, c, protoMsg)
|
||||
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
||||
samples, series := createTimeseries(n, n)
|
||||
m.StoreSeries(series, 0)
|
||||
m.Start()
|
||||
|
||||
// Append blocks to guarantee delivery, so we do it in the background.
|
||||
go func() {
|
||||
m.Append(samples)
|
||||
}()
|
||||
synctest.Wait()
|
||||
// Append blocks to guarantee delivery, so we do it in the background.
|
||||
go func() {
|
||||
m.Append(samples)
|
||||
}()
|
||||
synctest.Wait()
|
||||
|
||||
// Test to ensure that Stop doesn't block.
|
||||
start := time.Now()
|
||||
m.Stop()
|
||||
// The samples will never be delivered, so duration should
|
||||
// be at least equal to deadline, otherwise the flush deadline
|
||||
// was not respected.
|
||||
require.Equal(t, time.Since(start), deadline)
|
||||
})
|
||||
// Test to ensure that Stop doesn't block.
|
||||
start := time.Now()
|
||||
m.Stop()
|
||||
// The samples will never be delivered, so duration should
|
||||
// be at least equal to deadline, otherwise the flush deadline
|
||||
// was not respected.
|
||||
require.Equal(t, time.Since(start), deadline)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeriesReset(t *testing.T) {
|
||||
c := NewTestBlockedWriteClient()
|
||||
deadline := 5 * time.Second
|
||||
numSegments := 4
|
||||
numSeries := 25
|
||||
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
|
||||
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
||||
c := NewTestBlockedWriteClient()
|
||||
deadline := 5 * time.Second
|
||||
numSegments := 4
|
||||
numSeries := 25
|
||||
|
||||
cfg := config.DefaultQueueConfig
|
||||
mcfg := config.DefaultMetadataConfig
|
||||
m := newTestQueueManager(t, cfg, mcfg, deadline, c, remoteapi.WriteV1MessageType)
|
||||
for i := range numSegments {
|
||||
series := []record.RefSeries{}
|
||||
for j := range numSeries {
|
||||
series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.FromStrings("a", "a")})
|
||||
}
|
||||
m.StoreSeries(series, i)
|
||||
cfg := config.DefaultQueueConfig
|
||||
mcfg := config.DefaultMetadataConfig
|
||||
m := newTestQueueManager(t, cfg, mcfg, deadline, c, protoMsg)
|
||||
for i := range numSegments {
|
||||
series := []record.RefSeries{}
|
||||
metadata := []record.RefMetadata{}
|
||||
for j := range numSeries {
|
||||
ref := chunks.HeadSeriesRef((i * 100) + j)
|
||||
series = append(series, record.RefSeries{Ref: ref, Labels: labels.FromStrings("a", "a")})
|
||||
metadata = append(metadata, record.RefMetadata{Ref: ref, Type: 1, Unit: "", Help: "test"})
|
||||
}
|
||||
m.StoreSeries(series, i)
|
||||
m.StoreMetadata(metadata)
|
||||
}
|
||||
require.Len(t, m.seriesLabels, numSegments*numSeries)
|
||||
// V2 stores metadata in seriesMetadata map for inline sending.
|
||||
// V1 sends metadata separately via MetadataWatcher, so seriesMetadata is not populated.
|
||||
if protoMsg == remoteapi.WriteV2MessageType {
|
||||
require.Len(t, m.seriesMetadata, numSegments*numSeries)
|
||||
}
|
||||
|
||||
m.SeriesReset(2)
|
||||
require.Len(t, m.seriesLabels, numSegments*numSeries/2)
|
||||
// Verify metadata is also reset for V2
|
||||
if protoMsg == remoteapi.WriteV2MessageType {
|
||||
require.Len(t, m.seriesMetadata, numSegments*numSeries/2)
|
||||
}
|
||||
})
|
||||
}
|
||||
require.Len(t, m.seriesLabels, numSegments*numSeries)
|
||||
m.SeriesReset(2)
|
||||
require.Len(t, m.seriesLabels, numSegments*numSeries/2)
|
||||
}
|
||||
|
||||
func TestReshard(t *testing.T) {
|
||||
@ -2015,65 +2039,69 @@ func TestIsSampleOld(t *testing.T) {
|
||||
// Simulates scenario in which remote write endpoint is down and a subset of samples is dropped due to age limit while backoffing.
|
||||
func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
maxSamplesPerSend := 10
|
||||
sampleAgeLimit := time.Second * 2
|
||||
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
|
||||
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
||||
maxSamplesPerSend := 10
|
||||
sampleAgeLimit := time.Second * 2
|
||||
|
||||
cfg := config.DefaultQueueConfig
|
||||
cfg.MaxShards = 1
|
||||
cfg.SampleAgeLimit = model.Duration(sampleAgeLimit)
|
||||
// Set the batch send deadline to 5 minutes to effectively disable it.
|
||||
cfg.BatchSendDeadline = model.Duration(time.Minute * 5)
|
||||
cfg.Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test
|
||||
cfg.MaxBackoff = model.Duration(time.Millisecond * 100)
|
||||
cfg.MinBackoff = model.Duration(time.Millisecond * 100)
|
||||
cfg.MaxSamplesPerSend = maxSamplesPerSend
|
||||
metadataCfg := config.DefaultMetadataConfig
|
||||
metadataCfg.Send = true
|
||||
metadataCfg.SendInterval = model.Duration(time.Second * 60)
|
||||
metadataCfg.MaxSamplesPerSend = maxSamplesPerSend
|
||||
c := NewTestWriteClient(remoteapi.WriteV1MessageType)
|
||||
m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, remoteapi.WriteV1MessageType)
|
||||
cfg := config.DefaultQueueConfig
|
||||
cfg.MaxShards = 1
|
||||
cfg.SampleAgeLimit = model.Duration(sampleAgeLimit)
|
||||
// Set the batch send deadline to 5 minutes to effectively disable it.
|
||||
cfg.BatchSendDeadline = model.Duration(time.Minute * 5)
|
||||
cfg.Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test
|
||||
cfg.MaxBackoff = model.Duration(time.Millisecond * 100)
|
||||
cfg.MinBackoff = model.Duration(time.Millisecond * 100)
|
||||
cfg.MaxSamplesPerSend = maxSamplesPerSend
|
||||
metadataCfg := config.DefaultMetadataConfig
|
||||
metadataCfg.Send = true
|
||||
metadataCfg.SendInterval = model.Duration(time.Second * 60)
|
||||
metadataCfg.MaxSamplesPerSend = maxSamplesPerSend
|
||||
c := NewTestWriteClient(protoMsg)
|
||||
m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, protoMsg)
|
||||
|
||||
m.Start()
|
||||
m.Start()
|
||||
|
||||
batchID := 0
|
||||
expectedSamples := map[string][]prompb.Sample{}
|
||||
batchID := 0
|
||||
expectedSamples := map[string][]prompb.Sample{}
|
||||
|
||||
appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) {
|
||||
t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped)
|
||||
samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9)
|
||||
m.StoreSeries(series, batchID)
|
||||
sent := m.Append(samples)
|
||||
require.True(t, sent, "samples not sent")
|
||||
if !shouldBeDropped {
|
||||
for _, s := range samples {
|
||||
tsID := getSeriesIDFromRef(series[s.Ref])
|
||||
expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
|
||||
Timestamp: s.T,
|
||||
Value: s.V,
|
||||
})
|
||||
appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) {
|
||||
t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped)
|
||||
samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9)
|
||||
m.StoreSeries(series, batchID)
|
||||
sent := m.Append(samples)
|
||||
require.True(t, sent, "samples not sent")
|
||||
if !shouldBeDropped {
|
||||
for _, s := range samples {
|
||||
tsID := getSeriesIDFromRef(series[s.Ref])
|
||||
expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
|
||||
Timestamp: s.T,
|
||||
Value: s.V,
|
||||
})
|
||||
}
|
||||
}
|
||||
batchID++
|
||||
}
|
||||
}
|
||||
batchID++
|
||||
}
|
||||
timeShift := -time.Millisecond * 5
|
||||
timeShift := -time.Millisecond * 5
|
||||
|
||||
c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff})
|
||||
c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff})
|
||||
|
||||
appendData(maxSamplesPerSend/2, timeShift, true)
|
||||
time.Sleep(sampleAgeLimit)
|
||||
appendData(maxSamplesPerSend/2, timeShift, true)
|
||||
time.Sleep(sampleAgeLimit / 10)
|
||||
appendData(maxSamplesPerSend/2, timeShift, true)
|
||||
time.Sleep(2 * sampleAgeLimit)
|
||||
appendData(2*maxSamplesPerSend, timeShift, false)
|
||||
time.Sleep(sampleAgeLimit / 2)
|
||||
c.SetReturnError(nil)
|
||||
appendData(5, timeShift, false)
|
||||
m.Stop()
|
||||
appendData(maxSamplesPerSend/2, timeShift, true)
|
||||
time.Sleep(sampleAgeLimit)
|
||||
appendData(maxSamplesPerSend/2, timeShift, true)
|
||||
time.Sleep(sampleAgeLimit / 10)
|
||||
appendData(maxSamplesPerSend/2, timeShift, true)
|
||||
time.Sleep(2 * sampleAgeLimit)
|
||||
appendData(2*maxSamplesPerSend, timeShift, false)
|
||||
time.Sleep(sampleAgeLimit / 2)
|
||||
c.SetReturnError(nil)
|
||||
appendData(5, timeShift, false)
|
||||
m.Stop()
|
||||
|
||||
if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" {
|
||||
t.Errorf("mismatch (-want +got):\n%s", diff)
|
||||
if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" {
|
||||
t.Errorf("mismatch (-want +got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user