diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index f97b0cd1e7..d73e8c72b1 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -871,7 +871,8 @@ func generateRecords(c recCase) (ret records) { // test exact semantics. ret.samples[i*c.samplesPerSeries+j] = record.RefSample{ Ref: chunks.HeadSeriesRef(i), - T: st, + ST: st, + T: ts, V: float64(i), } } @@ -952,8 +953,8 @@ func getSeriesIDFromRef(r record.RefSeries) string { // TestWriteClient represents write client which does not call remote storage, // but instead re-implements fake WriteHandler for test purposes. type TestWriteClient struct { - receivedSamples map[string][]prompb.Sample - expectedSamples map[string][]prompb.Sample + receivedSamples map[string][]writev2.Sample + expectedSamples map[string][]writev2.Sample receivedExemplars map[string][]prompb.Exemplar expectedExemplars map[string][]prompb.Exemplar receivedHistograms map[string][]prompb.Histogram @@ -977,8 +978,8 @@ type TestWriteClient struct { // NewTestWriteClient creates a new testing write client. func NewTestWriteClient(protoMsg remoteapi.WriteMessageType) *TestWriteClient { return &TestWriteClient{ - receivedSamples: map[string][]prompb.Sample{}, - expectedSamples: map[string][]prompb.Sample{}, + receivedSamples: map[string][]writev2.Sample{}, + expectedSamples: map[string][]writev2.Sample{}, receivedMetadata: map[string][]prompb.MetricMetadata{}, expectedMetadata: map[string][]prompb.MetricMetadata{}, protoMsg: protoMsg, @@ -993,18 +994,20 @@ func (c *TestWriteClient) injectErrors(injectedErrs []error) { c.retry = false } +// expectSamples injects samples that will be expected on waitForExpectedData. func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { c.mtx.Lock() defer c.mtx.Unlock() - c.expectedSamples = map[string][]prompb.Sample{} - c.receivedSamples = map[string][]prompb.Sample{} + c.expectedSamples = map[string][]writev2.Sample{} + c.receivedSamples = map[string][]writev2.Sample{} for _, s := range ss { tsID := getSeriesIDFromRef(series[s.Ref]) - c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{ - Timestamp: s.T, - Value: s.V, + c.expectedSamples[tsID] = append(c.expectedSamples[tsID], writev2.Sample{ + StartTimestamp: s.ST, + Timestamp: s.T, + Value: s.V, }) } } @@ -1182,7 +1185,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResp } } - var reqProto *prompb.WriteRequest + var ( + reqProto *prompb.WriteRequest + reqProtoV2 *writev2.Request + ) switch c.protoMsg { case remoteapi.WriteV1MessageType: reqProto = &prompb.WriteRequest{} @@ -1190,10 +1196,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResp case remoteapi.WriteV2MessageType: // NOTE(bwplotka): v1 msg can be unmarshaled to v2 sometimes, without // errors. - var reqProtoV2 writev2.Request - err = proto.Unmarshal(reqBuf, &reqProtoV2) + reqProtoV2 = &writev2.Request{} + err = proto.Unmarshal(reqBuf, reqProtoV2) if err == nil { - reqProto, err = v2RequestToWriteRequest(&reqProtoV2) + reqProto, err = v2RequestToWriteRequest(reqProtoV2) } } if err != nil { @@ -1202,11 +1208,21 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResp rs := WriteResponseStats{} b := labels.NewScratchBuilder(0) - for _, ts := range reqProto.Timeseries { + for i, ts := range reqProto.Timeseries { labels := ts.ToLabels(&b, nil) tsID := labels.String() - if len(ts.Samples) > 0 { - c.receivedSamples[tsID] = append(c.receivedSamples[tsID], ts.Samples...) + for j, s := range ts.Samples { + st := int64(0) + if reqProtoV2 != nil { + // TODO(bwplotka): Refactor queue manager TestWriteClient for tighter validation + // and native support for new RW2 features. For now we inject STs in RW2 case to the existing test suite. + st = reqProtoV2.Timeseries[i].Samples[j].StartTimestamp + } + c.receivedSamples[tsID] = append(c.receivedSamples[tsID], writev2.Sample{ + StartTimestamp: st, + Timestamp: s.Timestamp, + Value: s.Value, + }) } rs.Samples += len(ts.Samples)