From 6bb367970e8d4c3ea50b3f0ecbace4f2470ed155 Mon Sep 17 00:00:00 2001 From: Minh Nguyen <148210689+pipiland2612@users.noreply.github.com> Date: Mon, 27 Oct 2025 15:56:48 +0200 Subject: [PATCH] feat(promtool): add RW2 support to promtool push metrics using client_golang library (#17280) * Add WriteProto method and tests for promtool metrics This commit adds: 1. WriteProto method to storage/remote/client.go that handles marshaling and compression of protobuf messages 2. Updated parseAndPushMetrics in cmd/promtool/metrics.go to use the new WriteProto method 3. Comprehensive tests for PushMetrics functionality The WriteProto method provides a cleaner API for sending protobuf messages without manually handling marshaling and compression. Signed-off-by: pipiland2612 * use Write method from exp/api/remote Signed-off-by: pipiland2612 * fix Signed-off-by: pipiland2612 * fix lint Signed-off-by: pipiland2612 * fix test Signed-off-by: pipiland2612 * fix Signed-off-by: pipiland2612 * nit fixed Signed-off-by: pipiland2612 * fix lint Signed-off-by: pipiland2612 --------- Signed-off-by: pipiland2612 --- cmd/promtool/metrics.go | 57 +++++----------- cmd/promtool/metrics_test.go | 120 +++++++++++++++++++++++++++++++++ storage/remote/client.go | 1 + storage/remote/storage_test.go | 17 +++-- storage/remote/write_test.go | 4 +- 5 files changed, 151 insertions(+), 48 deletions(-) create mode 100644 cmd/promtool/metrics_test.go diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 56b5209541..ee9c03d61c 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -23,11 +23,8 @@ import ( "os" "time" - config_util "github.com/prometheus/common/config" - "github.com/prometheus/common/model" + remoteapi "github.com/prometheus/client_golang/exp/api/remote" - "github.com/prometheus/prometheus/storage/remote" - "github.com/prometheus/prometheus/util/compression" "github.com/prometheus/prometheus/util/fmtutil" ) @@ -39,28 +36,22 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin return failureExitCode } - // build remote write client - writeClient, err := remote.NewWriteClient("remote-write", &remote.ClientConfig{ - URL: &config_util.URL{URL: addressURL}, - Timeout: model.Duration(timeout), - }) + // Build HTTP client with custom transport for headers. + httpClient := &http.Client{ + Transport: &setHeadersTransport{ + RoundTripper: roundTripper, + headers: headers, + }, + Timeout: timeout, + } + + // Create remote write API client. + writeAPI, err := remoteapi.NewAPI(addressURL.String(), remoteapi.WithAPIHTTPClient(httpClient)) if err != nil { fmt.Fprintln(os.Stderr, err) return failureExitCode } - // set custom tls config from httpConfigFilePath - // set custom headers to every request - client, ok := writeClient.(*remote.Client) - if !ok { - fmt.Fprintln(os.Stderr, fmt.Errorf("unexpected type %T", writeClient)) - return failureExitCode - } - client.Client.Transport = &setHeadersTransport{ - RoundTripper: roundTripper, - headers: headers, - } - var data []byte var failed bool @@ -71,7 +62,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin return failureExitCode } fmt.Printf("Parsing standard input\n") - if parseAndPushMetrics(client, data, labels) { + if parseAndPushMetrics(writeAPI, data, labels) { fmt.Printf(" SUCCESS: metrics pushed to remote write.\n") return successExitCode } @@ -87,7 +78,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin } fmt.Printf("Parsing metrics file %s\n", file) - if parseAndPushMetrics(client, data, labels) { + if parseAndPushMetrics(writeAPI, data, labels) { fmt.Printf(" SUCCESS: metrics file %s pushed to remote write.\n", file) continue } @@ -101,28 +92,16 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin return successExitCode } -// TODO(bwplotka): Add PRW 2.0 support. -func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]string) bool { +func parseAndPushMetrics(writeAPI *remoteapi.API, data []byte, labels map[string]string) bool { metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) return false } - raw, err := metricsData.Marshal() - if err != nil { - fmt.Fprintln(os.Stderr, " FAILED:", err) - return false - } - - // Encode the request body into snappy encoding. - compressed, err := compression.Encode(compression.Snappy, raw, nil) - if err != nil { - fmt.Fprintln(os.Stderr, " FAILED:", err) - return false - } - - _, err = client.Store(context.Background(), compressed, 0) + // Use remoteapi.Write which handles marshaling and compression internally. + // TODO: Add feature flags to support V2. + _, err = writeAPI.Write(context.Background(), remoteapi.WriteV1MessageType, metricsData) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) return false diff --git a/cmd/promtool/metrics_test.go b/cmd/promtool/metrics_test.go new file mode 100644 index 0000000000..c745869a31 --- /dev/null +++ b/cmd/promtool/metrics_test.go @@ -0,0 +1,120 @@ +// Copyright 2025 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "io" + "net/http" + "net/http/httptest" + "net/url" + "os" + "testing" + "time" + + remoteapi "github.com/prometheus/client_golang/exp/api/remote" + "github.com/stretchr/testify/require" +) + +func TestPushMetrics(t *testing.T) { + tests := []struct { + name string + metricsData string + }{ + { + name: "successful push with gauge metrics", + metricsData: `# HELP test_metric A test metric +# TYPE test_metric gauge +test_metric{label="value1"} 42.0 +test_metric{label="value2"} 43.0 +`, + }, + { + name: "successful push with counter metrics", + metricsData: `# HELP test_counter A test counter +# TYPE test_counter counter +test_counter 100 +`, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Create test server using client_golang's remote write handler. + store := &mockStorage{} + handler := remoteapi.NewWriteHandler( + store, + remoteapi.MessageTypes{remoteapi.WriteV1MessageType}, + ) + + server := httptest.NewServer(handler) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + require.NoError(t, err) + + // Create a temp file with metrics data. + tmpFile := t.TempDir() + "/metrics.txt" + err = os.WriteFile(tmpFile, []byte(tc.metricsData), 0o644) + require.NoError(t, err) + + // Call PushMetrics. + status := PushMetrics( + serverURL, + http.DefaultTransport, + map[string]string{}, + 30*time.Second, + map[string]string{"job": "test"}, + tmpFile, + ) + + require.Equal(t, successExitCode, status) + // Verify that the handler received and processed the request. + require.True(t, store.called, "Handler should have been called") + require.NoError(t, store.lastErr, "Handler should not have returned an error") + + // Verify proper data propagation. + require.NotEmpty(t, store.receivedData, "Request should contain data (compression and decompression successful)") + require.Contains(t, store.receivedContentType, "application/x-protobuf", "Content-Type should be protobuf") + }) + } +} + +// mockStorage is a simple mock for testing the remote write handler. +type mockStorage struct { + called bool + lastErr error + receivedData []byte + receivedContentType string +} + +func (m *mockStorage) Store(req *http.Request, _ remoteapi.WriteMessageType) (*remoteapi.WriteResponse, error) { + m.called = true + + // Capture content-type header. + m.receivedContentType = req.Header.Get("Content-Type") + + if req.Body != nil { + data, err := io.ReadAll(req.Body) + if err == nil { + m.receivedData = data + } + } + + if m.lastErr != nil { + return nil, m.lastErr + } + resp := remoteapi.NewWriteResponse() + resp.SetStatusCode(http.StatusNoContent) + return resp, nil +} diff --git a/storage/remote/client.go b/storage/remote/client.go index e0d24c3991..c535ea3425 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -225,6 +225,7 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) })) + return &Client{ remoteName: name, urlString: conf.URL.String(), diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index 51e290bf48..f567c7a80b 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -126,14 +126,21 @@ func TestIgnoreExternalLabels(t *testing.T) { require.NoError(t, err) } +// mustURLParse parses a URL and panics on error. +func mustURLParse(rawURL string) *url.URL { + u, err := url.Parse(rawURL) + if err != nil { + panic(fmt.Sprintf("failed to parse URL %q: %v", rawURL, err)) + } + return u +} + // baseRemoteWriteConfig copy values from global Default Write config // to avoid change global state and cross impact test execution. func baseRemoteWriteConfig(host string) *config.RemoteWriteConfig { cfg := config.DefaultRemoteWriteConfig cfg.URL = &common_config.URL{ - URL: &url.URL{ - Host: host, - }, + URL: mustURLParse(host), } return &cfg } @@ -143,9 +150,7 @@ func baseRemoteWriteConfig(host string) *config.RemoteWriteConfig { func baseRemoteReadConfig(host string) *config.RemoteReadConfig { cfg := config.DefaultRemoteReadConfig cfg.URL = &common_config.URL{ - URL: &url.URL{ - Host: host, - }, + URL: mustURLParse(host), } return &cfg } diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index f7726d3178..6103a7f262 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -311,9 +311,7 @@ func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) { // We need to set URL's so that metric creation doesn't panic. for i := range conf.RemoteWriteConfigs { conf.RemoteWriteConfigs[i].URL = &common_config.URL{ - URL: &url.URL{ - Host: "http://test-storage.com", - }, + URL: mustURLParse("http://test-storage.com"), } } require.NoError(t, s.ApplyConfig(conf))