fix(http): concurrent map read/write (#5753)

* fix(http): concurrent map read/write

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* fix(http): concurrent map read/write

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* fix(http): concurrent map read/write

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* fix(http): concurrent map read/write

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* fix(http): concurrent map read/write

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

---------

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
Ivan Ka 2025-08-21 12:53:06 +01:00 committed by GitHub
parent 9cbe200e3b
commit ccebff99e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 132 additions and 285 deletions

View File

@ -29,7 +29,6 @@ import (
)
var (
RequestDurationLabels = metrics.NewLabels([]string{"scheme", "host", "path", "method", "status"})
RequestDurationMetric = metrics.NewSummaryVecWithOpts(
prometheus.SummaryOpts{
Name: "request_duration_seconds",
@ -38,7 +37,7 @@ var (
ConstLabels: prometheus.Labels{"handler": "instrumented_http"},
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
*RequestDurationLabels,
[]string{metrics.LabelScheme, metrics.LabelHost, metrics.LabelPath, metrics.LabelMethod, metrics.LabelStatus},
)
)
@ -64,15 +63,13 @@ func (r *CustomRoundTripper) RoundTrip(req *http.Request) (*http.Response, error
status = fmt.Sprintf("%d", resp.StatusCode)
}
RequestDurationLabels.WithOptions(
metrics.WithLabel("scheme", req.URL.Scheme),
metrics.WithLabel("host", req.URL.Host),
metrics.WithLabel("path", metrics.PathProcessor(req.URL.Path)),
metrics.WithLabel("method", req.Method),
metrics.WithLabel("status", status),
)
RequestDurationMetric.SetWithLabels(time.Since(start).Seconds(), RequestDurationLabels)
RequestDurationMetric.SetWithLabels(time.Since(start).Seconds(), metrics.Labels{
metrics.LabelScheme: req.URL.Scheme,
metrics.LabelHost: req.URL.Host,
metrics.LabelPath: metrics.PathProcessor(req.URL.Path),
metrics.LabelMethod: req.Method,
metrics.LabelStatus: status,
})
return resp, err
}

View File

@ -0,0 +1,97 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package http
import (
"bytes"
"io"
"net/http"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type roundTripFunc func(req *http.Request) *http.Response
func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req), nil
}
// newTestClient returns *http.client with Transport replaced to avoid making real calls
func newTestClient(fn roundTripFunc) *http.Client {
return &http.Client{
Transport: NewInstrumentedTransport(fn),
}
}
type apiUnderTest struct {
client *http.Client
baseURL string
}
func (api *apiUnderTest) doStuff() ([]byte, error) {
resp, err := api.client.Get(api.baseURL + "/some/path")
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
func BenchmarkRoundTripper(b *testing.B) {
client := newTestClient(func(req *http.Request) *http.Response {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBufferString(`OK`)),
Header: make(http.Header),
}
})
for b.Loop() {
api := apiUnderTest{client, "http://example.com"}
body, err := api.doStuff()
require.NoError(b, err)
assert.Equal(b, []byte("OK"), body)
}
}
func TestRoundTripper_Concurrent(t *testing.T) {
client := newTestClient(func(req *http.Request) *http.Response {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBufferString(`OK`)),
Header: make(http.Header),
}
})
api := &apiUnderTest{client: client, baseURL: "http://example.com"}
const numGoroutines = 100
var wg sync.WaitGroup
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
body, err := api.doStuff()
assert.NoError(t, err)
assert.Equal(t, []byte("OK"), body)
}()
}
wg.Wait()
}

View File

@ -17,62 +17,15 @@ limitations under the License.
package metrics
import (
"sort"
"strings"
"github.com/sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
)
type Labels struct {
values map[string]string
}
const (
LabelScheme = "scheme"
LabelHost = "host"
LabelPath = "path"
LabelMethod = "method"
LabelStatus = "status"
)
func (labels *Labels) GetKeysInOrder() []string {
keys := make([]string, 0, len(labels.values))
for key := range labels.values {
keys = append(keys, key)
}
sort.Strings(keys)
return keys
}
func (labels *Labels) GetValuesOrderedByKey() []string {
var orderedValues []string
for _, key := range labels.GetKeysInOrder() {
orderedValues = append(orderedValues, labels.values[key])
}
return orderedValues
}
type LabelOption func(*Labels)
func NewLabels(labelNames []string) *Labels {
labels := &Labels{
values: make(map[string]string),
}
for _, label := range labelNames {
labels.values[strings.ToLower(label)] = ""
}
return labels
}
func (labels *Labels) WithOptions(options ...LabelOption) {
for _, option := range options {
option(labels)
}
}
func WithLabel(labelName string, labelValue string) LabelOption {
return func(labels *Labels) {
if _, ok := labels.values[strings.ToLower(labelName)]; !ok {
logrus.Errorf("Attempting to set a value for a label that doesn't exist! '%s' does not exist!", labelName)
} else {
labels.values[strings.ToLower(labelName)] = labelValue
}
}
}
type Labels = prometheus.Labels

View File

@ -1,191 +0,0 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"sigs.k8s.io/external-dns/internal/testutils"
)
func TestNewLabels(t *testing.T) {
tests := []struct {
name string
labelNames []string
expectedLabelNames []string
}{
{
name: "NewLabels initializes Values with labelNames",
labelNames: []string{"label1", "label2"},
expectedLabelNames: []string{"label1", "label2"},
},
{
name: "NewLabels sets labelNames as lower-case",
labelNames: []string{"LABEL1", "label2"},
expectedLabelNames: []string{"label1", "label2"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
labels := NewLabels(tt.labelNames)
keys := labels.GetKeysInOrder()
assert.Equal(t, tt.expectedLabelNames, keys)
})
}
}
func TestLabelsWithOptions(t *testing.T) {
tests := []struct {
name string
labelNames []string
options []LabelOption
expectedValuesMap map[string]string
}{
{
name: "WithOptions sets label values",
labelNames: []string{"label1", "label2"},
options: []LabelOption{
WithLabel("label1", "alpha"),
WithLabel("label2", "beta"),
},
expectedValuesMap: map[string]string{
"label1": "alpha",
"label2": "beta",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
labels := NewLabels(tt.labelNames)
labels.WithOptions(tt.options...)
assert.Equal(t, tt.expectedValuesMap, labels.values)
})
}
}
func TestLabelsWithLabel(t *testing.T) {
tests := []struct {
name string
labelNames []string
labelName string
labelValue string
expectedLabels *Labels
expectedErrorLog string
}{
{
name: "WithLabel sets label and value",
labelNames: []string{"label1"},
labelName: "label1",
labelValue: "alpha",
expectedLabels: &Labels{
values: map[string]string{
"label1": "alpha",
}},
},
{
name: "WithLabel sets labelName to lowercase",
labelNames: []string{"label1"},
labelName: "LABEL1",
labelValue: "alpha",
expectedLabels: &Labels{
values: map[string]string{
"label1": "alpha",
}},
},
{
name: "WithLabel errors if label doesn't exist",
labelNames: []string{"label1"},
labelName: "notreal",
labelValue: "",
expectedErrorLog: "Attempting to set a value for a label that doesn't exist! 'notreal' does not exist!",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
hook := testutils.LogsUnderTestWithLogLevel(logrus.WarnLevel, t)
labels := NewLabels(tt.labelNames)
labels.WithOptions(WithLabel(tt.labelName, tt.labelValue))
if tt.expectedLabels != nil {
assert.Equal(t, tt.expectedLabels, labels)
}
if tt.expectedErrorLog != "" {
testutils.TestHelperLogContains(tt.expectedErrorLog, hook, t)
}
})
}
}
func TestLabelsGetKeysInOrder(t *testing.T) {
tests := []struct {
name string
labels *Labels
expectedKeysInOrder []string
}{
{
"GetKeysInOrder returns keys in alphabetical order",
NewLabels([]string{"label2", "label1"}),
[]string{"label1", "label2"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
orderedKeys := tt.labels.GetKeysInOrder()
assert.Equal(t, tt.expectedKeysInOrder, orderedKeys)
})
}
}
func TestLabelsGetValuesOrderedByKey(t *testing.T) {
tests := []struct {
name string
labels *Labels
labelOptions []LabelOption
expectedValuesInOrder []string
}{
{
"GetKeysInOrder returns keys in alphabetical order",
NewLabels([]string{"label1", "label2"}),
[]LabelOption{
WithLabel("label2", "beta"),
WithLabel("label1", "alpha"),
},
[]string{"alpha", "beta"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.labels.WithOptions(tt.labelOptions...)
orderedValues := tt.labels.GetValuesOrderedByKey()
assert.Equal(t, tt.expectedValuesInOrder, orderedValues)
})
}
}

View File

@ -33,12 +33,6 @@ func (m *MockMetric) Get() *Metric {
return &Metric{FQDN: m.FQDN}
}
func getTestLabels(t *testing.T, labelNames []string) *Labels {
t.Helper()
return NewLabels(labelNames)
}
func TestMustRegister(t *testing.T) {
tests := []struct {
name string
@ -67,7 +61,7 @@ func TestMustRegister(t *testing.T) {
NewCounterWithOpts(prometheus.CounterOpts{Name: "test_counter_3"}),
NewCounterVecWithOpts(prometheus.CounterOpts{Name: "test_counter_vec_3"}, []string{"label"}),
NewGaugedVectorOpts(prometheus.GaugeOpts{Name: "test_gauge_v_3"}, []string{"label"}),
NewSummaryVecWithOpts(prometheus.SummaryOpts{Name: "test_summary_v_3"}, *NewLabels([]string{"label"})),
NewSummaryVecWithOpts(prometheus.SummaryOpts{Name: "test_summary_v_3"}, []string{"label"}),
},
expected: 5,
},

View File

@ -186,11 +186,11 @@ func (s SummaryVecMetric) Get() *Metric {
return &s.Metric
}
func (s SummaryVecMetric) SetWithLabels(value float64, labels *Labels) {
s.SummaryVec.WithLabelValues(labels.GetValuesOrderedByKey()...).Observe(value)
func (s SummaryVecMetric) SetWithLabels(value float64, labels prometheus.Labels) {
s.SummaryVec.With(labels).Observe(value)
}
func NewSummaryVecWithOpts(opts prometheus.SummaryOpts, labels Labels) SummaryVecMetric {
func NewSummaryVecWithOpts(opts prometheus.SummaryOpts, labels []string) SummaryVecMetric {
opts.Namespace = Namespace
return SummaryVecMetric{
Metric: Metric{
@ -201,7 +201,7 @@ func NewSummaryVecWithOpts(opts prometheus.SummaryOpts, labels Labels) SummaryVe
Subsystem: opts.Subsystem,
Help: opts.Help,
},
SummaryVec: *prometheus.NewSummaryVec(opts, labels.GetKeysInOrder()),
SummaryVec: *prometheus.NewSummaryVec(opts, labels),
}
}

View File

@ -192,13 +192,11 @@ func TestSummaryV_SetWithLabels(t *testing.T) {
Help: "help text",
}
labels := NewLabels([]string{"label1", "label2"})
sv := NewSummaryVecWithOpts(opts, *labels)
labels := Labels{}
sv := NewSummaryVecWithOpts(opts, []string{"label1", "label2"})
labels.WithOptions(
WithLabel("label1", "alpha"),
WithLabel("label2", "beta"),
)
labels["label1"] = "alpha"
labels["label2"] = "beta"
sv.SetWithLabels(5.01, labels)

View File

@ -61,23 +61,22 @@ var extractAWSRequestParameters = middleware.DeserializeMiddlewareFunc("extractA
requestMetrics := getRequestMetric(ctx)
labels := metrics.Labels{}
if req, ok := in.Request.(*smithyhttp.Request); ok && req != nil {
extdnshttp.RequestDurationLabels.WithOptions(
metrics.WithLabel("scheme", req.URL.Scheme),
metrics.WithLabel("host", req.URL.Host),
metrics.WithLabel("path", metrics.PathProcessor(req.URL.Path)),
metrics.WithLabel("method", req.Method),
)
labels[metrics.LabelScheme] = req.URL.Scheme
labels[metrics.LabelHost] = req.URL.Host
labels[metrics.LabelPath] = metrics.PathProcessor(req.URL.Path)
labels[metrics.LabelMethod] = req.Method
labels[metrics.LabelStatus] = "unknown"
}
// Try to access HTTP response and status code
if resp, ok := out.RawResponse.(*smithyhttp.Response); ok && resp != nil {
extdnshttp.RequestDurationLabels.WithOptions(
metrics.WithLabel("status", fmt.Sprintf("%d", resp.StatusCode)),
)
labels[metrics.LabelStatus] = fmt.Sprintf("%d", resp.StatusCode)
}
extdnshttp.RequestDurationMetric.SetWithLabels(time.Since(requestMetrics.StartTime).Seconds(), extdnshttp.RequestDurationLabels)
extdnshttp.RequestDurationMetric.SetWithLabels(time.Since(requestMetrics.StartTime).Seconds(), labels)
return out, metadata, err
})