diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 68cfe9899f..8aad3afeb3 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -292,7 +292,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) { } func TestDecodeWriteRequest(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) require.NoError(t, err) actual, err := DecodeWriteRequest(bytes.NewReader(buf)) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 17e57ea85c..9ac3b926b5 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -433,7 +433,7 @@ func NewQueueManager( return t } -// AppendMetadata sends metadata the remote storage. Metadata is sent all at once and is not parallelized. +// AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized. func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) { mm := make([]prompb.MetricMetadata, 0, len(metadata)) for _, entry := range metadata { @@ -445,13 +445,14 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met }) } + pBuf := proto.NewBuffer(nil) numSends := int(math.Ceil(float64(len(metadata)) / float64(t.mcfg.MaxSamplesPerSend))) for i := 0; i < numSends; i++ { last := (i + 1) * t.mcfg.MaxSamplesPerSend if last > len(metadata) { last = len(metadata) } - err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last]) + err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last], pBuf) if err != nil { t.metrics.failedMetadataTotal.Add(float64(last - (i * t.mcfg.MaxSamplesPerSend))) level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", last-(i*t.mcfg.MaxSamplesPerSend), "err", err) @@ -459,9 +460,9 @@ func (t *QueueManager) AppendMetadata(ctx 
context.Context, metadata []scrape.Met } } -func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata) error { +func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { // Build the WriteRequest with no samples. - req, _, err := buildWriteRequest(nil, metadata, nil) + req, _, err := buildWriteRequest(nil, metadata, pBuf, nil) if err != nil { return err } @@ -1040,7 +1041,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface max = s.qm.cfg.MaxSamplesPerSend nPending, nPendingSamples, nPendingExemplars = 0, 0, 0 - buf []byte + pBuf = proto.NewBuffer(nil) + buf []byte ) if s.qm.sendExemplars { max += int(float64(max) * 0.1) @@ -1084,7 +1086,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface if !ok { if nPendingSamples > 0 || nPendingExemplars > 0 { level.Debug(s.qm.logger).Log("msg", "Flushing data to remote storage...", "samples", nPendingSamples, "exemplars", nPendingExemplars) - s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf) + s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf) s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) level.Debug(s.qm.logger).Log("msg", "Done flushing.") @@ -1114,7 +1116,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface } if nPending >= max { - s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf) + s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf) s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) nPendingSamples = 0 @@ -1128,7 +1130,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface case <-timer.C: if 
nPendingSamples > 0 || nPendingExemplars > 0 { level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) - s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf) + s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, pBuf, &buf) s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) nPendingSamples = 0 @@ -1140,9 +1142,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface } } -func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) { begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, buf) + err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, pBuf, buf) if err != nil { level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) @@ -1157,9 +1159,9 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s } // sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, buf *[]byte) error { +func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount int, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) error { // Build the WriteRequest with no metadata. 
- req, highest, err := buildWriteRequest(samples, nil, *buf) + req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) if err != nil { // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. @@ -1266,7 +1268,7 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l } } -func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, buf []byte) ([]byte, int64, error) { +func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) { var highest int64 for _, ts := range samples { // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. @@ -1283,7 +1285,12 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta Metadata: metadata, } - data, err := proto.Marshal(req) + if pBuf == nil { + pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient. 
+ } else { + pBuf.Reset() + } + err := pBuf.Marshal(req) if err != nil { return nil, highest, err } @@ -1293,6 +1300,6 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta if buf != nil { buf = buf[0:cap(buf)] } - compressed := snappy.Encode(buf, data) + compressed := snappy.Encode(buf, pBuf.Bytes()) return compressed, highest, nil } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index d4ba5bfd8b..b4930f00b5 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -32,7 +32,7 @@ import ( ) func TestRemoteWriteHandler(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -68,7 +68,7 @@ func TestOutOfOrderSample(t *testing.T) { buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - }}, nil, nil) + }}, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -93,7 +93,7 @@ func TestOutOfOrderExemplar(t *testing.T) { buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, - }}, nil, nil) + }}, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -113,7 +113,7 @@ func TestOutOfOrderExemplar(t *testing.T) { } func TestCommitErr(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf))