feat(promtool): add RW2 support to promtool push metrics using client_golang library (#17280)

* Add WriteProto method and tests for promtool metrics

This commit adds:
1. WriteProto method to storage/remote/client.go that handles
   marshaling and compression of protobuf messages
2. Updated parseAndPushMetrics in cmd/promtool/metrics.go to use
   the new WriteProto method
3. Comprehensive tests for PushMetrics functionality

The WriteProto method provides a cleaner API for sending protobuf
messages without manually handling marshaling and compression.

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* use Write method from exp/api/remote

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* fix

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* fix lint

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* fix test

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* fix

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* nit fixed

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* fix lint

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

---------

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
Minh Nguyen 2025-10-27 15:56:48 +02:00 committed by GitHub
parent dca3289c28
commit 6bb367970e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 151 additions and 48 deletions

View File

@ -23,11 +23,8 @@ import (
"os"
"time"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
remoteapi "github.com/prometheus/client_golang/exp/api/remote"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/compression"
"github.com/prometheus/prometheus/util/fmtutil"
)
@ -39,28 +36,22 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
return failureExitCode
}
// build remote write client
writeClient, err := remote.NewWriteClient("remote-write", &remote.ClientConfig{
URL: &config_util.URL{URL: addressURL},
Timeout: model.Duration(timeout),
})
// Build HTTP client with custom transport for headers.
httpClient := &http.Client{
Transport: &setHeadersTransport{
RoundTripper: roundTripper,
headers: headers,
},
Timeout: timeout,
}
// Create remote write API client.
writeAPI, err := remoteapi.NewAPI(addressURL.String(), remoteapi.WithAPIHTTPClient(httpClient))
if err != nil {
fmt.Fprintln(os.Stderr, err)
return failureExitCode
}
// set custom tls config from httpConfigFilePath
// set custom headers to every request
client, ok := writeClient.(*remote.Client)
if !ok {
fmt.Fprintln(os.Stderr, fmt.Errorf("unexpected type %T", writeClient))
return failureExitCode
}
client.Client.Transport = &setHeadersTransport{
RoundTripper: roundTripper,
headers: headers,
}
var data []byte
var failed bool
@ -71,7 +62,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
return failureExitCode
}
fmt.Printf("Parsing standard input\n")
if parseAndPushMetrics(client, data, labels) {
if parseAndPushMetrics(writeAPI, data, labels) {
fmt.Printf(" SUCCESS: metrics pushed to remote write.\n")
return successExitCode
}
@ -87,7 +78,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
}
fmt.Printf("Parsing metrics file %s\n", file)
if parseAndPushMetrics(client, data, labels) {
if parseAndPushMetrics(writeAPI, data, labels) {
fmt.Printf(" SUCCESS: metrics file %s pushed to remote write.\n", file)
continue
}
@ -101,28 +92,16 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
return successExitCode
}
// TODO(bwplotka): Add PRW 2.0 support.
func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]string) bool {
func parseAndPushMetrics(writeAPI *remoteapi.API, data []byte, labels map[string]string) bool {
metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
}
raw, err := metricsData.Marshal()
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
}
// Encode the request body into snappy encoding.
compressed, err := compression.Encode(compression.Snappy, raw, nil)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
}
_, err = client.Store(context.Background(), compressed, 0)
// Use remoteapi.Write which handles marshaling and compression internally.
// TODO: Add feature flags to support V2.
_, err = writeAPI.Write(context.Background(), remoteapi.WriteV1MessageType, metricsData)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false

View File

@ -0,0 +1,120 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"time"
remoteapi "github.com/prometheus/client_golang/exp/api/remote"
"github.com/stretchr/testify/require"
)
func TestPushMetrics(t *testing.T) {
tests := []struct {
name string
metricsData string
}{
{
name: "successful push with gauge metrics",
metricsData: `# HELP test_metric A test metric
# TYPE test_metric gauge
test_metric{label="value1"} 42.0
test_metric{label="value2"} 43.0
`,
},
{
name: "successful push with counter metrics",
metricsData: `# HELP test_counter A test counter
# TYPE test_counter counter
test_counter 100
`,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Create test server using client_golang's remote write handler.
store := &mockStorage{}
handler := remoteapi.NewWriteHandler(
store,
remoteapi.MessageTypes{remoteapi.WriteV1MessageType},
)
server := httptest.NewServer(handler)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Create a temp file with metrics data.
tmpFile := t.TempDir() + "/metrics.txt"
err = os.WriteFile(tmpFile, []byte(tc.metricsData), 0o644)
require.NoError(t, err)
// Call PushMetrics.
status := PushMetrics(
serverURL,
http.DefaultTransport,
map[string]string{},
30*time.Second,
map[string]string{"job": "test"},
tmpFile,
)
require.Equal(t, successExitCode, status)
// Verify that the handler received and processed the request.
require.True(t, store.called, "Handler should have been called")
require.NoError(t, store.lastErr, "Handler should not have returned an error")
// Verify proper data propagation.
require.NotEmpty(t, store.receivedData, "Request should contain data (compression and decompression successful)")
require.Contains(t, store.receivedContentType, "application/x-protobuf", "Content-Type should be protobuf")
})
}
}
// mockStorage is a simple mock for testing the remote write handler.
type mockStorage struct {
called bool
lastErr error
receivedData []byte
receivedContentType string
}
func (m *mockStorage) Store(req *http.Request, _ remoteapi.WriteMessageType) (*remoteapi.WriteResponse, error) {
m.called = true
// Capture content-type header.
m.receivedContentType = req.Header.Get("Content-Type")
if req.Body != nil {
data, err := io.ReadAll(req.Body)
if err == nil {
m.receivedData = data
}
}
if m.lastErr != nil {
return nil, m.lastErr
}
resp := remoteapi.NewWriteResponse()
resp.SetStatusCode(http.StatusNoContent)
return resp, nil
}

View File

@ -225,6 +225,7 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace {
return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans())
}))
return &Client{
remoteName: name,
urlString: conf.URL.String(),

View File

@ -126,14 +126,21 @@ func TestIgnoreExternalLabels(t *testing.T) {
require.NoError(t, err)
}
// mustURLParse parses a URL and panics on error.
func mustURLParse(rawURL string) *url.URL {
u, err := url.Parse(rawURL)
if err != nil {
panic(fmt.Sprintf("failed to parse URL %q: %v", rawURL, err))
}
return u
}
// baseRemoteWriteConfig copy values from global Default Write config
// to avoid change global state and cross impact test execution.
func baseRemoteWriteConfig(host string) *config.RemoteWriteConfig {
cfg := config.DefaultRemoteWriteConfig
cfg.URL = &common_config.URL{
URL: &url.URL{
Host: host,
},
URL: mustURLParse(host),
}
return &cfg
}
@ -143,9 +150,7 @@ func baseRemoteWriteConfig(host string) *config.RemoteWriteConfig {
func baseRemoteReadConfig(host string) *config.RemoteReadConfig {
cfg := config.DefaultRemoteReadConfig
cfg.URL = &common_config.URL{
URL: &url.URL{
Host: host,
},
URL: mustURLParse(host),
}
return &cfg
}

View File

@ -311,9 +311,7 @@ func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) {
// We need to set URL's so that metric creation doesn't panic.
for i := range conf.RemoteWriteConfigs {
conf.RemoteWriteConfigs[i].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
URL: mustURLParse("http://test-storage.com"),
}
}
require.NoError(t, s.ApplyConfig(conf))