mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-11-04 02:11:01 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			477 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			477 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2017 The Prometheus Authors
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
// http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package remote
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"net/http"
 | 
						|
	"net/http/httptest"
 | 
						|
	"net/url"
 | 
						|
	"strings"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/gogo/protobuf/proto"
 | 
						|
	"github.com/golang/snappy"
 | 
						|
	config_util "github.com/prometheus/common/config"
 | 
						|
	"github.com/prometheus/common/model"
 | 
						|
	"github.com/stretchr/testify/require"
 | 
						|
 | 
						|
	"github.com/prometheus/prometheus/config"
 | 
						|
	"github.com/prometheus/prometheus/prompb"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | 
						|
)
 | 
						|
 | 
						|
var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
 | 
						|
 | 
						|
func TestStoreHTTPErrorHandling(t *testing.T) {
 | 
						|
	tests := []struct {
 | 
						|
		code int
 | 
						|
		err  error
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			code: 200,
 | 
						|
			err:  nil,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			code: 300,
 | 
						|
			err:  errors.New("server returned HTTP status 300 Multiple Choices: " + longErrMessage[:maxErrMsgLen]),
 | 
						|
		},
 | 
						|
		{
 | 
						|
			code: 404,
 | 
						|
			err:  errors.New("server returned HTTP status 404 Not Found: " + longErrMessage[:maxErrMsgLen]),
 | 
						|
		},
 | 
						|
		{
 | 
						|
			code: 500,
 | 
						|
			err:  RecoverableError{errors.New("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen]), defaultBackoff},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, test := range tests {
 | 
						|
		server := httptest.NewServer(
 | 
						|
			http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
 | 
						|
				http.Error(w, longErrMessage, test.code)
 | 
						|
			}),
 | 
						|
		)
 | 
						|
 | 
						|
		serverURL, err := url.Parse(server.URL)
 | 
						|
		require.NoError(t, err)
 | 
						|
 | 
						|
		conf := &ClientConfig{
 | 
						|
			URL:     &config_util.URL{URL: serverURL},
 | 
						|
			Timeout: model.Duration(time.Second),
 | 
						|
		}
 | 
						|
 | 
						|
		hash, err := toHash(conf)
 | 
						|
		require.NoError(t, err)
 | 
						|
		c, err := NewWriteClient(hash, conf)
 | 
						|
		require.NoError(t, err)
 | 
						|
 | 
						|
		_, err = c.Store(context.Background(), []byte{}, 0)
 | 
						|
		if test.err != nil {
 | 
						|
			require.EqualError(t, err, test.err.Error())
 | 
						|
		} else {
 | 
						|
			require.NoError(t, err)
 | 
						|
		}
 | 
						|
 | 
						|
		server.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestClientRetryAfter(t *testing.T) {
 | 
						|
	setupServer := func(statusCode int) *httptest.Server {
 | 
						|
		return httptest.NewServer(
 | 
						|
			http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
 | 
						|
				w.Header().Set("Retry-After", "5")
 | 
						|
				http.Error(w, longErrMessage, statusCode)
 | 
						|
			}),
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	getClientConfig := func(serverURL *url.URL, retryOnRateLimit bool) *ClientConfig {
 | 
						|
		return &ClientConfig{
 | 
						|
			URL:              &config_util.URL{URL: serverURL},
 | 
						|
			Timeout:          model.Duration(time.Second),
 | 
						|
			RetryOnRateLimit: retryOnRateLimit,
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	getClient := func(conf *ClientConfig) WriteClient {
 | 
						|
		hash, err := toHash(conf)
 | 
						|
		require.NoError(t, err)
 | 
						|
		c, err := NewWriteClient(hash, conf)
 | 
						|
		require.NoError(t, err)
 | 
						|
		return c
 | 
						|
	}
 | 
						|
 | 
						|
	testCases := []struct {
 | 
						|
		name                string
 | 
						|
		statusCode          int
 | 
						|
		retryOnRateLimit    bool
 | 
						|
		expectedRecoverable bool
 | 
						|
		expectedRetryAfter  model.Duration
 | 
						|
	}{
 | 
						|
		{"TooManyRequests - No Retry", http.StatusTooManyRequests, false, false, 0},
 | 
						|
		{"TooManyRequests - With Retry", http.StatusTooManyRequests, true, true, 5 * model.Duration(time.Second)},
 | 
						|
		{"InternalServerError", http.StatusInternalServerError, false, true, 5 * model.Duration(time.Second)}, // HTTP 5xx errors do not depend on retryOnRateLimit.
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range testCases {
 | 
						|
		t.Run(tc.name, func(t *testing.T) {
 | 
						|
			server := setupServer(tc.statusCode)
 | 
						|
			defer server.Close()
 | 
						|
 | 
						|
			serverURL, err := url.Parse(server.URL)
 | 
						|
			require.NoError(t, err)
 | 
						|
 | 
						|
			c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit))
 | 
						|
 | 
						|
			var recErr RecoverableError
 | 
						|
			_, err = c.Store(context.Background(), []byte{}, 0)
 | 
						|
			require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.")
 | 
						|
			if tc.expectedRecoverable {
 | 
						|
				require.Equal(t, tc.expectedRetryAfter, recErr.retryAfter)
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestRetryAfterDuration(t *testing.T) {
 | 
						|
	tc := []struct {
 | 
						|
		name     string
 | 
						|
		tInput   string
 | 
						|
		expected model.Duration
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:     "seconds",
 | 
						|
			tInput:   "120",
 | 
						|
			expected: model.Duration(time.Second * 120),
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:     "date-time default",
 | 
						|
			tInput:   time.RFC1123, // Expected layout is http.TimeFormat, hence an error.
 | 
						|
			expected: defaultBackoff,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:     "retry-after not provided",
 | 
						|
			tInput:   "", // Expected layout is http.TimeFormat, hence an error.
 | 
						|
			expected: defaultBackoff,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	for _, c := range tc {
 | 
						|
		require.Equal(t, c.expected, retryAfterDuration(c.tInput), c.name)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestClientCustomHeaders(t *testing.T) {
 | 
						|
	headersToSend := map[string]string{"Foo": "Bar", "Baz": "qux"}
 | 
						|
 | 
						|
	var called bool
 | 
						|
	server := httptest.NewServer(
 | 
						|
		http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
 | 
						|
			called = true
 | 
						|
			receivedHeaders := r.Header
 | 
						|
			for name, value := range headersToSend {
 | 
						|
				require.Equal(
 | 
						|
					t,
 | 
						|
					[]string{value},
 | 
						|
					receivedHeaders.Values(name),
 | 
						|
					"expected %v to be part of the received headers %v",
 | 
						|
					headersToSend,
 | 
						|
					receivedHeaders,
 | 
						|
				)
 | 
						|
			}
 | 
						|
		}),
 | 
						|
	)
 | 
						|
	defer server.Close()
 | 
						|
 | 
						|
	serverURL, err := url.Parse(server.URL)
 | 
						|
	require.NoError(t, err)
 | 
						|
 | 
						|
	conf := &ClientConfig{
 | 
						|
		URL:     &config_util.URL{URL: serverURL},
 | 
						|
		Timeout: model.Duration(time.Second),
 | 
						|
		Headers: headersToSend,
 | 
						|
	}
 | 
						|
 | 
						|
	c, err := NewWriteClient("c", conf)
 | 
						|
	require.NoError(t, err)
 | 
						|
 | 
						|
	_, err = c.Store(context.Background(), []byte{}, 0)
 | 
						|
	require.NoError(t, err)
 | 
						|
 | 
						|
	require.True(t, called, "The remote server wasn't called")
 | 
						|
}
 | 
						|
 | 
						|
func TestReadClient(t *testing.T) {
 | 
						|
	tests := []struct {
 | 
						|
		name                  string
 | 
						|
		query                 *prompb.Query
 | 
						|
		httpHandler           http.HandlerFunc
 | 
						|
		timeout               time.Duration
 | 
						|
		expectedLabels        []map[string]string
 | 
						|
		expectedSamples       [][]model.SamplePair
 | 
						|
		expectedErrorContains string
 | 
						|
		sortSeries            bool
 | 
						|
		unwrap                bool
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:        "sorted sampled response",
 | 
						|
			httpHandler: sampledResponseHTTPHandler(t),
 | 
						|
			expectedLabels: []map[string]string{
 | 
						|
				{"foo1": "bar"},
 | 
						|
				{"foo2": "bar"},
 | 
						|
			},
 | 
						|
			expectedSamples: [][]model.SamplePair{
 | 
						|
				{
 | 
						|
					{Timestamp: model.Time(0), Value: model.SampleValue(3)},
 | 
						|
					{Timestamp: model.Time(5), Value: model.SampleValue(4)},
 | 
						|
				},
 | 
						|
				{
 | 
						|
					{Timestamp: model.Time(0), Value: model.SampleValue(1)},
 | 
						|
					{Timestamp: model.Time(5), Value: model.SampleValue(2)},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expectedErrorContains: "",
 | 
						|
			sortSeries:            true,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:        "unsorted sampled response",
 | 
						|
			httpHandler: sampledResponseHTTPHandler(t),
 | 
						|
			expectedLabels: []map[string]string{
 | 
						|
				{"foo2": "bar"},
 | 
						|
				{"foo1": "bar"},
 | 
						|
			},
 | 
						|
			expectedSamples: [][]model.SamplePair{
 | 
						|
				{
 | 
						|
					{Timestamp: model.Time(0), Value: model.SampleValue(1)},
 | 
						|
					{Timestamp: model.Time(5), Value: model.SampleValue(2)},
 | 
						|
				},
 | 
						|
				{
 | 
						|
					{Timestamp: model.Time(0), Value: model.SampleValue(3)},
 | 
						|
					{Timestamp: model.Time(5), Value: model.SampleValue(4)},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expectedErrorContains: "",
 | 
						|
			sortSeries:            false,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "chunked response",
 | 
						|
			query: &prompb.Query{
 | 
						|
				StartTimestampMs: 4000,
 | 
						|
				EndTimestampMs:   12000,
 | 
						|
			},
 | 
						|
			httpHandler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
 | 
						|
				w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
 | 
						|
 | 
						|
				flusher, ok := w.(http.Flusher)
 | 
						|
				require.True(t, ok)
 | 
						|
 | 
						|
				cw := NewChunkedWriter(w, flusher)
 | 
						|
				l := []prompb.Label{
 | 
						|
					{Name: "foo", Value: "bar"},
 | 
						|
				}
 | 
						|
 | 
						|
				chunks := buildTestChunks(t)
 | 
						|
				for i, c := range chunks {
 | 
						|
					cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}}
 | 
						|
					readResp := prompb.ChunkedReadResponse{
 | 
						|
						ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
 | 
						|
						QueryIndex:    int64(i),
 | 
						|
					}
 | 
						|
 | 
						|
					b, err := proto.Marshal(&readResp)
 | 
						|
					require.NoError(t, err)
 | 
						|
 | 
						|
					_, err = cw.Write(b)
 | 
						|
					require.NoError(t, err)
 | 
						|
				}
 | 
						|
			}),
 | 
						|
			expectedLabels: []map[string]string{
 | 
						|
				{"foo": "bar"},
 | 
						|
				{"foo": "bar"},
 | 
						|
				{"foo": "bar"},
 | 
						|
			},
 | 
						|
			// This is the output of buildTestChunks minus the samples outside the query range.
 | 
						|
			expectedSamples: [][]model.SamplePair{
 | 
						|
				{
 | 
						|
					{Timestamp: model.Time(4000), Value: model.SampleValue(4)},
 | 
						|
				},
 | 
						|
				{
 | 
						|
					{Timestamp: model.Time(5000), Value: model.SampleValue(1)},
 | 
						|
					{Timestamp: model.Time(6000), Value: model.SampleValue(2)},
 | 
						|
					{Timestamp: model.Time(7000), Value: model.SampleValue(3)},
 | 
						|
					{Timestamp: model.Time(8000), Value: model.SampleValue(4)},
 | 
						|
					{Timestamp: model.Time(9000), Value: model.SampleValue(5)},
 | 
						|
				},
 | 
						|
				{
 | 
						|
					{Timestamp: model.Time(10000), Value: model.SampleValue(2)},
 | 
						|
					{Timestamp: model.Time(11000), Value: model.SampleValue(3)},
 | 
						|
					{Timestamp: model.Time(12000), Value: model.SampleValue(4)},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expectedErrorContains: "",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "unsupported content type",
 | 
						|
			httpHandler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
 | 
						|
				w.Header().Set("Content-Type", "foobar")
 | 
						|
			}),
 | 
						|
			expectedErrorContains: "unsupported content type",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                  "timeout",
 | 
						|
			httpHandler:           delayedResponseHTTPHandler(t, 15*time.Millisecond),
 | 
						|
			timeout:               5 * time.Millisecond,
 | 
						|
			expectedErrorContains: "context deadline exceeded: request timed out after 5ms",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "unwrap error",
 | 
						|
			httpHandler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
 | 
						|
				http.Error(w, "test error", http.StatusBadRequest)
 | 
						|
			}),
 | 
						|
			expectedErrorContains: "test error",
 | 
						|
			unwrap:                true,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, test := range tests {
 | 
						|
		t.Run(test.name, func(t *testing.T) {
 | 
						|
			server := httptest.NewServer(test.httpHandler)
 | 
						|
			defer server.Close()
 | 
						|
 | 
						|
			u, err := url.Parse(server.URL)
 | 
						|
			require.NoError(t, err)
 | 
						|
 | 
						|
			if test.timeout == 0 {
 | 
						|
				test.timeout = 5 * time.Second
 | 
						|
			}
 | 
						|
 | 
						|
			conf := &ClientConfig{
 | 
						|
				URL:              &config_util.URL{URL: u},
 | 
						|
				Timeout:          model.Duration(test.timeout),
 | 
						|
				ChunkedReadLimit: config.DefaultChunkedReadLimit,
 | 
						|
			}
 | 
						|
			c, err := NewReadClient("test", conf)
 | 
						|
			require.NoError(t, err)
 | 
						|
 | 
						|
			query := &prompb.Query{}
 | 
						|
			if test.query != nil {
 | 
						|
				query = test.query
 | 
						|
			}
 | 
						|
 | 
						|
			ss, err := c.Read(context.Background(), query, test.sortSeries)
 | 
						|
			if test.expectedErrorContains != "" {
 | 
						|
				require.ErrorContains(t, err, test.expectedErrorContains)
 | 
						|
				if test.unwrap {
 | 
						|
					err = errors.Unwrap(err)
 | 
						|
					require.EqualError(t, err, test.expectedErrorContains)
 | 
						|
				}
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			require.NoError(t, err)
 | 
						|
 | 
						|
			i := 0
 | 
						|
 | 
						|
			for ss.Next() {
 | 
						|
				require.NoError(t, ss.Err())
 | 
						|
				s := ss.At()
 | 
						|
 | 
						|
				l := s.Labels()
 | 
						|
				require.Len(t, test.expectedLabels[i], l.Len())
 | 
						|
				for k, v := range test.expectedLabels[i] {
 | 
						|
					require.True(t, l.Has(k))
 | 
						|
					require.Equal(t, v, l.Get(k))
 | 
						|
				}
 | 
						|
 | 
						|
				it := s.Iterator(nil)
 | 
						|
				j := 0
 | 
						|
 | 
						|
				for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() {
 | 
						|
					require.NoError(t, it.Err())
 | 
						|
 | 
						|
					ts, v := it.At()
 | 
						|
					expectedSample := test.expectedSamples[i][j]
 | 
						|
 | 
						|
					require.Equal(t, int64(expectedSample.Timestamp), ts)
 | 
						|
					require.Equal(t, float64(expectedSample.Value), v)
 | 
						|
 | 
						|
					j++
 | 
						|
				}
 | 
						|
 | 
						|
				require.Len(t, test.expectedSamples[i], j)
 | 
						|
 | 
						|
				i++
 | 
						|
			}
 | 
						|
 | 
						|
			require.NoError(t, ss.Err())
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func sampledResponseHTTPHandler(t *testing.T) http.HandlerFunc {
 | 
						|
	return func(w http.ResponseWriter, _ *http.Request) {
 | 
						|
		w.Header().Set("Content-Type", "application/x-protobuf")
 | 
						|
 | 
						|
		resp := prompb.ReadResponse{
 | 
						|
			Results: []*prompb.QueryResult{
 | 
						|
				{
 | 
						|
					Timeseries: []*prompb.TimeSeries{
 | 
						|
						{
 | 
						|
							Labels: []prompb.Label{
 | 
						|
								{Name: "foo2", Value: "bar"},
 | 
						|
							},
 | 
						|
							Samples: []prompb.Sample{
 | 
						|
								{Value: float64(1), Timestamp: int64(0)},
 | 
						|
								{Value: float64(2), Timestamp: int64(5)},
 | 
						|
							},
 | 
						|
							Exemplars: []prompb.Exemplar{},
 | 
						|
						},
 | 
						|
						{
 | 
						|
							Labels: []prompb.Label{
 | 
						|
								{Name: "foo1", Value: "bar"},
 | 
						|
							},
 | 
						|
							Samples: []prompb.Sample{
 | 
						|
								{Value: float64(3), Timestamp: int64(0)},
 | 
						|
								{Value: float64(4), Timestamp: int64(5)},
 | 
						|
							},
 | 
						|
							Exemplars: []prompb.Exemplar{},
 | 
						|
						},
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
		}
 | 
						|
		b, err := proto.Marshal(&resp)
 | 
						|
		require.NoError(t, err)
 | 
						|
 | 
						|
		_, err = w.Write(snappy.Encode(nil, b))
 | 
						|
		require.NoError(t, err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func delayedResponseHTTPHandler(t *testing.T, delay time.Duration) http.HandlerFunc {
 | 
						|
	return func(w http.ResponseWriter, _ *http.Request) {
 | 
						|
		time.Sleep(delay)
 | 
						|
 | 
						|
		w.Header().Set("Content-Type", "application/x-protobuf")
 | 
						|
		b, err := proto.Marshal(&prompb.ReadResponse{})
 | 
						|
		require.NoError(t, err)
 | 
						|
 | 
						|
		_, err = w.Write(snappy.Encode(nil, b))
 | 
						|
		require.NoError(t, err)
 | 
						|
	}
 | 
						|
}
 |