From b5389192583bac5171716e6467a370f29bda3c43 Mon Sep 17 00:00:00 2001
From: tyltr
Date: Wed, 31 Jan 2024 20:27:23 +0800
Subject: [PATCH 01/49] remove redundant code
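Context: since Go 1.22, `for` loop variables are scoped per iteration, so
re-declaring the variable before capturing it in a subtest closure is no
longer needed. A minimal sketch of the now-redundant pattern (hypothetical
test table):

	for _, tc := range testCases {
		tc := tc // redundant since Go 1.22: tc is already per-iteration
		t.Run(tc.title, func(t *testing.T) { _ = tc })
	}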
Signed-off-by: tyltr
---
discovery/legacymanager/manager_test.go | 1 -
1 file changed, 1 deletion(-)
diff --git a/discovery/legacymanager/manager_test.go b/discovery/legacymanager/manager_test.go
index 6fbecabc2a..7675321689 100644
--- a/discovery/legacymanager/manager_test.go
+++ b/discovery/legacymanager/manager_test.go
@@ -1091,7 +1091,6 @@ func TestCoordinationWithReceiver(t *testing.T) {
}
for _, tc := range testCases {
- tc := tc
t.Run(tc.title, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
From d8a9d69f81d4bfd2183289263e74333ac4d628c1 Mon Sep 17 00:00:00 2001
From: Jan Fajerski
Date: Tue, 25 Jun 2024 09:15:09 +0200
Subject: [PATCH 02/49] ci: Add job to report build_all status
This should enable proper status reporting of matrix jobs for release
branches. See also https://github.com/orgs/community/discussions/4324.
The new job succeeds if all build_all jobs succeeded and fails if any
build job failed or was cancelled.
Unlike the build_all step, this job only runs for PRs, not for release
tags or the main branch.
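(For reference: `needs.*.result` is the object-filter syntax, yielding the
result of each job listed in `needs` - here just build_all, whose aggregate
result is 'failure' or 'cancelled' if any matrix leg was - so the two
contains() checks below cover exactly those cases.)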
Signed-off-by: Jan Fajerski
---
.github/workflows/ci.yml | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 978218dba2..995aef6757 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -143,6 +143,18 @@ jobs:
with:
parallelism: 12
thread: ${{ matrix.thread }}
+ build_all_status:
+ name: Report status of build Prometheus for all architectures
+ runs-on: ubuntu-latest
+ needs: [build_all]
+ if: github.event_name == 'pull_request' && startsWith(github.event.pull_request.base.ref, 'release-')
+ steps:
+ - name: Successful build
+ if: ${{ !(contains(needs.*.result, 'failure')) && !(contains(needs.*.result, 'cancelled')) }}
+ run: exit 0
+ - name: Failing or cancelled build
+ if: ${{ contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled') }}
+ run: exit 1
check_generated_parser:
name: Check generated parser
runs-on: ubuntu-latest
From d84282b105e40a6f5f1e3263ae6a7d735155df76 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 15 Jul 2024 09:47:16 +0100
Subject: [PATCH 03/49] Labels: use single byte as separator - small speedup
Since `seps` is a variable, `seps[0]` has to be bounds-checked on every
use. Replacing it with a constant everywhere it is used skips this
overhead.
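A minimal sketch of the difference, using the names introduced in this
patch:

	var seps = []byte{'\xff'} // indexing seps[0] needs a runtime bounds check
	const sep = '\xff'        // an untyped constant byte needs no check

	b = append(b, seps[0]) // before
	b = append(b, sep)     // after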
Signed-off-by: Bryan Boreham
---
model/labels/labels.go | 24 ++++++++++++------------
model/labels/labels_common.go | 5 +++--
model/labels/labels_dedupelabels.go | 24 ++++++++++++------------
model/labels/labels_stringlabels.go | 8 ++++----
model/labels/sharding.go | 4 ++--
model/labels/sharding_dedupelabels.go | 4 ++--
model/labels/sharding_stringlabels.go | 4 ++--
7 files changed, 37 insertions(+), 36 deletions(-)
diff --git a/model/labels/labels.go b/model/labels/labels.go
index 01514abf38..cd30f4f8ff 100644
--- a/model/labels/labels.go
+++ b/model/labels/labels.go
@@ -38,10 +38,10 @@ func (ls Labels) Bytes(buf []byte) []byte {
b.WriteByte(labelSep)
for i, l := range ls {
if i > 0 {
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
}
b.WriteString(l.Name)
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
b.WriteString(l.Value)
}
return b.Bytes()
@@ -86,9 +86,9 @@ func (ls Labels) Hash() uint64 {
}
b = append(b, v.Name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, v.Value...)
- b = append(b, seps[0])
+ b = append(b, sep)
}
return xxhash.Sum64(b)
}
@@ -106,9 +106,9 @@ func (ls Labels) HashForLabels(b []byte, names ...string) (uint64, []byte) {
i++
default:
b = append(b, ls[i].Name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, ls[i].Value...)
- b = append(b, seps[0])
+ b = append(b, sep)
i++
j++
}
@@ -130,9 +130,9 @@ func (ls Labels) HashWithoutLabels(b []byte, names ...string) (uint64, []byte) {
continue
}
b = append(b, ls[i].Name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, ls[i].Value...)
- b = append(b, seps[0])
+ b = append(b, sep)
}
return xxhash.Sum64(b), b
}
@@ -151,10 +151,10 @@ func (ls Labels) BytesWithLabels(buf []byte, names ...string) []byte {
i++
default:
if b.Len() > 1 {
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
}
b.WriteString(ls[i].Name)
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
b.WriteString(ls[i].Value)
i++
j++
@@ -177,10 +177,10 @@ func (ls Labels) BytesWithoutLabels(buf []byte, names ...string) []byte {
continue
}
if b.Len() > 1 {
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
}
b.WriteString(ls[i].Name)
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
b.WriteString(ls[i].Value)
}
return b.Bytes()
diff --git a/model/labels/labels_common.go b/model/labels/labels_common.go
index 4bc94f84fe..6db86b03c7 100644
--- a/model/labels/labels_common.go
+++ b/model/labels/labels_common.go
@@ -29,10 +29,11 @@ const (
BucketLabel = "le"
InstanceName = "instance"
- labelSep = '\xfe'
+ labelSep = '\xfe' // Used at beginning of `Bytes` return.
+ sep = '\xff' // Used between labels in `Bytes` and `Hash`.
)
-var seps = []byte{'\xff'}
+var seps = []byte{sep} // Used with Hash, which has no WriteByte method.
// Label is a key/value pair of strings.
type Label struct {
diff --git a/model/labels/labels_dedupelabels.go b/model/labels/labels_dedupelabels.go
index 0e5bb048be..da8a88cc15 100644
--- a/model/labels/labels_dedupelabels.go
+++ b/model/labels/labels_dedupelabels.go
@@ -146,13 +146,13 @@ func (ls Labels) Bytes(buf []byte) []byte {
b := bytes.NewBuffer(buf[:0])
for i := 0; i < len(ls.data); {
if i > 0 {
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
}
var name, value string
name, i = decodeString(ls.syms, ls.data, i)
value, i = decodeString(ls.syms, ls.data, i)
b.WriteString(name)
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
b.WriteString(value)
}
return b.Bytes()
@@ -201,9 +201,9 @@ func (ls Labels) Hash() uint64 {
}
b = append(b, name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, value...)
- b = append(b, seps[0])
+ b = append(b, sep)
pos = newPos
}
return xxhash.Sum64(b)
@@ -226,9 +226,9 @@ func (ls Labels) HashForLabels(b []byte, names ...string) (uint64, []byte) {
}
if name == names[j] {
b = append(b, name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, value...)
- b = append(b, seps[0])
+ b = append(b, sep)
}
}
@@ -252,9 +252,9 @@ func (ls Labels) HashWithoutLabels(b []byte, names ...string) (uint64, []byte) {
continue
}
b = append(b, name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, value...)
- b = append(b, seps[0])
+ b = append(b, sep)
}
return xxhash.Sum64(b), b
}
@@ -275,10 +275,10 @@ func (ls Labels) BytesWithLabels(buf []byte, names ...string) []byte {
}
if lName == names[j] {
if b.Len() > 1 {
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
}
b.WriteString(lName)
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
b.WriteString(lValue)
}
pos = newPos
@@ -299,10 +299,10 @@ func (ls Labels) BytesWithoutLabels(buf []byte, names ...string) []byte {
}
if j == len(names) || lName != names[j] {
if b.Len() > 1 {
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
}
b.WriteString(lName)
- b.WriteByte(seps[0])
+ b.WriteByte(sep)
b.WriteString(lValue)
}
pos = newPos
diff --git a/model/labels/labels_stringlabels.go b/model/labels/labels_stringlabels.go
index bccceb61fe..c8bce51234 100644
--- a/model/labels/labels_stringlabels.go
+++ b/model/labels/labels_stringlabels.go
@@ -112,9 +112,9 @@ func (ls Labels) HashForLabels(b []byte, names ...string) (uint64, []byte) {
}
if name == names[j] {
b = append(b, name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, value...)
- b = append(b, seps[0])
+ b = append(b, sep)
}
}
@@ -138,9 +138,9 @@ func (ls Labels) HashWithoutLabels(b []byte, names ...string) (uint64, []byte) {
continue
}
b = append(b, name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, value...)
- b = append(b, seps[0])
+ b = append(b, sep)
}
return xxhash.Sum64(b), b
}
diff --git a/model/labels/sharding.go b/model/labels/sharding.go
index 5e3e89fbbb..8b3a369397 100644
--- a/model/labels/sharding.go
+++ b/model/labels/sharding.go
@@ -39,9 +39,9 @@ func StableHash(ls Labels) uint64 {
}
b = append(b, v.Name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, v.Value...)
- b = append(b, seps[0])
+ b = append(b, sep)
}
return xxhash.Sum64(b)
}
diff --git a/model/labels/sharding_dedupelabels.go b/model/labels/sharding_dedupelabels.go
index 5912724f9b..5bf41b05d6 100644
--- a/model/labels/sharding_dedupelabels.go
+++ b/model/labels/sharding_dedupelabels.go
@@ -43,9 +43,9 @@ func StableHash(ls Labels) uint64 {
}
b = append(b, name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, value...)
- b = append(b, seps[0])
+ b = append(b, sep)
pos = newPos
}
return xxhash.Sum64(b)
diff --git a/model/labels/sharding_stringlabels.go b/model/labels/sharding_stringlabels.go
index 3ad2027d8c..798f268eb9 100644
--- a/model/labels/sharding_stringlabels.go
+++ b/model/labels/sharding_stringlabels.go
@@ -43,9 +43,9 @@ func StableHash(ls Labels) uint64 {
}
b = append(b, v.Name...)
- b = append(b, seps[0])
+ b = append(b, sep)
b = append(b, v.Value...)
- b = append(b, seps[0])
+ b = append(b, sep)
}
if h != nil {
return h.Sum64()
From a25b626792d36e82fe95b05cb3e41f896c4ad2ac Mon Sep 17 00:00:00 2001
From: Arve Knudsen
Date: Mon, 3 Jun 2024 18:02:26 +0200
Subject: [PATCH 04/49] prometheusremotewrite: Support resource attribute
promotion
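For example - mirroring the testdata added later in this series - the new
setting lives under an `otlp` block:

	otlp:
	  promote_resource_attributes: ["k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"]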
Signed-off-by: Arve Knudsen
---
config/config.go | 16 ++++++++
docs/configuration/configuration.md | 4 ++
.../prometheusremotewrite/helper.go | 38 +++++++++++++------
.../prometheusremotewrite/histograms.go | 2 +-
.../prometheusremotewrite/metrics_to_prw.go | 13 ++++---
.../number_data_points.go | 4 +-
storage/remote/write_handler.go | 17 ++++++---
storage/remote/write_test.go | 6 ++-
web/api/v1/api.go | 2 +-
web/api/v1/api_test.go | 1 +
10 files changed, 75 insertions(+), 28 deletions(-)
diff --git a/config/config.go b/config/config.go
index c924e30989..0880d518dc 100644
--- a/config/config.go
+++ b/config/config.go
@@ -227,6 +227,9 @@ var (
DefaultExemplarsConfig = ExemplarsConfig{
MaxExemplars: 100000,
}
+
+ // DefaultOTLPConfig is the default OTLP configuration.
+ DefaultOTLPConfig = OTLPConfig{}
)
// Config is the top-level configuration for Prometheus's config files.
@@ -242,6 +245,7 @@ type Config struct {
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
+ OTLPConfig OTLPConfig `yaml:"otlp,omitempty"`
}
// SetDirectory joins any relative file paths with dir.
@@ -1304,3 +1308,15 @@ func getGoGCEnv() int {
}
return DefaultRuntimeConfig.GoGC
}
+
+// OTLPConfig is the configuration for writing to the OTLP endpoint.
+type OTLPConfig struct {
+ PromoteResourceAttributes []string `yaml:"promote_resource_attributes,omitempty"`
+}
+
+// UnmarshalYAML implements the yaml.Unmarshaler interface.
+func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
+ *c = DefaultOTLPConfig
+ type plain OTLPConfig
+ return unmarshal((*plain)(c))
+}
diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md
index 35976871b9..5675210b7c 100644
--- a/docs/configuration/configuration.md
+++ b/docs/configuration/configuration.md
@@ -152,6 +152,10 @@ alerting:
remote_write:
[ - <remote_write> ... ]
+# Settings related to the OTLP receiver feature.
+otlp:
[ promote_resource_attributes: [<string>, ...] | default = [ ] ]
+
# Settings related to the remote read feature.
remote_read:
[ - <remote_read> ... ]
diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go
index 2571338532..f2d7ecd4e3 100644
--- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go
+++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go
@@ -65,14 +65,14 @@ type bucketBoundsData struct {
bound float64
}
-// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds
+// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds.
type byBucketBoundsData []bucketBoundsData
func (m byBucketBoundsData) Len() int { return len(m) }
func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound }
func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
-// ByLabelName enables the usage of sort.Sort() with a slice of labels
+// ByLabelName enables the usage of sort.Sort() with a slice of labels.
type ByLabelName []prompb.Label
func (a ByLabelName) Len() int { return len(a) }
@@ -115,14 +115,23 @@ var seps = []byte{'\xff'}
// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
-func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
+// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
+func createAttributes(resource pcommon.Resource, attributes pcommon.Map, settings Settings,
ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)
+ promotedAttrs := make([]prompb.Label, 0, len(settings.PromoteResourceAttributes))
+ for _, name := range settings.PromoteResourceAttributes {
+ if value, exists := resourceAttrs.Get(name); exists {
+ promotedAttrs = append(promotedAttrs, prompb.Label{Name: name, Value: value.AsString()})
+ }
+ }
+ sort.Stable(ByLabelName(promotedAttrs))
+
// Calculate the maximum possible number of labels we could return so we can preallocate l
- maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2
+ maxLabelCount := attributes.Len() + len(settings.ExternalLabels) + len(promotedAttrs) + len(extras)/2
if haveServiceName {
maxLabelCount++
@@ -132,9 +141,6 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
maxLabelCount++
}
- // map ensures no duplicate label name
- l := make(map[string]string, maxLabelCount)
-
// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
labels := make([]prompb.Label, 0, maxLabelCount)
@@ -148,6 +154,8 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
})
sort.Stable(ByLabelName(labels))
+ // map ensures no duplicate label names.
+ l := make(map[string]string, maxLabelCount)
for _, label := range labels {
var finalKey = prometheustranslator.NormalizeLabel(label.Name)
if existingValue, alreadyExists := l[finalKey]; alreadyExists {
@@ -157,6 +165,13 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
}
}
+ for _, lbl := range promotedAttrs {
+ normalized := prometheustranslator.NormalizeLabel(lbl.Name)
+ if _, exists := l[normalized]; !exists {
+ l[normalized] = lbl.Value
+ }
+ }
+
// Map service.name + service.namespace to job
if haveServiceName {
val := serviceName.AsString()
@@ -169,7 +184,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
if haveInstanceID {
l[model.InstanceLabel] = instance.AsString()
}
- for key, value := range externalLabels {
+ for key, value := range settings.ExternalLabels {
// External labels have already been sanitized
if _, alreadyExists := l[key]; alreadyExists {
// Skip external labels if they are overridden by metric attributes
@@ -232,7 +247,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
- baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
+ baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)
// If the sum is unset, it indicates the _sum metric point should be
// omitted
@@ -408,7 +423,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDat
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
- baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
+ baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)
// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
@@ -554,7 +569,8 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
name = settings.Namespace + "_" + name
}
- labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name)
+ settings.PromoteResourceAttributes = nil
+ labels := createAttributes(resource, attributes, settings, identifyingAttrs, false, model.MetricNameLabel, name)
haveIdentifier := false
for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel {
diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go
index 21b3f5dd9f..73528019d8 100644
--- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go
+++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go
@@ -45,7 +45,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr
lbls := createAttributes(
resource,
pt.Attributes(),
- settings.ExternalLabels,
+ settings,
nil,
true,
model.MetricNameLabel,
diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go
index 65dac99c50..a3a7897232 100644
--- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go
+++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go
@@ -30,12 +30,13 @@ import (
)
type Settings struct {
- Namespace string
- ExternalLabels map[string]string
- DisableTargetInfo bool
- ExportCreatedMetric bool
- AddMetricSuffixes bool
- SendMetadata bool
+ Namespace string
+ ExternalLabels map[string]string
+ DisableTargetInfo bool
+ ExportCreatedMetric bool
+ AddMetricSuffixes bool
+ SendMetadata bool
+ PromoteResourceAttributes []string
}
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go
index aafebc6c46..80ccb46c75 100644
--- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go
+++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go
@@ -34,7 +34,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number
labels := createAttributes(
resource,
pt.Attributes(),
- settings.ExternalLabels,
+ settings,
nil,
true,
model.MetricNameLabel,
@@ -64,7 +64,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa
lbls := createAttributes(
resource,
pt.Attributes(),
- settings.ExternalLabels,
+ settings,
nil,
true,
model.MetricNameLabel,
diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go
index d822373717..b695049492 100644
--- a/storage/remote/write_handler.go
+++ b/storage/remote/write_handler.go
@@ -490,21 +490,23 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable.
-func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
+func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {
rwHandler := &writeHandler{
logger: logger,
appendable: appendable,
}
return &otlpWriteHandler{
- logger: logger,
- rwHandler: rwHandler,
+ logger: logger,
+ rwHandler: rwHandler,
+ configFunc: configFunc,
}
}
type otlpWriteHandler struct {
- logger log.Logger
- rwHandler *writeHandler
+ logger log.Logger
+ rwHandler *writeHandler
+ configFunc func() config.Config
}
func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -515,9 +517,12 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
+ otlpCfg := h.configFunc().OTLPConfig
+
converter := otlptranslator.NewPrometheusConverter()
if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{
- AddMetricSuffixes: true,
+ AddMetricSuffixes: true,
+ PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
}); err != nil {
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
}
diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go
index 6e7422a585..83dfffbaef 100644
--- a/storage/remote/write_test.go
+++ b/storage/remote/write_test.go
@@ -379,7 +379,11 @@ func TestOTLPWriteHandler(t *testing.T) {
req.Header.Set("Content-Type", "application/x-protobuf")
appendable := &mockAppendable{}
- handler := NewOTLPWriteHandler(nil, appendable)
+ handler := NewOTLPWriteHandler(nil, appendable, func() config.Config {
+ return config.Config{
+ OTLPConfig: config.DefaultOTLPConfig,
+ }
+ })
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
diff --git a/web/api/v1/api.go b/web/api/v1/api.go
index 03854787f8..d58be211f2 100644
--- a/web/api/v1/api.go
+++ b/web/api/v1/api.go
@@ -295,7 +295,7 @@ func NewAPI(
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs)
}
if otlpEnabled {
- a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)
+ a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc)
}
return a
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go
index 86a57ca088..ba38ddc978 100644
--- a/web/api/v1/api_test.go
+++ b/web/api/v1/api_test.go
@@ -359,6 +359,7 @@ var samplePrometheusCfg = config.Config{
ScrapeConfigs: []*config.ScrapeConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{},
RemoteReadConfigs: []*config.RemoteReadConfig{},
+ OTLPConfig: config.OTLPConfig{},
}
var sampleFlagMap = map[string]string{
From ae82a0a9413eb0a27205428b3348ffe0743e5351 Mon Sep 17 00:00:00 2001
From: Arve Knudsen
Date: Tue, 16 Jul 2024 14:32:24 +0200
Subject: [PATCH 05/49] Sanitize configured OTel resource attributes
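Configured attribute names are now trimmed of surrounding whitespace and
de-duplicated, so e.g. [" k8s.job.name ", "k8s.job.name"] collapses to
["k8s.job.name"].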
Signed-off-by: Arve Knudsen
---
config/config.go | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git a/config/config.go b/config/config.go
index 0880d518dc..fd2e6e06ca 100644
--- a/config/config.go
+++ b/config/config.go
@@ -19,6 +19,7 @@ import (
"net/url"
"os"
"path/filepath"
+ "slices"
"sort"
"strconv"
"strings"
@@ -1318,5 +1319,22 @@ type OTLPConfig struct {
func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultOTLPConfig
type plain OTLPConfig
- return unmarshal((*plain)(c))
+ if err := unmarshal((*plain)(c)); err != nil {
+ return err
+ }
+
+ seen := map[string]struct{}{}
+ i := 0
+ for i < len(c.PromoteResourceAttributes) {
+ s := strings.TrimSpace(c.PromoteResourceAttributes[i])
+ if _, exists := seen[s]; exists {
+ c.PromoteResourceAttributes = slices.Delete(c.PromoteResourceAttributes, i, i+1)
+ continue
+ }
+
+ seen[s] = struct{}{}
+ c.PromoteResourceAttributes[i] = s
+ i++
+ }
+ return nil
}
From ec818332dcc7a381add078d091bc3968c0d91d3f Mon Sep 17 00:00:00 2001
From: Arve Knudsen
Date: Tue, 16 Jul 2024 14:49:04 +0200
Subject: [PATCH 06/49] Add config tests
Signed-off-by: Arve Knudsen
---
config/config_test.go | 18 ++++++++++++++++++
config/testdata/conf.good.yml | 3 +++
.../otlp_sanitize_resource_attributes.good.yml | 2 ++
3 files changed, 23 insertions(+)
create mode 100644 config/testdata/otlp_sanitize_resource_attributes.good.yml
diff --git a/config/config_test.go b/config/config_test.go
index 3c4907a46c..5822d2ceb0 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -156,6 +156,12 @@ var expectedConf = &Config{
},
},
+ OTLPConfig: OTLPConfig{
+ PromoteResourceAttributes: []string{
+ "k8s.cluster.name", "k8s.job.name", "k8s.namespace.name",
+ },
+ },
+
RemoteReadConfigs: []*RemoteReadConfig{
{
URL: mustParseURL("http://remote1/read"),
@@ -1471,6 +1477,18 @@ func TestRemoteWriteRetryOnRateLimit(t *testing.T) {
require.False(t, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit)
}
+func TestOTLPSanitizeResourceAttributes(t *testing.T) {
+ want, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.good.yml"), false, false, log.NewNopLogger())
+ require.NoError(t, err)
+
+ out, err := yaml.Marshal(want)
+ require.NoError(t, err)
+ var got Config
+ require.NoError(t, yaml.UnmarshalStrict(out, &got))
+
+ require.Equal(t, []string{"k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"}, got.OTLPConfig.PromoteResourceAttributes)
+}
+
func TestLoadConfig(t *testing.T) {
// Parse a valid file that sets a global scrape timeout. This tests whether parsing
// an overwritten default field in the global config permanently changes the default.
diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml
index 0e0aa2bd5d..56741822c2 100644
--- a/config/testdata/conf.good.yml
+++ b/config/testdata/conf.good.yml
@@ -45,6 +45,9 @@ remote_write:
headers:
name: value
+otlp:
+ promote_resource_attributes: ["k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"]
+
remote_read:
- url: http://remote1/read
read_recent: true
diff --git a/config/testdata/otlp_sanitize_resource_attributes.good.yml b/config/testdata/otlp_sanitize_resource_attributes.good.yml
new file mode 100644
index 0000000000..ce91302fe0
--- /dev/null
+++ b/config/testdata/otlp_sanitize_resource_attributes.good.yml
@@ -0,0 +1,2 @@
+otlp:
+ promote_resource_attributes: ["k8s.cluster.name", "k8s.job.name", "k8s.namespace.name", " k8s.job.name "]
From 6a9df95620bf882d206be7c771ad65042400f6ca Mon Sep 17 00:00:00 2001
From: Arve Knudsen
Date: Tue, 16 Jul 2024 15:13:22 +0200
Subject: [PATCH 07/49] Add to changelog
Signed-off-by: Arve Knudsen
---
CHANGELOG.md | 2 ++
1 file changed, 2 insertions(+)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5a91e9009..8488af6dc7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
## unreleased
+* [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
+
## 2.53.1 / 2024-07-10
Fix a bug which would drop samples in remote-write if the sending flow stalled
From 1d21867d8b3d23863f78799cf60b6e3598ab414c Mon Sep 17 00:00:00 2001
From: Arve Knudsen
Date: Tue, 16 Jul 2024 15:13:40 +0200
Subject: [PATCH 08/49] Add otlptranslator tests
Signed-off-by: Arve Knudsen
---
.../prometheusremotewrite/helper_test.go | 161 ++++++++++++++++++
1 file changed, 161 insertions(+)
create mode 100644 storage/remote/otlptranslator/prometheusremotewrite/helper_test.go
diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go
new file mode 100644
index 0000000000..c4dd781ae6
--- /dev/null
+++ b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go
@@ -0,0 +1,161 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package prometheusremotewrite
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.opentelemetry.io/collector/pdata/pcommon"
+
+ "github.com/prometheus/prometheus/prompb"
+)
+
+func TestCreateAttributes(t *testing.T) {
+ resourceAttrs := map[string]string{
+ "service.name": "service name",
+ "service.instance.id": "service ID",
+ "existent-attr": "resource value",
+ // This one is for testing conflict with metric attribute.
+ "metric-attr": "resource value",
+ // This one is for testing conflict with auto-generated job attribute.
+ "job": "resource value",
+ // This one is for testing conflict with auto-generated instance attribute.
+ "instance": "resource value",
+ }
+
+ resource := pcommon.NewResource()
+ for k, v := range resourceAttrs {
+ resource.Attributes().PutStr(k, v)
+ }
+ attrs := pcommon.NewMap()
+ attrs.PutStr("__name__", "test_metric")
+ attrs.PutStr("metric-attr", "metric value")
+
+ testCases := []struct {
+ name string
+ promoteResourceAttributes []string
+ expectedLabels []prompb.Label
+ }{
+ {
+ name: "Successful conversion without resource attribute promotion",
+ promoteResourceAttributes: nil,
+ expectedLabels: []prompb.Label{
+ {
+ Name: "__name__",
+ Value: "test_metric",
+ },
+ {
+ Name: "instance",
+ Value: "service ID",
+ },
+ {
+ Name: "job",
+ Value: "service name",
+ },
+ {
+ Name: "metric_attr",
+ Value: "metric value",
+ },
+ },
+ },
+ {
+ name: "Successful conversion with resource attribute promotion",
+ promoteResourceAttributes: []string{"non-existent-attr", "existent-attr"},
+ expectedLabels: []prompb.Label{
+ {
+ Name: "__name__",
+ Value: "test_metric",
+ },
+ {
+ Name: "instance",
+ Value: "service ID",
+ },
+ {
+ Name: "job",
+ Value: "service name",
+ },
+ {
+ Name: "metric_attr",
+ Value: "metric value",
+ },
+ {
+ Name: "existent_attr",
+ Value: "resource value",
+ },
+ },
+ },
+ {
+ name: "Successful conversion with resource attribute promotion, conflicting resource attributes are ignored",
+ promoteResourceAttributes: []string{"non-existent-attr", "existent-attr", "metric-attr", "job", "instance"},
+ expectedLabels: []prompb.Label{
+ {
+ Name: "__name__",
+ Value: "test_metric",
+ },
+ {
+ Name: "instance",
+ Value: "service ID",
+ },
+ {
+ Name: "job",
+ Value: "service name",
+ },
+ {
+ Name: "existent_attr",
+ Value: "resource value",
+ },
+ {
+ Name: "metric_attr",
+ Value: "metric value",
+ },
+ },
+ },
+ {
+ name: "Successful conversion with resource attribute promotion, attributes are only promoted once",
+ promoteResourceAttributes: []string{"existent-attr", "existent-attr"},
+ expectedLabels: []prompb.Label{
+ {
+ Name: "__name__",
+ Value: "test_metric",
+ },
+ {
+ Name: "instance",
+ Value: "service ID",
+ },
+ {
+ Name: "job",
+ Value: "service name",
+ },
+ {
+ Name: "existent_attr",
+ Value: "resource value",
+ },
+ {
+ Name: "metric_attr",
+ Value: "metric value",
+ },
+ },
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ settings := Settings{
+ PromoteResourceAttributes: tc.promoteResourceAttributes,
+ }
+ lbls := createAttributes(resource, attrs, settings, nil, false)
+
+ assert.ElementsMatch(t, lbls, tc.expectedLabels)
+ })
+ }
+}
From 932918cd3fe003d0f4fd0fe153b610a068fe9fea Mon Sep 17 00:00:00 2001
From: Arve Knudsen
Date: Thu, 18 Jul 2024 10:40:47 +0200
Subject: [PATCH 09/49] OTLPConfig.UnmarshalYAML: Return error on invalid input
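Instead of silently dropping bad entries during sanitization, unmarshalling
now accumulates one error per problem via errors.Join: an empty attribute
yields "empty promoted OTel resource attribute" and a repeated one yields
"duplicated promoted OTel resource attribute".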
Signed-off-by: Arve Knudsen
---
config/config.go | 22 ++++++++++---------
config/config_test.go | 22 +++++++++++++------
.../otlp_sanitize_resource_attributes.bad.yml | 2 ++
...otlp_sanitize_resource_attributes.good.yml | 2 +-
4 files changed, 30 insertions(+), 18 deletions(-)
create mode 100644 config/testdata/otlp_sanitize_resource_attributes.bad.yml
diff --git a/config/config.go b/config/config.go
index fd2e6e06ca..9139838813 100644
--- a/config/config.go
+++ b/config/config.go
@@ -19,7 +19,6 @@ import (
"net/url"
"os"
"path/filepath"
- "slices"
"sort"
"strconv"
"strings"
@@ -1324,17 +1323,20 @@ func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
}
seen := map[string]struct{}{}
- i := 0
- for i < len(c.PromoteResourceAttributes) {
- s := strings.TrimSpace(c.PromoteResourceAttributes[i])
- if _, exists := seen[s]; exists {
- c.PromoteResourceAttributes = slices.Delete(c.PromoteResourceAttributes, i, i+1)
+ var err error
+ for i, attr := range c.PromoteResourceAttributes {
+ attr = strings.TrimSpace(attr)
+ if attr == "" {
+ err = errors.Join(err, fmt.Errorf("empty promoted OTel resource attribute"))
+ continue
+ }
+ if _, exists := seen[attr]; exists {
+ err = errors.Join(err, fmt.Errorf("duplicated promoted OTel resource attribute %q", attr))
continue
}
- seen[s] = struct{}{}
- c.PromoteResourceAttributes[i] = s
- i++
+ seen[attr] = struct{}{}
+ c.PromoteResourceAttributes[i] = attr
}
- return nil
+ return err
}
diff --git a/config/config_test.go b/config/config_test.go
index 5822d2ceb0..b684fdb50c 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -1478,15 +1478,23 @@ func TestRemoteWriteRetryOnRateLimit(t *testing.T) {
}
func TestOTLPSanitizeResourceAttributes(t *testing.T) {
- want, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.good.yml"), false, false, log.NewNopLogger())
- require.NoError(t, err)
+ t.Run("good config", func(t *testing.T) {
+ want, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.good.yml"), false, false, log.NewNopLogger())
+ require.NoError(t, err)
- out, err := yaml.Marshal(want)
- require.NoError(t, err)
- var got Config
- require.NoError(t, yaml.UnmarshalStrict(out, &got))
+ out, err := yaml.Marshal(want)
+ require.NoError(t, err)
+ var got Config
+ require.NoError(t, yaml.UnmarshalStrict(out, &got))
- require.Equal(t, []string{"k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"}, got.OTLPConfig.PromoteResourceAttributes)
+ require.Equal(t, []string{"k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"}, got.OTLPConfig.PromoteResourceAttributes)
+ })
+
+ t.Run("bad config", func(t *testing.T) {
+ _, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.bad.yml"), false, false, log.NewNopLogger())
+ require.ErrorContains(t, err, `duplicated promoted OTel resource attribute "k8s.job.name"`)
+ require.ErrorContains(t, err, `empty promoted OTel resource attribute`)
+ })
}
func TestLoadConfig(t *testing.T) {
diff --git a/config/testdata/otlp_sanitize_resource_attributes.bad.yml b/config/testdata/otlp_sanitize_resource_attributes.bad.yml
new file mode 100644
index 0000000000..37ec5d1209
--- /dev/null
+++ b/config/testdata/otlp_sanitize_resource_attributes.bad.yml
@@ -0,0 +1,2 @@
+otlp:
+ promote_resource_attributes: ["k8s.cluster.name", " k8s.job.name ", "k8s.namespace.name", "k8s.job.name", ""]
diff --git a/config/testdata/otlp_sanitize_resource_attributes.good.yml b/config/testdata/otlp_sanitize_resource_attributes.good.yml
index ce91302fe0..67247e7743 100644
--- a/config/testdata/otlp_sanitize_resource_attributes.good.yml
+++ b/config/testdata/otlp_sanitize_resource_attributes.good.yml
@@ -1,2 +1,2 @@
otlp:
- promote_resource_attributes: ["k8s.cluster.name", "k8s.job.name", "k8s.namespace.name", " k8s.job.name "]
+ promote_resource_attributes: ["k8s.cluster.name", " k8s.job.name ", "k8s.namespace.name"]
From 465891cc5686f207a27e7cd1f9718062a5a83aba Mon Sep 17 00:00:00 2001
From: gotjosh
Date: Mon, 22 Jul 2024 14:11:18 +0100
Subject: [PATCH 10/49] Rules: Refactor concurrency controller interface
(#14491)
* Rules: Refactor concurrency controller interface
Even though the main purpose of this refactor is to modify the concurrency controller interface to accept a Context, I made two drive-by modifications that I think are sensible:
1. I moved the check for rule dependencies into the controller itself - this aligns with how the controller should behave, as it is the deciding factor in whether we run concurrently or not.
2. I cleaned up some unused methods left over from the old interface before #13527 changed it.
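For third-party controllers, a minimal sketch of the new two-method
interface (using the exported Group and Rule types as in this diff; this
mirrors the sequential no-op controller):

	type sequential struct{}

	func (sequential) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool { return false }
	func (sequential) Done(_ context.Context)                                     {}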
Signed-off-by: gotjosh
---------
Signed-off-by: gotjosh
---
rules/group.go | 10 ++-------
rules/manager.go | 56 ++++++++++++++++--------------------------------
2 files changed, 20 insertions(+), 46 deletions(-)
diff --git a/rules/group.go b/rules/group.go
index 0bc219a11b..201d3a67d7 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -621,14 +621,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}
- // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
- // Try run concurrently if there are slots available.
- if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow() {
+ if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) {
wg.Add(1)
go eval(i, rule, func() {
wg.Done()
- ctrl.Done()
+ ctrl.Done(ctx)
})
} else {
eval(i, rule, nil)
@@ -1094,7 +1092,3 @@ func buildDependencyMap(rules []Rule) dependencyMap {
return dependencies
}
-
-func isRuleEligibleForConcurrentExecution(rule Rule) bool {
- return rule.NoDependentRules() && rule.NoDependencyRules()
-}
diff --git a/rules/manager.go b/rules/manager.go
index ab33c3c7d8..9e5b33fbc9 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -457,67 +457,47 @@ func (c ruleDependencyController) AnalyseRules(rules []Rule) {
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
- // Allow determines whether any concurrent evaluation slots are available.
- // If Allow() returns true, then Done() must be called to release the acquired slot.
- Allow() bool
+ // Allow determines if the given rule is allowed to be evaluated concurrently.
+ // If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done.
+ // It is important that both *Group and Rule are not retained and only be used for the duration of the call.
+ Allow(ctx context.Context, group *Group, rule Rule) bool
// Done releases a concurrent evaluation slot.
- Done()
+ Done(ctx context.Context)
}
// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct {
- sema *semaphore.Weighted
- depMapsMu sync.Mutex
- depMaps map[*Group]dependencyMap
+ sema *semaphore.Weighted
}
func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController {
return &concurrentRuleEvalController{
- sema: semaphore.NewWeighted(maxConcurrency),
- depMaps: map[*Group]dependencyMap{},
+ sema: semaphore.NewWeighted(maxConcurrency),
}
}
-func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool {
- c.depMapsMu.Lock()
- defer c.depMapsMu.Unlock()
-
- depMap, found := c.depMaps[g]
- if !found {
- depMap = buildDependencyMap(g.rules)
- c.depMaps[g] = depMap
+func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool {
+ // To allow a rule to be executed concurrently, we need 3 conditions:
+ // 1. The rule must not have any rules that depend on it.
+ // 2. The rule itself must not depend on any other rules.
+ // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot.
+ if rule.NoDependentRules() && rule.NoDependencyRules() {
+ return c.sema.TryAcquire(1)
}
- return depMap.isIndependent(r)
+ return false
}
-func (c *concurrentRuleEvalController) Allow() bool {
- return c.sema.TryAcquire(1)
-}
-
-func (c *concurrentRuleEvalController) Done() {
+func (c *concurrentRuleEvalController) Done(_ context.Context) {
c.sema.Release(1)
}
-func (c *concurrentRuleEvalController) Invalidate() {
- c.depMapsMu.Lock()
- defer c.depMapsMu.Unlock()
-
- // Clear out the memoized dependency maps because some or all groups may have been updated.
- c.depMaps = map[*Group]dependencyMap{}
-}
-
// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
type sequentialRuleEvalController struct{}
-func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool {
+func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) bool {
return false
}
-func (c sequentialRuleEvalController) Allow() bool {
- return false
-}
-
-func (c sequentialRuleEvalController) Done() {}
-func (c sequentialRuleEvalController) Invalidate() {}
+func (c sequentialRuleEvalController) Done(_ context.Context) {}
From 23307b02c58adc56695844b9a851578fb6c26677 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 22 Jul 2024 14:33:59 +0100
Subject: [PATCH 11/49] [TESTS] Storage: Improve MergeQuerier tests
`TestMergeQuerierWithSecondaries_ErrorHandling` now tests `NewMergeQuerier`
rather than creating the data structure directly. This means we now test
short-circuiting when only a single querier is required.
Merge `mockGenericQuerier` into `mockQuerier`.
Replace `unwrapMockGenericQuerier` with a visitor pattern.
No change in functionality intended.
Signed-off-by: Bryan Boreham
---
storage/merge_test.go | 284 +++++++++++++++++++++---------------------
1 file changed, 139 insertions(+), 145 deletions(-)
diff --git a/storage/merge_test.go b/storage/merge_test.go
index 7619af3c1f..a3cba9bb91 100644
--- a/storage/merge_test.go
+++ b/storage/merge_test.go
@@ -912,9 +912,23 @@ func TestConcatenatingChunkIterator(t *testing.T) {
}
type mockQuerier struct {
- LabelQuerier
+ mtx sync.Mutex
- toReturn []Series
+ toReturn []Series // Response for Select.
+
+ closed bool
+ labelNamesCalls int
+ labelNamesRequested []labelNameRequest
+ sortedSeriesRequested []bool
+
+ resp []string // Response for LabelNames and LabelValues; turned into Select response if toReturn is not supplied.
+ warnings annotations.Annotations
+ err error
+}
+
+type labelNameRequest struct {
+ name string
+ matchers []*labels.Matcher
}
type seriesByLabel []Series
@@ -924,13 +938,47 @@ func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
func (m *mockQuerier) Select(_ context.Context, sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet {
- cpy := make([]Series, len(m.toReturn))
- copy(cpy, m.toReturn)
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+ m.sortedSeriesRequested = append(m.sortedSeriesRequested, sortSeries)
+
+ var ret []Series
+ if len(m.toReturn) > 0 {
+ ret = make([]Series, len(m.toReturn))
+ copy(ret, m.toReturn)
+ } else if len(m.resp) > 0 {
+ ret = make([]Series, 0, len(m.resp))
+ for _, l := range m.resp {
+ ret = append(ret, NewListSeries(labels.FromStrings("test", string(l)), nil))
+ }
+ }
if sortSeries {
- sort.Sort(seriesByLabel(cpy))
+ sort.Sort(seriesByLabel(ret))
}
- return NewMockSeriesSet(cpy...)
+ return &mockSeriesSet{idx: -1, series: ret, warnings: m.warnings, err: m.err}
+}
+
+func (m *mockQuerier) LabelValues(_ context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+ m.mtx.Lock()
+ m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
+ name: name,
+ matchers: matchers,
+ })
+ m.mtx.Unlock()
+ return m.resp, m.warnings, m.err
+}
+
+func (m *mockQuerier) LabelNames(context.Context, *LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+ m.mtx.Lock()
+ m.labelNamesCalls++
+ m.mtx.Unlock()
+ return m.resp, m.warnings, m.err
+}
+
+func (m *mockQuerier) Close() error {
+ m.closed = true
+ return nil
}
type mockChunkQuerier struct {
@@ -960,6 +1008,9 @@ func (m *mockChunkQuerier) Select(_ context.Context, sortSeries bool, _ *SelectH
type mockSeriesSet struct {
idx int
series []Series
+
+ warnings annotations.Annotations
+ err error
}
func NewMockSeriesSet(series ...Series) SeriesSet {
@@ -970,15 +1021,18 @@ func NewMockSeriesSet(series ...Series) SeriesSet {
}
func (m *mockSeriesSet) Next() bool {
+ if m.err != nil {
+ return false
+ }
m.idx++
return m.idx < len(m.series)
}
func (m *mockSeriesSet) At() Series { return m.series[m.idx] }
-func (m *mockSeriesSet) Err() error { return nil }
+func (m *mockSeriesSet) Err() error { return m.err }
-func (m *mockSeriesSet) Warnings() annotations.Annotations { return nil }
+func (m *mockSeriesSet) Warnings() annotations.Annotations { return m.warnings }
type mockChunkSeriesSet struct {
idx int
@@ -1336,105 +1390,44 @@ func BenchmarkMergeSeriesSet(b *testing.B) {
}
}
-type mockGenericQuerier struct {
- mtx sync.Mutex
-
- closed bool
- labelNamesCalls int
- labelNamesRequested []labelNameRequest
- sortedSeriesRequested []bool
-
- resp []string
- warnings annotations.Annotations
- err error
-}
-
-type labelNameRequest struct {
- name string
- matchers []*labels.Matcher
-}
-
-func (m *mockGenericQuerier) Select(_ context.Context, b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet {
- m.mtx.Lock()
- m.sortedSeriesRequested = append(m.sortedSeriesRequested, b)
- m.mtx.Unlock()
- return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err}
-}
-
-func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
- m.mtx.Lock()
- m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
- name: name,
- matchers: matchers,
- })
- m.mtx.Unlock()
- return m.resp, m.warnings, m.err
-}
-
-func (m *mockGenericQuerier) LabelNames(context.Context, *LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
- m.mtx.Lock()
- m.labelNamesCalls++
- m.mtx.Unlock()
- return m.resp, m.warnings, m.err
-}
-
-func (m *mockGenericQuerier) Close() error {
- m.closed = true
- return nil
-}
-
-type mockGenericSeriesSet struct {
- resp []string
- warnings annotations.Annotations
- err error
-
- curr int
-}
-
-func (m *mockGenericSeriesSet) Next() bool {
- if m.err != nil {
- return false
+func visitMockQueriers(t *testing.T, qr Querier, f func(t *testing.T, q *mockQuerier)) int {
+ count := 0
+ switch x := qr.(type) {
+ case *mockQuerier:
+ count++
+ f(t, x)
+ case *querierAdapter:
+ count += visitMockQueriersInGenericQuerier(t, x.genericQuerier, f)
}
- if m.curr >= len(m.resp) {
- return false
+ return count
+}
+
+func visitMockQueriersInGenericQuerier(t *testing.T, g genericQuerier, f func(t *testing.T, q *mockQuerier)) int {
+ count := 0
+ switch x := g.(type) {
+ case *mergeGenericQuerier:
+ for _, q := range x.queriers {
+ count += visitMockQueriersInGenericQuerier(t, q, f)
+ }
+ case *genericQuerierAdapter:
+ // Visitor for chunkQuerier not implemented.
+ count += visitMockQueriers(t, x.q, f)
+ case *secondaryQuerier:
+ count += visitMockQueriersInGenericQuerier(t, x.genericQuerier, f)
}
- m.curr++
- return true
+ return count
}
-func (m *mockGenericSeriesSet) Err() error { return m.err }
-func (m *mockGenericSeriesSet) Warnings() annotations.Annotations { return m.warnings }
-
-func (m *mockGenericSeriesSet) At() Labels {
- return mockLabels(m.resp[m.curr-1])
-}
-
-type mockLabels string
-
-func (l mockLabels) Labels() labels.Labels {
- return labels.FromStrings("test", string(l))
-}
-
-func unwrapMockGenericQuerier(t *testing.T, qr genericQuerier) *mockGenericQuerier {
- m, ok := qr.(*mockGenericQuerier)
- if !ok {
- s, ok := qr.(*secondaryQuerier)
- require.True(t, ok, "expected secondaryQuerier got something else")
- m, ok = s.genericQuerier.(*mockGenericQuerier)
- require.True(t, ok, "expected mockGenericQuerier got something else")
- }
- return m
-}
-
-func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
+func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
var (
errStorage = errors.New("storage error")
warnStorage = errors.New("storage warning")
ctx = context.Background()
)
for _, tcase := range []struct {
- name string
- queriers []genericQuerier
+ name string
+ primaries []Querier
+ secondaries []Querier
expectedSelectsSeries []labels.Labels
expectedLabels []string
@@ -1443,10 +1436,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedErrs [4]error
}{
{
- // NewMergeQuerier will not create a mergeGenericQuerier
- // with just one querier inside, but we can test it anyway.
- name: "one successful primary querier",
- queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
+ name: "one successful primary querier",
+ primaries: []Querier{&mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
@@ -1455,9 +1446,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "multiple successful primary queriers",
- queriers: []genericQuerier{
- &mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
- &mockGenericQuerier{resp: []string{"b", "c"}, warnings: nil, err: nil},
+ primaries: []Querier{
+ &mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
+ &mockQuerier{resp: []string{"b", "c"}, warnings: nil, err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
@@ -1468,15 +1459,17 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "one failed primary querier",
- queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}},
+ primaries: []Querier{&mockQuerier{warnings: nil, err: errStorage}},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with successful secondaries",
- queriers: []genericQuerier{
- &mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
+ primaries: []Querier{
+ &mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
+ },
+ secondaries: []Querier{
+ &mockQuerier{resp: []string{"b"}, warnings: nil, err: nil},
+ &mockQuerier{resp: []string{"c"}, warnings: nil, err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
@@ -1487,10 +1480,12 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "one successful primary querier with empty response and successful secondaries",
- queriers: []genericQuerier{
- &mockGenericQuerier{resp: []string{}, warnings: nil, err: nil},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
+ primaries: []Querier{
+ &mockQuerier{resp: []string{}, warnings: nil, err: nil},
+ },
+ secondaries: []Querier{
+ &mockQuerier{resp: []string{"b"}, warnings: nil, err: nil},
+ &mockQuerier{resp: []string{"c"}, warnings: nil, err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "b"),
@@ -1500,19 +1495,23 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "one failed primary querier with successful secondaries",
- queriers: []genericQuerier{
- &mockGenericQuerier{warnings: nil, err: errStorage},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
+ primaries: []Querier{
+ &mockQuerier{warnings: nil, err: errStorage},
+ },
+ secondaries: []Querier{
+ &mockQuerier{resp: []string{"b"}, warnings: nil, err: nil},
+ &mockQuerier{resp: []string{"c"}, warnings: nil, err: nil},
},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with failed secondaries",
- queriers: []genericQuerier{
- &mockGenericQuerier{resp: []string{"a"}, warnings: nil, err: nil},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: errStorage}},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: errStorage}},
+ primaries: []Querier{
+ &mockQuerier{resp: []string{"a"}, warnings: nil, err: nil},
+ },
+ secondaries: []Querier{
+ &mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage},
+ &mockQuerier{resp: []string{"c"}, warnings: nil, err: errStorage},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
@@ -1522,9 +1521,11 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "successful queriers with warnings",
- queriers: []genericQuerier{
- &mockGenericQuerier{resp: []string{"a"}, warnings: annotations.New().Add(warnStorage), err: nil},
- &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: annotations.New().Add(warnStorage), err: nil}},
+ primaries: []Querier{
+ &mockQuerier{resp: []string{"a"}, warnings: annotations.New().Add(warnStorage), err: nil},
+ },
+ secondaries: []Querier{
+ &mockQuerier{resp: []string{"b"}, warnings: annotations.New().Add(warnStorage), err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
@@ -1535,10 +1536,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
} {
t.Run(tcase.name, func(t *testing.T) {
- q := &mergeGenericQuerier{
- queriers: tcase.queriers,
- mergeFn: func(l ...Labels) Labels { return l[0] },
- }
+ q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] })
t.Run("Select", func(t *testing.T) {
res := q.Select(context.Background(), false, nil)
@@ -1551,11 +1549,13 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
require.ErrorIs(t, res.Err(), tcase.expectedErrs[0], "expected error doesn't match")
require.Equal(t, tcase.expectedSelectsSeries, lbls)
- for _, qr := range q.queriers {
- m := unwrapMockGenericQuerier(t, qr)
- // mergeGenericQuerier forces all Selects to be sorted.
- require.Equal(t, []bool{true}, m.sortedSeriesRequested)
- }
+ n := visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) {
+ // Single queries should be unsorted; merged queries sorted.
+ exp := len(tcase.primaries)+len(tcase.secondaries) > 1
+ require.Equal(t, []bool{exp}, m.sortedSeriesRequested)
+ })
+ // Check we visited all queriers.
+ require.Equal(t, len(tcase.primaries)+len(tcase.secondaries), n)
})
t.Run("LabelNames", func(t *testing.T) {
res, w, err := q.LabelNames(ctx, nil)
@@ -1566,11 +1566,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
if err != nil {
return
}
- for _, qr := range q.queriers {
- m := unwrapMockGenericQuerier(t, qr)
-
+ visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) {
require.Equal(t, 1, m.labelNamesCalls)
- }
+ })
})
t.Run("LabelValues", func(t *testing.T) {
res, w, err := q.LabelValues(ctx, "test", nil)
@@ -1581,11 +1579,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
if err != nil {
return
}
- for _, qr := range q.queriers {
- m := unwrapMockGenericQuerier(t, qr)
-
+ visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) {
require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested)
- }
+ })
})
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
@@ -1597,14 +1593,12 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
if err != nil {
return
}
- for _, qr := range q.queriers {
- m := unwrapMockGenericQuerier(t, qr)
-
+ visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) {
require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2", matchers: []*labels.Matcher{matcher}},
}, m.labelNamesRequested)
- }
+ })
})
})
}
From 677cdcdcecc3826390461b9574b882ebb9a42143 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 22 Jul 2024 15:01:00 +0100
Subject: [PATCH 12/49] [TEST] Storage: ignore difference between nil and empty
We need this for subsequent changes.
Signed-off-by: Bryan Boreham
---
storage/merge_test.go | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/storage/merge_test.go b/storage/merge_test.go
index a3cba9bb91..dae338a004 100644
--- a/storage/merge_test.go
+++ b/storage/merge_test.go
@@ -1561,7 +1561,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
res, w, err := q.LabelNames(ctx, nil)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[1], "expected error doesn't match")
- require.Equal(t, tcase.expectedLabels, res)
+ requireEqualSlice(t, tcase.expectedLabels, res)
if err != nil {
return
@@ -1574,7 +1574,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
res, w, err := q.LabelValues(ctx, "test", nil)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[2], "expected error doesn't match")
- require.Equal(t, tcase.expectedLabels, res)
+ requireEqualSlice(t, tcase.expectedLabels, res)
if err != nil {
return
@@ -1588,7 +1588,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
res, w, err := q.LabelValues(ctx, "test2", nil, matcher)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
- require.Equal(t, tcase.expectedLabels, res)
+ requireEqualSlice(t, tcase.expectedLabels, res)
if err != nil {
return
@@ -1604,6 +1604,15 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
}
}
+// requireEqualSlice checks that two slices are equal, treating nil and empty slices as equivalent.
+func requireEqualSlice[T any](t require.TestingT, a, b []T, msgAndArgs ...interface{}) {
+ if len(a) == 0 {
+ require.Empty(t, b, msgAndArgs...)
+ } else {
+ require.Equal(t, a, b, msgAndArgs...)
+ }
+}
+
type errIterator struct {
err error
}
From 0ae881739bab56f91ad95e7b3d0dc7e470b9ce2e Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 22 Jul 2024 15:02:52 +0100
Subject: [PATCH 13/49] [TEST] Storage: check MergeQuerier with nil primary
This test fails on current code.
Signed-off-by: Bryan Boreham
---
storage/merge_test.go | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git a/storage/merge_test.go b/storage/merge_test.go
index dae338a004..488edb2e60 100644
--- a/storage/merge_test.go
+++ b/storage/merge_test.go
@@ -1504,6 +1504,25 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
},
+ {
+ name: "nil primary querier with failed secondary",
+ primaries: nil,
+ secondaries: []Querier{
+ &mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage},
+ },
+ expectedLabels: []string{},
+ expectedWarnings: annotations.New().Add(errStorage),
+ },
+ {
+ name: "nil primary querier with two failed secondaries",
+ primaries: nil,
+ secondaries: []Querier{
+ &mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage},
+ &mockQuerier{resp: []string{"c"}, warnings: nil, err: errStorage},
+ },
+ expectedLabels: []string{},
+ expectedWarnings: annotations.New().Add(errStorage),
+ },
{
name: "one successful primary querier with failed secondaries",
primaries: []Querier{
From 90d793e8c5e8f8c9823cfa9942e4e08019b37a6d Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 22 Jul 2024 15:33:07 +0100
Subject: [PATCH 14/49] [BUGFIX] Storage: Single secondary querier errors
should be warnings.
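Previously a lone secondary querier was returned as-is, so its errors
surfaced as hard failures; wrapping it in the secondary adapter
downgrades them to warnings, as when several queriers are merged.
A hedged sketch of the effect (failingSecondary is illustrative):
    q := NewMergeQuerier(nil, []Querier{failingSecondary}, ChainedSeriesMerge)
    ss := q.Select(context.Background(), false, nil)
    for ss.Next() {
        // drain the set
    }
    // Before this fix: ss.Err() returned the secondary's error.
    // After: ss.Err() == nil and the error is surfaced via ss.Warnings().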
Signed-off-by: Bryan Boreham
---
storage/merge.go | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/storage/merge.go b/storage/merge.go
index 194494b6a9..66c4c3ed32 100644
--- a/storage/merge.go
+++ b/storage/merge.go
@@ -51,7 +51,7 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer
case len(primaries) == 1 && len(secondaries) == 0:
return primaries[0]
case len(primaries) == 0 && len(secondaries) == 1:
- return secondaries[0]
+ return &querierAdapter{newSecondaryQuerierFrom(secondaries[0])}
}
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
@@ -89,7 +89,7 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
case len(primaries) == 1 && len(secondaries) == 0:
return primaries[0]
case len(primaries) == 0 && len(secondaries) == 1:
- return secondaries[0]
+ return &chunkQuerierAdapter{newSecondaryQuerierFromChunk(secondaries[0])}
}
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
From 12bd92a25ccc6516f7cda9f53b5798fb5992a6c3 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 22 Jul 2024 15:35:09 +0100
Subject: [PATCH 15/49] [ENHANCEMENT] Storage: Short-circuit merge of single
querier with no-op queriers
Filter out nil and no-op queriers before checking whether only one remains.
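A hedged illustration (realQuerier stands in for any normal Querier;
noopQuerier is the package-internal no-op): since nil and no-op entries
are filtered first, the single-querier short-circuit now fires even when
they are present, so
    q := NewMergeQuerier([]Querier{nil, noopQuerier{}, realQuerier}, nil, ChainedSeriesMerge)
returns realQuerier directly instead of wrapping it in a mergeGenericQuerier.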
Signed-off-by: Bryan Boreham
---
storage/merge.go | 46 ++++++++++++++++++++++++++++++++--------------
1 file changed, 32 insertions(+), 14 deletions(-)
diff --git a/storage/merge.go b/storage/merge.go
index 66c4c3ed32..2424b26ab7 100644
--- a/storage/merge.go
+++ b/storage/merge.go
@@ -45,8 +45,11 @@ type mergeGenericQuerier struct {
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
+ primaries = filterQueriers(primaries)
+ secondaries = filterQueriers(secondaries)
+
switch {
- case len(primaries)+len(secondaries) == 0:
+ case len(primaries) == 0 && len(secondaries) == 0:
return noopQuerier{}
case len(primaries) == 1 && len(secondaries) == 0:
return primaries[0]
@@ -56,14 +59,10 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
- if _, ok := q.(noopQuerier); !ok && q != nil {
- queriers = append(queriers, newGenericQuerierFrom(q))
- }
+ queriers = append(queriers, newGenericQuerierFrom(q))
}
for _, q := range secondaries {
- if _, ok := q.(noopQuerier); !ok && q != nil {
- queriers = append(queriers, newSecondaryQuerierFrom(q))
- }
+ queriers = append(queriers, newSecondaryQuerierFrom(q))
}
concurrentSelect := false
@@ -77,12 +76,25 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer
}}
}
+func filterQueriers(qs []Querier) []Querier {
+ ret := make([]Querier, 0, len(qs))
+ for _, q := range qs {
+ if _, ok := q.(noopQuerier); !ok && q != nil {
+ ret = append(ret, q)
+ }
+ }
+ return ret
+}
+
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
+ primaries = filterChunkQueriers(primaries)
+ secondaries = filterChunkQueriers(secondaries)
+
switch {
case len(primaries) == 0 && len(secondaries) == 0:
return noopChunkQuerier{}
@@ -94,14 +106,10 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
- if _, ok := q.(noopChunkQuerier); !ok && q != nil {
- queriers = append(queriers, newGenericQuerierFromChunk(q))
- }
+ queriers = append(queriers, newGenericQuerierFromChunk(q))
}
- for _, querier := range secondaries {
- if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
- queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
- }
+ for _, q := range secondaries {
+ queriers = append(queriers, newSecondaryQuerierFromChunk(q))
}
concurrentSelect := false
@@ -115,6 +123,16 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
}}
}
+func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier {
+ ret := make([]ChunkQuerier, 0, len(qs))
+ for _, q := range qs {
+ if _, ok := q.(noopChunkQuerier); !ok && q != nil {
+ ret = append(ret, q)
+ }
+ }
+ return ret
+}
+
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
From cc7dcf5afea759263ca0fa555f70741b76ce4df6 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 22 Jul 2024 16:17:35 +0100
Subject: [PATCH 16/49] [DOCS] Querying basics: explain range and instant
queries
I often see people ask questions that indicate they don't understand
this point, and launching into "instant vector" and "range vector" is
likely to point them in the wrong direction.
Remove the admonishment that the reader mustn't confuse these things.
Remove mention of "inferred sample timestamps" that is never explained.
Signed-off-by: Bryan Boreham
---
docs/querying/basics.md | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/docs/querying/basics.md b/docs/querying/basics.md
index 1c72adb3e5..304c9f07d4 100644
--- a/docs/querying/basics.md
+++ b/docs/querying/basics.md
@@ -8,9 +8,15 @@ sort_rank: 1
Prometheus provides a functional query language called PromQL (Prometheus Query
Language) that lets the user select and aggregate time series data in real
-time. The result of an expression can either be shown as a graph, viewed as
-tabular data in Prometheus's expression browser, or consumed by external
-systems via the [HTTP API](api.md).
+time.
+
+When you send a query request to Prometheus, it can be an _instant query_, evaluated at one point in time,
+or a _range query_, evaluated at equally-spaced steps between a start and an end time. PromQL works exactly the same
+in both cases; a range query is just like an instant query run multiple times at different timestamps.
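+
+For example, the instant query API evaluates `up` at a single timestamp, while the range
+query API evaluates the same expression once per step:
+
+    GET /api/v1/query?query=up&time=2024-07-22T15:00:00Z
+    GET /api/v1/query_range?query=up&start=2024-07-22T14:00:00Z&end=2024-07-22T15:00:00Z&step=15s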
+
+In the Prometheus UI, the "Table" tab is for instant queries and the "Graph" tab is for range queries.
+
+Other programs can fetch the result of a PromQL expression via the [HTTP API](api.md).
## Examples
@@ -94,9 +100,7 @@ Examples:
## Time series selectors
-Time series selectors are responsible for selecting the times series and raw or inferred sample timestamps and values.
-
-Time series *selectors* are not to be confused with higher level concept of instant and range *queries* that can execute the time series *selectors*. A higher level instant query would evaluate the given selector at one point in time, however the range query would evaluate the selector at multiple different times in between a minimum and maximum timestamp at regular steps.
+These are the basic building-blocks that instruct PromQL what data to fetch.
### Instant vector selectors
From c037a3df844d15b5e38be7eaa28f842c5ee07ee2 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 22 Jul 2024 16:34:42 +0100
Subject: [PATCH 17/49] lint
Signed-off-by: Bryan Boreham
---
storage/merge_test.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/storage/merge_test.go b/storage/merge_test.go
index 488edb2e60..b145743c86 100644
--- a/storage/merge_test.go
+++ b/storage/merge_test.go
@@ -949,7 +949,7 @@ func (m *mockQuerier) Select(_ context.Context, sortSeries bool, _ *SelectHints,
} else if len(m.resp) > 0 {
ret = make([]Series, 0, len(m.resp))
for _, l := range m.resp {
- ret = append(ret, NewListSeries(labels.FromStrings("test", string(l)), nil))
+ ret = append(ret, NewListSeries(labels.FromStrings("test", l), nil))
}
}
if sortSeries {
From be7a4c9b83a9f074f823d12e3d58338407fe76a1 Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Fri, 26 Jul 2024 09:49:57 +0200
Subject: [PATCH 18/49] Ignore stale histograms for counter reset detection
The histogram stats decoder keeps track of the last seen histogram sample
in order to properly detect counter resets. We are seeing an issue where
a histogram with an UnknownCounterReset hint gets treated as a counter reset when it follows
a stale histogram sample.
I believe that this is incorrect since stale samples should be completely ignored
in PromQL. As a result, they should not be stored in the histogram stats iterator
and the counter reset detection needs to be done against the last non-stale sample.
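A minimal sketch of the intended rule (illustrative names, not the exact
iterator internals):
    if value.IsStaleNaN(cur.Sum) {
        // Pass the stale marker through, but do not record it as "last seen".
        return t, &histogram.Histogram{Sum: cur.Sum}
    }
    if lastNonStale != nil && detectReset(lastNonStale, cur) { // detectReset is illustrative
        hint = histogram.CounterReset
    }
    lastNonStale = cur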
Signed-off-by: Filip Petkovski
---
promql/histogram_stats_iterator.go | 2 -
promql/histogram_stats_iterator_test.go | 123 +++++++++++++++---------
tsdb/tsdbutil/histogram.go | 10 +-
3 files changed, 84 insertions(+), 51 deletions(-)
diff --git a/promql/histogram_stats_iterator.go b/promql/histogram_stats_iterator.go
index dfafea5f8c..0a5f67ae7c 100644
--- a/promql/histogram_stats_iterator.go
+++ b/promql/histogram_stats_iterator.go
@@ -48,7 +48,6 @@ func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *hi
var t int64
t, f.currentH = f.Iterator.AtHistogram(f.currentH)
if value.IsStaleNaN(f.currentH.Sum) {
- f.setLastH(f.currentH)
h = &histogram.Histogram{Sum: f.currentH.Sum}
return t, h
}
@@ -77,7 +76,6 @@ func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram)
var t int64
t, f.currentFH = f.Iterator.AtFloatHistogram(f.currentFH)
if value.IsStaleNaN(f.currentFH.Sum) {
- f.setLastFH(f.currentFH)
return t, &histogram.FloatHistogram{Sum: f.currentFH.Sum}
}
diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go
index b71a9d6029..d5c081348c 100644
--- a/promql/histogram_stats_iterator_test.go
+++ b/promql/histogram_stats_iterator_test.go
@@ -14,62 +14,99 @@
package promql
import (
+ "fmt"
+ "math"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
+ "github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
func TestHistogramStatsDecoding(t *testing.T) {
- histograms := []*histogram.Histogram{
- tsdbutil.GenerateTestHistogram(0),
- tsdbutil.GenerateTestHistogram(1),
- tsdbutil.GenerateTestHistogram(2),
- tsdbutil.GenerateTestHistogram(2),
- }
- histograms[0].CounterResetHint = histogram.NotCounterReset
- histograms[1].CounterResetHint = histogram.UnknownCounterReset
- histograms[2].CounterResetHint = histogram.CounterReset
- histograms[3].CounterResetHint = histogram.UnknownCounterReset
-
- expectedHints := []histogram.CounterResetHint{
- histogram.NotCounterReset,
- histogram.NotCounterReset,
- histogram.CounterReset,
- histogram.NotCounterReset,
+ cases := []struct {
+ name string
+ histograms []*histogram.Histogram
+ expectedHints []histogram.CounterResetHint
+ }{
+ {
+ name: "unknown counter reset triggers detection",
+ histograms: []*histogram.Histogram{
+ tsdbutil.GenerateTestHistogramWithHint(0, histogram.NotCounterReset),
+ tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset),
+ tsdbutil.GenerateTestHistogramWithHint(2, histogram.CounterReset),
+ tsdbutil.GenerateTestHistogramWithHint(2, histogram.UnknownCounterReset),
+ },
+ expectedHints: []histogram.CounterResetHint{
+ histogram.NotCounterReset,
+ histogram.NotCounterReset,
+ histogram.CounterReset,
+ histogram.NotCounterReset,
+ },
+ },
+ {
+ name: "stale sample before unknown reset hint",
+ histograms: []*histogram.Histogram{
+ tsdbutil.GenerateTestHistogramWithHint(0, histogram.NotCounterReset),
+ tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset),
+ {Sum: math.Float64frombits(value.StaleNaN)},
+ tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset),
+ },
+ expectedHints: []histogram.CounterResetHint{
+ histogram.NotCounterReset,
+ histogram.NotCounterReset,
+ histogram.UnknownCounterReset,
+ histogram.NotCounterReset,
+ },
+ },
}
- t.Run("histogram_stats", func(t *testing.T) {
- decodedStats := make([]*histogram.Histogram, 0)
- statsIterator := NewHistogramStatsIterator(newHistogramSeries(histograms).Iterator(nil))
- for statsIterator.Next() != chunkenc.ValNone {
- _, h := statsIterator.AtHistogram(nil)
- decodedStats = append(decodedStats, h)
- }
- for i := 0; i < len(histograms); i++ {
- require.Equal(t, expectedHints[i], decodedStats[i].CounterResetHint)
- require.Equal(t, histograms[i].Count, decodedStats[i].Count)
- require.Equal(t, histograms[i].Sum, decodedStats[i].Sum)
- }
- })
- t.Run("float_histogram_stats", func(t *testing.T) {
- decodedStats := make([]*histogram.FloatHistogram, 0)
- statsIterator := NewHistogramStatsIterator(newHistogramSeries(histograms).Iterator(nil))
- for statsIterator.Next() != chunkenc.ValNone {
- _, h := statsIterator.AtFloatHistogram(nil)
- decodedStats = append(decodedStats, h)
- }
- for i := 0; i < len(histograms); i++ {
- fh := histograms[i].ToFloat(nil)
- require.Equal(t, expectedHints[i], decodedStats[i].CounterResetHint)
- require.Equal(t, fh.Count, decodedStats[i].Count)
- require.Equal(t, fh.Sum, decodedStats[i].Sum)
- }
- })
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Run("histogram_stats", func(t *testing.T) {
+ decodedStats := make([]*histogram.Histogram, 0)
+ statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil))
+ for statsIterator.Next() != chunkenc.ValNone {
+ _, h := statsIterator.AtHistogram(nil)
+ decodedStats = append(decodedStats, h)
+ }
+ for i := 0; i < len(tc.histograms); i++ {
+ require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint, fmt.Sprintf("mismatch in counter reset hint for histogram %d", i))
+ h := tc.histograms[i]
+ if value.IsStaleNaN(h.Sum) {
+ require.True(t, value.IsStaleNaN(decodedStats[i].Sum))
+ require.Equal(t, uint64(0), decodedStats[i].Count)
+ } else {
+ require.Equal(t, tc.histograms[i].Count, decodedStats[i].Count)
+ require.Equal(t, tc.histograms[i].Sum, decodedStats[i].Sum)
+ }
+ }
+ })
+ t.Run("float_histogram_stats", func(t *testing.T) {
+ decodedStats := make([]*histogram.FloatHistogram, 0)
+ statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil))
+ for statsIterator.Next() != chunkenc.ValNone {
+ _, h := statsIterator.AtFloatHistogram(nil)
+ decodedStats = append(decodedStats, h)
+ }
+ for i := 0; i < len(tc.histograms); i++ {
+ require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint)
+ fh := tc.histograms[i].ToFloat(nil)
+ if value.IsStaleNaN(fh.Sum) {
+ require.True(t, value.IsStaleNaN(decodedStats[i].Sum))
+ require.Equal(t, float64(0), decodedStats[i].Count)
+ } else {
+ require.Equal(t, fh.Count, decodedStats[i].Count)
+ require.Equal(t, fh.Sum, decodedStats[i].Sum)
+ }
+ }
+ })
+ })
+ }
}
type histogramSeries struct {
diff --git a/tsdb/tsdbutil/histogram.go b/tsdb/tsdbutil/histogram.go
index 3c7349cf72..ce934a638d 100644
--- a/tsdb/tsdbutil/histogram.go
+++ b/tsdb/tsdbutil/histogram.go
@@ -30,12 +30,10 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
return r
}
-func GenerateTestHistogramsWithUnknownResetHint(n int) []*histogram.Histogram {
- hs := GenerateTestHistograms(n)
- for i := range hs {
- hs[i].CounterResetHint = histogram.UnknownCounterReset
- }
- return hs
+func GenerateTestHistogramWithHint(n int, hint histogram.CounterResetHint) *histogram.Histogram {
+ h := GenerateTestHistogram(n)
+ h.CounterResetHint = hint
+ return h
}
// GenerateTestHistogram but it is up to the user to set any known counter reset hint.
From 6e89250a5d937485a140c6ba6dcdb35d2db51cd0 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Fri, 26 Jul 2024 09:49:25 +0100
Subject: [PATCH 19/49] Revert "Chunked remote read: close the querier earlier"
Believed to trigger segmentation faults due to memory-mapped block
data still being accessed by iterators after the querier is closed.
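A hedged sketch of the reverted pattern (parameters abbreviated from the
removed helper):
    func getChunkSeriesSet(ctx context.Context, q storage.ChunkQuerier,
        hints *storage.SelectHints, matchers []*labels.Matcher) storage.ChunkSeriesSet {
        defer q.Close() // runs on return, before the caller iterates the set
        return q.Select(ctx, true, hints, matchers...)
    }
Keeping the querier open until the returned set has been fully consumed
avoids iterating over unmapped memory.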
Signed-off-by: Bryan Boreham
---
storage/remote/read_handler.go | 53 ++++++++++++++--------------------
1 file changed, 21 insertions(+), 32 deletions(-)
diff --git a/storage/remote/read_handler.go b/storage/remote/read_handler.go
index 2a00ce897f..ffc64c9c3f 100644
--- a/storage/remote/read_handler.go
+++ b/storage/remote/read_handler.go
@@ -202,16 +202,34 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
return err
}
- chunks := h.getChunkSeriesSet(ctx, query, filteredMatchers)
- if err := chunks.Err(); err != nil {
+ querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
+ if err != nil {
return err
}
+ defer func() {
+ if err := querier.Close(); err != nil {
+ level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
+ }
+ }()
+
+ var hints *storage.SelectHints
+ if query.Hints != nil {
+ hints = &storage.SelectHints{
+ Start: query.Hints.StartMs,
+ End: query.Hints.EndMs,
+ Step: query.Hints.StepMs,
+ Func: query.Hints.Func,
+ Grouping: query.Hints.Grouping,
+ Range: query.Hints.RangeMs,
+ By: query.Hints.By,
+ }
+ }
ws, err := StreamChunkedReadResponses(
NewChunkedWriter(w, f),
int64(i),
// The streaming API has to provide the series sorted.
- chunks,
+ querier.Select(ctx, true, hints, filteredMatchers...),
sortedExternalLabels,
h.remoteReadMaxBytesInFrame,
h.marshalPool,
@@ -236,35 +254,6 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
}
}
-// getChunkSeriesSet executes a query to retrieve a ChunkSeriesSet,
-// encapsulating the operation in its own function to ensure timely release of
-// the querier resources.
-func (h *readHandler) getChunkSeriesSet(ctx context.Context, query *prompb.Query, filteredMatchers []*labels.Matcher) storage.ChunkSeriesSet {
- querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
- if err != nil {
- return storage.ErrChunkSeriesSet(err)
- }
- defer func() {
- if err := querier.Close(); err != nil {
- level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
- }
- }()
-
- var hints *storage.SelectHints
- if query.Hints != nil {
- hints = &storage.SelectHints{
- Start: query.Hints.StartMs,
- End: query.Hints.EndMs,
- Step: query.Hints.StepMs,
- Func: query.Hints.Func,
- Grouping: query.Hints.Grouping,
- Range: query.Hints.RangeMs,
- By: query.Hints.By,
- }
- }
- return querier.Select(ctx, true, hints, filteredMatchers...)
-}
-
// filterExtLabelsFromMatchers change equality matchers which match external labels
// to a matcher that looks for an empty label,
// as that label should not be present in the storage.
From d4f098ae80fb276153efc757e373c813163da0e8 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Fri, 26 Jul 2024 14:55:39 +0200
Subject: [PATCH 20/49] Fix relabel.Regexp zero value marshalling (#14517)
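A zero-value Regexp holds a nil embedded *regexp.Regexp, so String()
dereferenced nil when such a value was marshalled. A hedged sketch of
the fixed behaviour (mirrors the test below):
    var zero relabel.Regexp
    out, err := yaml.Marshal(&zero)
    // err == nil and string(out) == "null\n"; previously this panicked.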
Signed-off-by: Marco Pracucci
---
model/relabel/relabel.go | 4 ++++
model/relabel/relabel_test.go | 13 +++++++++++++
2 files changed, 17 insertions(+)
diff --git a/model/relabel/relabel.go b/model/relabel/relabel.go
index 4f33edda43..a880465969 100644
--- a/model/relabel/relabel.go
+++ b/model/relabel/relabel.go
@@ -213,6 +213,10 @@ func (re Regexp) IsZero() bool {
// String returns the original string used to compile the regular expression.
func (re Regexp) String() string {
+ if re.Regexp == nil {
+ return ""
+ }
+
str := re.Regexp.String()
// Trim the anchor `^(?:` prefix and `)$` suffix.
return str[4 : len(str)-2]
diff --git a/model/relabel/relabel_test.go b/model/relabel/relabel_test.go
index 0f11f7068d..fc9952134d 100644
--- a/model/relabel/relabel_test.go
+++ b/model/relabel/relabel_test.go
@@ -900,3 +900,16 @@ action: replace
})
}
}
+
+func TestRegexp_ShouldMarshalAndUnmarshalZeroValue(t *testing.T) {
+ var zero Regexp
+
+ marshalled, err := yaml.Marshal(&zero)
+ require.NoError(t, err)
+ require.Equal(t, "null\n", string(marshalled))
+
+ var unmarshalled Regexp
+ err = yaml.Unmarshal(marshalled, &unmarshalled)
+ require.NoError(t, err)
+ require.Nil(t, unmarshalled.Regexp)
+}
From fe12924638d433c99b51f9acb1d7ebb9c1f40881 Mon Sep 17 00:00:00 2001
From: Kushal shukla <85934954+kushalShukla-web@users.noreply.github.com>
Date: Mon, 29 Jul 2024 07:28:08 -0400
Subject: [PATCH 21/49] promtool: JUnit-Format XML Test Results (#14506)
* Junit compatible output
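Example usage (file names illustrative): `promtool test rules
--junit=junit.xml rules_test.yml` writes a JUnit XML report in addition
to the console output. The junitxml package can also be driven directly;
a hedged sketch:
    j := &junitxml.JUnitXML{}
    suite := j.Suite("rules_test.yml")
    suite.Case("alerts fire as expected")
    suite.Fail("expected alert HighLatency to fire") // marks the most recent case as failed
    _ = j.WriteXML(os.Stdout)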
Signed-off-by: Kushal Shukla
---
cmd/promtool/main.go | 7 ++-
cmd/promtool/unittest.go | 40 +++++++++++++----
cmd/promtool/unittest_test.go | 50 +++++++++++++++++++++
docs/command-line/promtool.md | 9 ++++
util/junitxml/junitxml.go | 81 ++++++++++++++++++++++++++++++++++
util/junitxml/junitxml_test.go | 66 +++++++++++++++++++++++++++
6 files changed, 243 insertions(+), 10 deletions(-)
create mode 100644 util/junitxml/junitxml.go
create mode 100644 util/junitxml/junitxml_test.go
diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go
index e1d275e97e..1c8e1dd1c8 100644
--- a/cmd/promtool/main.go
+++ b/cmd/promtool/main.go
@@ -204,6 +204,7 @@ func main() {
pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap()
testCmd := app.Command("test", "Unit testing.")
+ junitOutFile := testCmd.Flag("junit", "File path to store JUnit XML test results.").OpenFile(os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
testRulesCmd := testCmd.Command("rules", "Unit tests for rules.")
testRulesRun := testRulesCmd.Flag("run", "If set, will only run test groups whose names match the regular expression. Can be specified multiple times.").Strings()
testRulesFiles := testRulesCmd.Arg(
@@ -378,7 +379,11 @@ func main() {
os.Exit(QueryLabels(serverURL, httpRoundTripper, *queryLabelsMatch, *queryLabelsName, *queryLabelsBegin, *queryLabelsEnd, p))
case testRulesCmd.FullCommand():
- os.Exit(RulesUnitTest(
+ results := io.Discard
+ if *junitOutFile != nil {
+ results = *junitOutFile
+ }
+ os.Exit(RulesUnitTestResult(results,
promqltest.LazyLoaderOpts{
EnableAtModifier: true,
EnableNegativeOffset: true,
diff --git a/cmd/promtool/unittest.go b/cmd/promtool/unittest.go
index 5451c5296c..7030635d1c 100644
--- a/cmd/promtool/unittest.go
+++ b/cmd/promtool/unittest.go
@@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "io"
"os"
"path/filepath"
"sort"
@@ -29,9 +30,10 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/grafana/regexp"
"github.com/nsf/jsondiff"
- "github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
+ "github.com/prometheus/common/model"
+
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
@@ -39,12 +41,18 @@ import (
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
+ "github.com/prometheus/prometheus/util/junitxml"
)
// RulesUnitTest does unit testing of rules based on the unit testing files provided.
// More info about the file format can be found in the docs.
func RulesUnitTest(queryOpts promqltest.LazyLoaderOpts, runStrings []string, diffFlag bool, files ...string) int {
+ return RulesUnitTestResult(io.Discard, queryOpts, runStrings, diffFlag, files...)
+}
+
+func RulesUnitTestResult(results io.Writer, queryOpts promqltest.LazyLoaderOpts, runStrings []string, diffFlag bool, files ...string) int {
failed := false
+ junit := &junitxml.JUnitXML{}
var run *regexp.Regexp
if runStrings != nil {
@@ -52,7 +60,7 @@ func RulesUnitTest(queryOpts promqltest.LazyLoaderOpts, runStrings []string, dif
}
for _, f := range files {
- if errs := ruleUnitTest(f, queryOpts, run, diffFlag); errs != nil {
+ if errs := ruleUnitTest(f, queryOpts, run, diffFlag, junit.Suite(f)); errs != nil {
fmt.Fprintln(os.Stderr, " FAILED:")
for _, e := range errs {
fmt.Fprintln(os.Stderr, e.Error())
@@ -64,25 +72,30 @@ func RulesUnitTest(queryOpts promqltest.LazyLoaderOpts, runStrings []string, dif
}
fmt.Println()
}
+ err := junit.WriteXML(results)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "failed to write JUnit XML: %s\n", err)
+ }
if failed {
return failureExitCode
}
return successExitCode
}
-func ruleUnitTest(filename string, queryOpts promqltest.LazyLoaderOpts, run *regexp.Regexp, diffFlag bool) []error {
- fmt.Println("Unit Testing: ", filename)
-
+func ruleUnitTest(filename string, queryOpts promqltest.LazyLoaderOpts, run *regexp.Regexp, diffFlag bool, ts *junitxml.TestSuite) []error {
b, err := os.ReadFile(filename)
if err != nil {
+ ts.Abort(err)
return []error{err}
}
var unitTestInp unitTestFile
if err := yaml.UnmarshalStrict(b, &unitTestInp); err != nil {
+ ts.Abort(err)
return []error{err}
}
if err := resolveAndGlobFilepaths(filepath.Dir(filename), &unitTestInp); err != nil {
+ ts.Abort(err)
return []error{err}
}
@@ -91,29 +104,38 @@ func ruleUnitTest(filename string, queryOpts promqltest.LazyLoaderOpts, run *reg
}
evalInterval := time.Duration(unitTestInp.EvaluationInterval)
-
+ ts.Settime(time.Now().Format("2006-01-02T15:04:05"))
// Giving number for groups mentioned in the file for ordering.
// Lower number group should be evaluated before higher number group.
groupOrderMap := make(map[string]int)
for i, gn := range unitTestInp.GroupEvalOrder {
if _, ok := groupOrderMap[gn]; ok {
- return []error{fmt.Errorf("group name repeated in evaluation order: %s", gn)}
+ err := fmt.Errorf("group name repeated in evaluation order: %s", gn)
+ ts.Abort(err)
+ return []error{err}
}
groupOrderMap[gn] = i
}
// Testing.
var errs []error
- for _, t := range unitTestInp.Tests {
+ for i, t := range unitTestInp.Tests {
if !matchesRun(t.TestGroupName, run) {
continue
}
-
+ testname := t.TestGroupName
+ if testname == "" {
+ testname = fmt.Sprintf("unnamed#%d", i)
+ }
+ tc := ts.Case(testname)
if t.Interval == 0 {
t.Interval = unitTestInp.EvaluationInterval
}
ers := t.test(evalInterval, groupOrderMap, queryOpts, diffFlag, unitTestInp.RuleFiles...)
if ers != nil {
+ for _, e := range ers {
+ tc.Fail(e.Error())
+ }
errs = append(errs, ers...)
}
}
diff --git a/cmd/promtool/unittest_test.go b/cmd/promtool/unittest_test.go
index 2dbd5a4e51..9bbac28e9f 100644
--- a/cmd/promtool/unittest_test.go
+++ b/cmd/promtool/unittest_test.go
@@ -14,11 +14,15 @@
package main
import (
+ "bytes"
+ "encoding/xml"
+ "fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/promql/promqltest"
+ "github.com/prometheus/prometheus/util/junitxml"
)
func TestRulesUnitTest(t *testing.T) {
@@ -125,13 +129,59 @@ func TestRulesUnitTest(t *testing.T) {
want: 0,
},
}
+ reuseFiles := []string{}
+ reuseCount := [2]int{}
for _, tt := range tests {
+ if (tt.queryOpts == promqltest.LazyLoaderOpts{
+ EnableNegativeOffset: true,
+ } || tt.queryOpts == promqltest.LazyLoaderOpts{
+ EnableAtModifier: true,
+ }) {
+ reuseFiles = append(reuseFiles, tt.args.files...)
+ reuseCount[tt.want] += len(tt.args.files)
+ }
t.Run(tt.name, func(t *testing.T) {
if got := RulesUnitTest(tt.queryOpts, nil, false, tt.args.files...); got != tt.want {
t.Errorf("RulesUnitTest() = %v, want %v", got, tt.want)
}
})
}
+ t.Run("Junit xml output ", func(t *testing.T) {
+ var buf bytes.Buffer
+ if got := RulesUnitTestResult(&buf, promqltest.LazyLoaderOpts{}, nil, false, reuseFiles...); got != 1 {
+ t.Errorf("RulesUnitTestResults() = %v, want 1", got)
+ }
+ var test junitxml.JUnitXML
+ output := buf.Bytes()
+ err := xml.Unmarshal(output, &test)
+ if err != nil {
+ t.Fatalf("error in decoding XML: %v", err)
+ }
+ var total int
+ var passes int
+ var failures int
+ var cases int
+ total = len(test.Suites)
+ if total != len(reuseFiles) {
+ t.Errorf("JUnit output had %d testsuite elements; expected %d\n", total, len(reuseFiles))
+ }
+
+ for _, i := range test.Suites {
+ if i.FailureCount == 0 {
+ passes++
+ } else {
+ failures++
+ }
+ cases += len(i.Cases)
+ }
+ if total != passes+failures {
+ t.Errorf("JUnit output mismatch: Total testsuites (%d) does not equal the sum of passes (%d) and failures (%d).", total, passes, failures)
+ }
+ if cases < total {
+ t.Errorf("JUnit output had %d suites without test cases\n", total-cases)
+ }
+ })
}
func TestRulesUnitTestRun(t *testing.T) {
diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md
index 443cd3f0cb..6bb80169a9 100644
--- a/docs/command-line/promtool.md
+++ b/docs/command-line/promtool.md
@@ -442,6 +442,15 @@ Unit testing.
+#### Flags
+
+| Flag | Description |
+| --- | --- |
+| --junit | File path to store JUnit XML test results. |
+
+
+
+
##### `promtool test rules`
Unit tests for rules.
diff --git a/util/junitxml/junitxml.go b/util/junitxml/junitxml.go
new file mode 100644
index 0000000000..14e4b6dbae
--- /dev/null
+++ b/util/junitxml/junitxml.go
@@ -0,0 +1,81 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package junitxml
+
+import (
+ "encoding/xml"
+ "io"
+)
+
+type JUnitXML struct {
+ XMLName xml.Name `xml:"testsuites"`
+ Suites []*TestSuite `xml:"testsuite"`
+}
+
+type TestSuite struct {
+ Name string `xml:"name,attr"`
+ TestCount int `xml:"tests,attr"`
+ FailureCount int `xml:"failures,attr"`
+ ErrorCount int `xml:"errors,attr"`
+ SkippedCount int `xml:"skipped,attr"`
+ Timestamp string `xml:"timestamp,attr"`
+ Cases []*TestCase `xml:"testcase"`
+}
+type TestCase struct {
+ Name string `xml:"name,attr"`
+ Failures []string `xml:"failure,omitempty"`
+ Error string `xml:"error,omitempty"`
+}
+
+func (j *JUnitXML) WriteXML(h io.Writer) error {
+ return xml.NewEncoder(h).Encode(j)
+}
+
+func (j *JUnitXML) Suite(name string) *TestSuite {
+ ts := &TestSuite{Name: name}
+ j.Suites = append(j.Suites, ts)
+ return ts
+}
+
+func (ts *TestSuite) Fail(f string) {
+ ts.FailureCount++
+ curt := ts.lastCase()
+ curt.Failures = append(curt.Failures, f)
+}
+
+func (ts *TestSuite) lastCase() *TestCase {
+ if len(ts.Cases) == 0 {
+ ts.Case("unknown")
+ }
+ return ts.Cases[len(ts.Cases)-1]
+}
+
+func (ts *TestSuite) Case(name string) *TestSuite {
+ j := &TestCase{
+ Name: name,
+ }
+ ts.Cases = append(ts.Cases, j)
+ ts.TestCount++
+ return ts
+}
+
+func (ts *TestSuite) Settime(timestamp string) {
+ ts.Timestamp = timestamp
+}
+
+func (ts *TestSuite) Abort(e error) {
+ ts.ErrorCount++
+ curt := ts.lastCase()
+ curt.Error = e.Error()
+}
diff --git a/util/junitxml/junitxml_test.go b/util/junitxml/junitxml_test.go
new file mode 100644
index 0000000000..ad4d0293d0
--- /dev/null
+++ b/util/junitxml/junitxml_test.go
@@ -0,0 +1,66 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package junitxml
+
+import (
+ "bytes"
+ "encoding/xml"
+ "errors"
+ "testing"
+)
+
+func TestJunitOutput(t *testing.T) {
+ var buf bytes.Buffer
+ var test JUnitXML
+ x := FakeTestSuites()
+ if err := x.WriteXML(&buf); err != nil {
+ t.Fatalf("Failed to encode XML: %v", err)
+ }
+
+ output := buf.Bytes()
+
+ err := xml.Unmarshal(output, &test)
+ if err != nil {
+ t.Errorf("Unmarshal failed with error: %v", err)
+ }
+ var total int
+ var cases int
+ total = len(test.Suites)
+ if total != 3 {
+ t.Errorf("JUnit output had %d testsuite elements; expected 3\n", total)
+ }
+ for _, i := range test.Suites {
+ cases += len(i.Cases)
+ }
+
+ if cases != 7 {
+ t.Errorf("JUnit output had %d testcase; expected 7\n", cases)
+ }
+}
+
+func FakeTestSuites() *JUnitXML {
+ ju := &JUnitXML{}
+ good := ju.Suite("all good")
+ good.Case("alpha")
+ good.Case("beta")
+ good.Case("gamma")
+ mixed := ju.Suite("mixed")
+ mixed.Case("good")
+ bad := mixed.Case("bad")
+ bad.Fail("once")
+ bad.Fail("twice")
+ mixed.Case("ugly").Abort(errors.New("buggy"))
+ ju.Suite("fast").Fail("fail early")
+ return ju
+}
From 2cd97c61e02ac9cf50e0fa4a72bbc61f8e128b8b Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Mon, 29 Jul 2024 14:53:32 +0200
Subject: [PATCH 22/49] Add more test cases
Signed-off-by: Filip Petkovski
---
promql/histogram_stats_iterator_test.go | 33 +++++++++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go
index d5c081348c..7a2953d3e2 100644
--- a/promql/histogram_stats_iterator_test.go
+++ b/promql/histogram_stats_iterator_test.go
@@ -63,6 +63,39 @@ func TestHistogramStatsDecoding(t *testing.T) {
histogram.NotCounterReset,
},
},
+ {
+ name: "unknown counter reset at the beginning",
+ histograms: []*histogram.Histogram{
+ tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset),
+ },
+ expectedHints: []histogram.CounterResetHint{
+ histogram.NotCounterReset,
+ },
+ },
+ {
+ name: "detect real counter reset",
+ histograms: []*histogram.Histogram{
+ tsdbutil.GenerateTestHistogramWithHint(2, histogram.UnknownCounterReset),
+ tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset),
+ },
+ expectedHints: []histogram.CounterResetHint{
+ histogram.NotCounterReset,
+ histogram.CounterReset,
+ },
+ },
+ {
+ name: "detect real counter reset after stale NaN",
+ histograms: []*histogram.Histogram{
+ tsdbutil.GenerateTestHistogramWithHint(2, histogram.UnknownCounterReset),
+ {Sum: math.Float64frombits(value.StaleNaN)},
+ tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset),
+ },
+ expectedHints: []histogram.CounterResetHint{
+ histogram.NotCounterReset,
+ histogram.UnknownCounterReset,
+ histogram.CounterReset,
+ },
+ },
}
for _, tc := range cases {
From b7f2f3c3ac90f2347de6112c185a4e470e7ae8a6 Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Tue, 30 Jul 2024 10:19:56 +0200
Subject: [PATCH 23/49] Add BenchmarkLoadRealWLs
This benchmark runs on real write logs (WALs) rather than synthetically generated ones.
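Hedged example invocation (paths illustrative):
    BENCHMARK_LOAD_REAL_WLS_DIR=/data/tsdb \
    BENCHMARK_LOAD_REAL_WLS_PROFILE=cpu.pprof \
    go test -run '^$' -bench BenchmarkLoadRealWLs ./tsdb/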
Signed-off-by: Oleg Zaytsev
---
tsdb/head_test.go | 38 ++++++++++++++++++++++++++++++++++++++
1 file changed, 38 insertions(+)
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index c192c8a078..09927c23c6 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -23,6 +23,7 @@ import (
"path"
"path/filepath"
"reflect"
+ "runtime/pprof"
"sort"
"strconv"
"strings"
@@ -89,6 +90,43 @@ func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts
return h, wal
}
+// BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
+// BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located.
+// Optionally, BENCHMARK_LOAD_REAL_WLS_PROFILE can be set to a file path to write a CPU profile.
+func BenchmarkLoadRealWLs(b *testing.B) {
+ dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
+ if dir == "" {
+ b.Skip("BENCHMARK_LOAD_REAL_WLS_DIR is not set")
+ }
+
+ profileFile := os.Getenv("BENCHMARK_LOAD_REAL_WLS_PROFILE")
+ if profileFile != "" {
+ b.Logf("Will profile in %s", profileFile)
+ f, err := os.Create(profileFile)
+ require.NoError(b, err)
+ b.Cleanup(func() { f.Close() })
+ require.NoError(b, pprof.StartCPUProfile(f))
+ b.Cleanup(pprof.StopCPUProfile)
+ }
+
+ wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
+ require.NoError(b, err)
+ b.Cleanup(func() { wal.Close() })
+
+ wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
+ require.NoError(b, err)
+ b.Cleanup(func() { wbl.Close() })
+
+ // Load the WAL.
+ for i := 0; i < b.N; i++ {
+ opts := DefaultHeadOptions()
+ opts.ChunkDirRoot = dir
+ h, err := NewHead(nil, nil, wal, wbl, opts, nil)
+ require.NoError(b, err)
+ require.NoError(b, h.Init(0))
+ }
+}
+
func BenchmarkCreateSeries(b *testing.B) {
series := genSeries(b.N, 10, 0, 0)
h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
From d8e1b6bdfd3c8cd02a38b21386453dac9b14da1b Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Tue, 30 Jul 2024 10:20:29 +0200
Subject: [PATCH 24/49] Store mmMaxTime in same field as seriesShard
We don't use seriesShard during DB initialization, so we can reuse the
same 8 bytes to store mmMaxTime during WAL replay, saving those 8 bytes
per series for the rest of the lifetime of the database.
This doesn't affect CPU performance.
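A hedged sketch of the field reuse (lset is illustrative): the int64
max-time round-trips losslessly through the shared uint64 field, which
holds the shard hash once WAL replay is done.
    var field uint64
    field = uint64(int64(math.MinInt64)) // during replay: "no mmapped chunk yet"
    mmMaxTime := int64(field)            // reads back math.MinInt64
    field = labels.StableHash(lset)      // after replay: the shard hash
    _ = mmMaxTime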
Signed-off-by: Oleg Zaytsev
---
tsdb/head.go | 46 +++++++++++++++++++++++++++++++++++-----------
tsdb/head_read.go | 2 +-
tsdb/head_wal.go | 13 ++++++++-----
3 files changed, 44 insertions(+), 17 deletions(-)
diff --git a/tsdb/head.go b/tsdb/head.go
index b7bfaa0fda..1659e57a48 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -178,6 +178,7 @@ type HeadOptions struct {
WALReplayConcurrency int
// EnableSharding enables ShardedPostings() support in the Head.
+ // EnableSharding is temporarily disabled during Init().
EnableSharding bool
}
@@ -609,7 +610,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
-func (h *Head) Init(minValidTime int64) error {
+func (h *Head) Init(minValidTime int64) (err error) {
h.minValidTime.Store(minValidTime)
defer func() {
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
@@ -623,6 +624,24 @@ func (h *Head) Init(minValidTime int64) error {
}
}()
+ // If sharding is enabled, disable it while initializing, and calculate the shards later.
+ // We're going to use the shard-hash field for other purposes during WAL replay,
+ // so we don't want to waste time on calculating the shard that we're going to lose anyway.
+ if h.opts.EnableSharding {
+ h.opts.EnableSharding = false
+ defer func() {
+ if err == nil {
+ h.opts.EnableSharding = true
+ // No locking is needed here as nobody should be writing while we're in Init.
+ for _, stripe := range h.series.series {
+ for _, s := range stripe {
+ s.shardHashOrMemoryMappedMaxTime = labels.StableHash(s.lset)
+ }
+ }
+ }
+ }()
+ }
+
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
start := time.Now()
@@ -683,7 +702,6 @@ func (h *Head) Init(minValidTime int64) error {
mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
lastMmapRef chunks.ChunkDiskMapperRef
- err error
mmapChunkReplayDuration time.Duration
)
@@ -2068,9 +2086,11 @@ type memSeries struct {
ref chunks.HeadSeriesRef
meta *metadata.Metadata
- // Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
- // been explicitly enabled in TSDB.
- shardHash uint64
+ // Series labels hash to use for sharding purposes.
+ // The value is always 0 when sharding has not been explicitly enabled in TSDB.
+ // During WAL replay, the value stored here is the max time of any mmapped chunk,
+ // and the shard hash is re-calculated after WAL replay is complete.
+ shardHashOrMemoryMappedMaxTime uint64
// Everything after here should only be accessed with the lock held.
sync.Mutex
@@ -2095,8 +2115,6 @@ type memSeries struct {
ooo *memSeriesOOOFields
- mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
-
nextAt int64 // Timestamp at which to cut the next chunk.
histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise.
pendingCommit bool // Whether there are samples waiting to be committed to this series.
@@ -2127,10 +2145,10 @@ type memSeriesOOOFields struct {
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries {
s := &memSeries{
- lset: lset,
- ref: id,
- nextAt: math.MinInt64,
- shardHash: shardHash,
+ lset: lset,
+ ref: id,
+ nextAt: math.MinInt64,
+ shardHashOrMemoryMappedMaxTime: shardHash,
}
if !isolationDisabled {
s.txs = newTxRing(0)
@@ -2218,6 +2236,12 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
return removedInOrder + removedOOO
}
+// shardHash returns the shard hash of the series, only available after WAL replay.
+func (s *memSeries) shardHash() uint64 { return s.shardHashOrMemoryMappedMaxTime }
+
+// mmMaxTime returns the max time of any mmapped chunk in the series, only available during WAL replay.
+func (s *memSeries) mmMaxTime() int64 { return int64(s.shardHashOrMemoryMappedMaxTime) }
+
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
// acquiring lock.
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 9ba8785ad2..3a50f316ba 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -170,7 +170,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
}
// Check if the series belong to the shard.
- if s.shardHash%shardCount != shardIndex {
+ if s.shardHash()%shardCount != shardIndex {
continue
}
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 787cb7c267..2852709a04 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -435,6 +435,8 @@ Outer:
return nil
}
+func minInt64() int64 { return math.MinInt64 }
+
// resetSeriesWithMMappedChunks is only used during the WAL replay.
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
if mSeries.ref != walSeriesRef {
@@ -481,10 +483,11 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
}
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
if len(mmc) == 0 {
- mSeries.mmMaxTime = math.MinInt64
+ mSeries.shardHashOrMemoryMappedMaxTime = uint64(minInt64())
} else {
- mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
- h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
+ mmMaxTime := mmc[len(mmc)-1].maxTime
+ mSeries.shardHashOrMemoryMappedMaxTime = uint64(mmMaxTime)
+ h.updateMinMaxTime(mmc[0].minTime, mmMaxTime)
}
if len(oooMmc) != 0 {
// Mint and maxt can be in any chunk, they are not sorted.
@@ -585,7 +588,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
unknownRefs++
continue
}
- if s.T <= ms.mmMaxTime {
+ if s.T <= ms.mmMaxTime() {
continue
}
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
@@ -614,7 +617,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
unknownHistogramRefs++
continue
}
- if s.t <= ms.mmMaxTime {
+ if s.t <= ms.mmMaxTime() {
continue
}
var chunkCreated bool
From 0300ad58a97098674ca4757c79a74a05e9c33322 Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Tue, 30 Jul 2024 11:31:31 +0200
Subject: [PATCH 25/49] Revert the option regardless of error
Signed-off-by: Oleg Zaytsev
---
tsdb/head.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tsdb/head.go b/tsdb/head.go
index 1659e57a48..9d81b24ae4 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -630,8 +630,8 @@ func (h *Head) Init(minValidTime int64) (err error) {
if h.opts.EnableSharding {
h.opts.EnableSharding = false
defer func() {
+ h.opts.EnableSharding = true
if err == nil {
- h.opts.EnableSharding = true
// No locking is needed here as nobody should be writing while we're in Init.
for _, stripe := range h.series.series {
for _, s := range stripe {
From 6cef8698c27b99263efcbe5025846187cf4358f7 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Tue, 30 Jul 2024 13:30:49 +0200
Subject: [PATCH 26/49] build(deps-dev): bump @lezer/generator from 1.7.0 to
1.7.1 in /web/ui (#14382)
Bumps [@lezer/generator](https://github.com/lezer-parser/generator) from 1.7.0 to 1.7.1.
- [Changelog](https://github.com/lezer-parser/generator/blob/main/CHANGELOG.md)
- [Commits](https://github.com/lezer-parser/generator/compare/1.7.0...1.7.1)
---
updated-dependencies:
- dependency-name: "@lezer/generator"
dependency-type: direct:development
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
web/ui/module/lezer-promql/package.json | 2 +-
web/ui/package-lock.json | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/web/ui/module/lezer-promql/package.json b/web/ui/module/lezer-promql/package.json
index cbd03ae2b1..43a5c44fa8 100644
--- a/web/ui/module/lezer-promql/package.json
+++ b/web/ui/module/lezer-promql/package.json
@@ -30,7 +30,7 @@
"test": "NODE_OPTIONS=--experimental-vm-modules jest"
},
"devDependencies": {
- "@lezer/generator": "^1.7.0",
+ "@lezer/generator": "^1.7.1",
"@lezer/highlight": "^1.2.0",
"@lezer/lr": "^1.4.1"
},
diff --git a/web/ui/package-lock.json b/web/ui/package-lock.json
index 62ac34e436..2028c34022 100644
--- a/web/ui/package-lock.json
+++ b/web/ui/package-lock.json
@@ -72,7 +72,7 @@
"version": "0.53.1",
"license": "Apache-2.0",
"devDependencies": {
- "@lezer/generator": "^1.7.0",
+ "@lezer/generator": "^1.7.1",
"@lezer/highlight": "^1.2.0",
"@lezer/lr": "^1.4.1"
},
@@ -3371,9 +3371,9 @@
"integrity": "sha512-yemX0ZD2xS/73llMZIK6KplkjIjf2EvAHcinDi/TfJ9hS25G0388+ClHt6/3but0oOxinTcQHJLDXh6w1crzFQ=="
},
"node_modules/@lezer/generator": {
- "version": "1.7.0",
- "resolved": "https://registry.npmjs.org/@lezer/generator/-/generator-1.7.0.tgz",
- "integrity": "sha512-IJ16tx3biLKlCXUzcK4v8S10AVa2BSM2rB12rtAL6f1hL2TS/HQQlGCoWRvanlL2J4mCYEEIv9uG7n4kVMkVDA==",
+ "version": "1.7.1",
+ "resolved": "https://registry.npmjs.org/@lezer/generator/-/generator-1.7.1.tgz",
+ "integrity": "sha512-MgPJN9Si+ccxzXl3OAmCeZuUKw4XiPl4y664FX/hnnyG9CTqUPq65N3/VGPA2jD23D7QgMTtNqflta+cPN+5mQ==",
"dev": true,
"dependencies": {
"@lezer/common": "^1.1.0",
From 84b819a69f375dc66ea41302a56e44975c0317e3 Mon Sep 17 00:00:00 2001
From: Max Amin
Date: Tue, 30 Jul 2024 11:25:19 -0400
Subject: [PATCH 27/49] feat: add Google cloud roundtripper for remote write
(#14346)
* feat: Google Auth for remote write
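A hedged configuration sketch (endpoint and key path are illustrative):
    remote_write:
      - url: https://monitoring.example.com/api/v1/write
        google_iam:
          credentials_file: /etc/prometheus/gcp-service-account.json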
Signed-off-by: Max Amin
---------
Signed-off-by: Max Amin
---
config/config.go | 36 +++++++++++++-----
config/config_test.go | 2 +-
docs/configuration/configuration.md | 16 ++++++--
promql/engine_test.go | 3 +-
rules/manager_test.go | 3 +-
storage/remote/client.go | 9 +++++
storage/remote/googleiam/googleiam.go | 54 +++++++++++++++++++++++++++
storage/remote/write.go | 1 +
tsdb/db_test.go | 5 ++-
9 files changed, 110 insertions(+), 19 deletions(-)
create mode 100644 storage/remote/googleiam/googleiam.go
diff --git a/config/config.go b/config/config.go
index 9139838813..8a62161462 100644
--- a/config/config.go
+++ b/config/config.go
@@ -37,6 +37,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage/remote/azuread"
+ "github.com/prometheus/prometheus/storage/remote/googleiam"
)
var (
@@ -1123,6 +1124,7 @@ type RemoteWriteConfig struct {
MetadataConfig MetadataConfig `yaml:"metadata_config,omitempty"`
SigV4Config *sigv4.SigV4Config `yaml:"sigv4,omitempty"`
AzureADConfig *azuread.AzureADConfig `yaml:"azuread,omitempty"`
+ GoogleIAMConfig *googleiam.Config `yaml:"google_iam,omitempty"`
}
// SetDirectory joins any relative file paths with dir.
@@ -1160,17 +1162,33 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
return err
}
- httpClientConfigAuthEnabled := c.HTTPClientConfig.BasicAuth != nil ||
- c.HTTPClientConfig.Authorization != nil || c.HTTPClientConfig.OAuth2 != nil
+ return validateAuthConfigs(c)
+}
- if httpClientConfigAuthEnabled && (c.SigV4Config != nil || c.AzureADConfig != nil) {
- return fmt.Errorf("at most one of basic_auth, authorization, oauth2, sigv4, & azuread must be configured")
+// validateAuthConfigs validates that at most one of basic_auth, authorization, oauth2, sigv4, azuread or google_iam is configured.
+func validateAuthConfigs(c *RemoteWriteConfig) error {
+ var authConfigured []string
+ if c.HTTPClientConfig.BasicAuth != nil {
+ authConfigured = append(authConfigured, "basic_auth")
}
-
- if c.SigV4Config != nil && c.AzureADConfig != nil {
- return fmt.Errorf("at most one of basic_auth, authorization, oauth2, sigv4, & azuread must be configured")
+ if c.HTTPClientConfig.Authorization != nil {
+ authConfigured = append(authConfigured, "authorization")
+ }
+ if c.HTTPClientConfig.OAuth2 != nil {
+ authConfigured = append(authConfigured, "oauth2")
+ }
+ if c.SigV4Config != nil {
+ authConfigured = append(authConfigured, "sigv4")
+ }
+ if c.AzureADConfig != nil {
+ authConfigured = append(authConfigured, "azuread")
+ }
+ if c.GoogleIAMConfig != nil {
+ authConfigured = append(authConfigured, "google_iam")
+ }
+ if len(authConfigured) > 1 {
+ return fmt.Errorf("at most one of basic_auth, authorization, oauth2, sigv4, azuread or google_iam must be configured. Currently configured: %v", authConfigured)
}
-
return nil
}
@@ -1189,7 +1207,7 @@ func validateHeadersForTracing(headers map[string]string) error {
func validateHeaders(headers map[string]string) error {
for header := range headers {
if strings.ToLower(header) == "authorization" {
- return errors.New("authorization header must be changed via the basic_auth, authorization, oauth2, sigv4, or azuread parameter")
+ return errors.New("authorization header must be changed via the basic_auth, authorization, oauth2, sigv4, azuread or google_iam parameter")
}
if _, ok := reservedHeaders[strings.ToLower(header)]; ok {
return fmt.Errorf("%s is a reserved header. It must not be changed", header)
diff --git a/config/config_test.go b/config/config_test.go
index b684fdb50c..9b074bef1c 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -1826,7 +1826,7 @@ var expectedErrors = []struct {
},
{
filename: "remote_write_authorization_header.bad.yml",
- errMsg: `authorization header must be changed via the basic_auth, authorization, oauth2, sigv4, or azuread parameter`,
+ errMsg: `authorization header must be changed via the basic_auth, authorization, oauth2, sigv4, azuread or google_iam parameter`,
},
{
filename: "remote_write_wrong_msg.bad.yml",
diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md
index 5aa57b3ba6..313a7f2f37 100644
--- a/docs/configuration/configuration.md
+++ b/docs/configuration/configuration.md
@@ -3401,8 +3401,8 @@ authorization:
# It is mutually exclusive with `credentials`.
[ credentials_file: <filename> ]
-# Optionally configures AWS's Signature Verification 4 signing process to
-# sign requests. Cannot be set at the same time as basic_auth, authorization, or oauth2.
+# Optionally configures AWS's Signature Verification 4 signing process to sign requests.
+# Cannot be set at the same time as basic_auth, authorization, oauth2, azuread or google_iam.
# To use the default credentials from the AWS SDK, use `sigv4: {}`.
sigv4:
# The AWS region. If blank, the region from the default credentials chain
@@ -3655,12 +3655,12 @@ sigv4:
[ role_arn: <string> ]
# Optional OAuth 2.0 configuration.
-# Cannot be used at the same time as basic_auth, authorization, sigv4, or azuread.
+# Cannot be used at the same time as basic_auth, authorization, sigv4, azuread or google_iam.
oauth2:
[ <oauth2> ]
# Optional AzureAD configuration.
-# Cannot be used at the same time as basic_auth, authorization, oauth2, or sigv4.
+# Cannot be used at the same time as basic_auth, authorization, oauth2, sigv4 or google_iam.
azuread:
# The Azure Cloud. Options are 'AzurePublic', 'AzureChina', or 'AzureGovernment'.
[ cloud: <string> | default = AzurePublic ]
@@ -3680,6 +3680,14 @@ azuread:
[ sdk:
[ tenant_id: <string> ] ]
+# WARNING: Remote write is NOT SUPPORTED by Google Cloud. This configuration is reserved for future use.
+# Optional Google Cloud Monitoring configuration.
+# Cannot be used at the same time as basic_auth, authorization, oauth2, sigv4 or azuread.
+# To use the default credentials from the Google Cloud SDK, use `google_iam: {}`.
+google_iam:
+ # Service account key with monitoring write permissions.
+ credentials_file: <file_name>
+
# Configures the remote write request's TLS settings.
tls_config:
[ <tls_config> ]
diff --git a/promql/engine_test.go b/promql/engine_test.go
index 523c0613df..8e618d435c 100644
--- a/promql/engine_test.go
+++ b/promql/engine_test.go
@@ -26,7 +26,6 @@ import (
"time"
"github.com/stretchr/testify/require"
- "go.uber.org/goleak"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@@ -51,7 +50,7 @@ const (
func TestMain(m *testing.M) {
// Enable experimental functions testing
parser.EnableExperimentalFunctions = true
- goleak.VerifyTestMain(m)
+ testutil.TolerantVerifyLeak(m)
}
func TestQueryConcurrency(t *testing.T) {
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 51239e6c90..9865cbdfed 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -32,7 +32,6 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
- "go.uber.org/goleak"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/model/labels"
@@ -50,7 +49,7 @@ import (
)
func TestMain(m *testing.M) {
- goleak.VerifyTestMain(m)
+ prom_testutil.TolerantVerifyLeak(m)
}
func TestAlertingRule(t *testing.T) {
diff --git a/storage/remote/client.go b/storage/remote/client.go
index 17caf7be9b..11e423b6ab 100644
--- a/storage/remote/client.go
+++ b/storage/remote/client.go
@@ -37,6 +37,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote/azuread"
+ "github.com/prometheus/prometheus/storage/remote/googleiam"
)
const maxErrMsgLen = 1024
@@ -131,6 +132,7 @@ type ClientConfig struct {
HTTPClientConfig config_util.HTTPClientConfig
SigV4Config *sigv4.SigV4Config
AzureADConfig *azuread.AzureADConfig
+ GoogleIAMConfig *googleiam.Config
Headers map[string]string
RetryOnRateLimit bool
WriteProtoMsg config.RemoteWriteProtoMsg
@@ -192,6 +194,13 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
}
}
+ if conf.GoogleIAMConfig != nil {
+ t, err = googleiam.NewRoundTripper(conf.GoogleIAMConfig, t)
+ if err != nil {
+ return nil, err
+ }
+ }
+
writeProtoMsg := config.RemoteWriteProtoMsgV1
if conf.WriteProtoMsg != "" {
writeProtoMsg = conf.WriteProtoMsg
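The `GoogleIAMConfig` branch follows the same `http.RoundTripper` middleware pattern as the sigv4 and azuread wrappers: each auth mechanism decorates the transport built so far. A minimal, self-contained illustration of the pattern (the type and token below are hypothetical, not Prometheus APIs):

```go
package main

import "net/http"

// bearerRoundTripper is a toy auth middleware in the same style as the
// sigv4/azuread/googleiam wrappers: it decorates an inner transport and
// injects a credential on every request.
type bearerRoundTripper struct {
	next  http.RoundTripper
	token string // stand-in for a real credential source
}

func (b *bearerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	req = req.Clone(req.Context())
	req.Header.Set("Authorization", "Bearer "+b.token)
	return b.next.RoundTrip(req)
}

func main() {
	var t http.RoundTripper = http.DefaultTransport
	// Exactly one auth wrapper is applied in practice; the config layer
	// enforces the mutual exclusivity documented above.
	t = &bearerRoundTripper{next: t, token: "example"}
	_ = &http.Client{Transport: t}
}
```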
diff --git a/storage/remote/googleiam/googleiam.go b/storage/remote/googleiam/googleiam.go
new file mode 100644
index 0000000000..acf3bd5a68
--- /dev/null
+++ b/storage/remote/googleiam/googleiam.go
@@ -0,0 +1,54 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package googleiam provides an http.RoundTripper that attaches a Google Cloud access token
+// to remote write requests.
+package googleiam
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+
+ "golang.org/x/oauth2/google"
+ "google.golang.org/api/option"
+ apihttp "google.golang.org/api/transport/http"
+)
+
+type Config struct {
+ CredentialsFile string `yaml:"credentials_file,omitempty"`
+}
+
+// NewRoundTripper creates a round tripper that adds Google Cloud Monitoring authorization to calls
+// using either a credentials file or the default credentials.
+func NewRoundTripper(cfg *Config, next http.RoundTripper) (http.RoundTripper, error) {
+ if next == nil {
+ next = http.DefaultTransport
+ }
+ const scopes = "https://www.googleapis.com/auth/monitoring.write"
+ ctx := context.Background()
+ opts := []option.ClientOption{
+ option.WithScopes(scopes),
+ }
+ if cfg.CredentialsFile != "" {
+ opts = append(opts, option.WithCredentialsFile(cfg.CredentialsFile))
+ } else {
+ creds, err := google.FindDefaultCredentials(ctx, scopes)
+ if err != nil {
+ return nil, fmt.Errorf("error finding default Google credentials: %w", err)
+ }
+ opts = append(opts, option.WithCredentials(creds))
+ }
+
+ return apihttp.NewTransport(ctx, next, opts...)
+}
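Wiring the package up by hand might look like the following sketch, using only the API introduced in this patch; note that with an empty CredentialsFile, construction fails unless Application Default Credentials are available:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/prometheus/storage/remote/googleiam"
)

func main() {
	// Passing nil for next wraps http.DefaultTransport, as in the package.
	rt, err := googleiam.NewRoundTripper(&googleiam.Config{
		// An empty CredentialsFile falls back to Application Default Credentials.
		CredentialsFile: "",
	}, nil)
	if err != nil {
		log.Fatalf("building Google IAM round tripper: %v", err)
	}
	// Every request through this client now carries a monitoring.write token.
	client := &http.Client{Transport: rt}
	_ = client
}
```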
diff --git a/storage/remote/write.go b/storage/remote/write.go
index 81902a8f1a..3d2f1fdfcd 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -176,6 +176,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
HTTPClientConfig: rwConf.HTTPClientConfig,
SigV4Config: rwConf.SigV4Config,
AzureADConfig: rwConf.AzureADConfig,
+ GoogleIAMConfig: rwConf.GoogleIAMConfig,
Headers: rwConf.Headers,
RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
})
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index c0edafe087..c8dad86990 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -63,7 +63,10 @@ func TestMain(m *testing.M) {
flag.Parse()
defaultIsolationDisabled = !isolationEnabled
- goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func1"), goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func2"))
+ goleak.VerifyTestMain(m,
+ goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func1"),
+ goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func2"),
+ goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
}
func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
From 15618157321f988e069cdaa955422b24632f5743 Mon Sep 17 00:00:00 2001
From: Callum Styan
Date: Tue, 30 Jul 2024 14:08:28 -0700
Subject: [PATCH 28/49] remote write: increase time threshold for resharding
(#14450)
Don't reshard if we haven't successfully sent a sample in the last
shardUpdateDuration seconds.
Signed-off-by: Callum Styan
Co-authored-by: kushagra Shukla
---
storage/remote/queue_manager.go | 6 +++---
storage/remote/queue_manager_test.go | 13 ++++++++-----
2 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 5b59288e6c..17ff1850fd 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -1109,9 +1109,9 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
if desiredShards == t.numShards {
return false
}
- // We shouldn't reshard if Prometheus hasn't been able to send to the
- // remote endpoint successfully within some period of time.
- minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
+ // We shouldn't reshard if Prometheus hasn't been able to send
+ // since the last time it checked if it should reshard.
+ minSendTimestamp := time.Now().Add(-1 * shardUpdateDuration).Unix()
lsts := t.lastSendTimestamp.Load()
if lsts < minSendTimestamp {
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
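To make the new threshold concrete, here is a stripped-down sketch of the guard; `shouldReshard` below simplifies the method in the patch, and `shardUpdateDuration` (the interval at which desired shard counts are recalculated) is 10s in queue_manager.go at the time of writing:

```go
package main

import (
	"fmt"
	"time"
)

const shardUpdateDuration = 10 * time.Second // recalculation interval for desired shards

// shouldReshard is a simplified version of the guard in the patch: skip
// resharding when no sample has been sent successfully since the last
// resharding check, regardless of the batch send deadline.
func shouldReshard(desired, current int, lastSendTimestamp int64) bool {
	if desired == current {
		return false
	}
	minSendTimestamp := time.Now().Add(-1 * shardUpdateDuration).Unix()
	return lastSendTimestamp >= minSendTimestamp
}

func main() {
	stale := time.Now().Add(-30 * time.Second).Unix()
	fmt.Println(shouldReshard(20, 10, stale))             // false: last send too old
	fmt.Println(shouldReshard(20, 10, time.Now().Unix())) // true: sent recently
}
```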
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 7343184fc0..1c06173a59 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -703,32 +703,35 @@ func TestShouldReshard(t *testing.T) {
startingShards int
samplesIn, samplesOut, lastSendTimestamp int64
expectedToReshard bool
+ sendDeadline model.Duration
}
cases := []testcase{
{
- // Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
+ // Resharding shouldn't take place if we haven't successfully sent
+ // within the last shardUpdateDuration, even if the send deadline is very low.
startingShards: 10,
samplesIn: 1000,
samplesOut: 10,
- lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second),
+ lastSendTimestamp: time.Now().Unix() - int64(shardUpdateDuration),
expectedToReshard: false,
+ sendDeadline: model.Duration(100 * time.Millisecond),
},
{
- startingShards: 5,
+ startingShards: 10,
samplesIn: 1000,
samplesOut: 10,
lastSendTimestamp: time.Now().Unix(),
expectedToReshard: true,
+ sendDeadline: config.DefaultQueueConfig.BatchSendDeadline,
},
}
for _, c := range cases {
- _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
+ _, m := newTestClientAndQueueManager(t, time.Duration(c.sendDeadline), config.RemoteWriteProtoMsgV1)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
m.lastSendTimestamp.Store(c.lastSendTimestamp)
-
m.Start()
desiredShards := m.calculateDesiredShards()
From 2880ee8e46e2c49e5155523b30b7878d7cc65ae8 Mon Sep 17 00:00:00 2001
From: Matthieu MOREL
Date: Thu, 25 Jan 2024 07:29:48 +0100
Subject: [PATCH 29/49] chore: provide OSSF security insight
Signed-off-by: Matthieu MOREL
---
README.md | 3 ++-
SECURITY-INSIGHTS.yml | 48 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
create mode 100644 SECURITY-INSIGHTS.yml
diff --git a/README.md b/README.md
index cd14ed2ecb..df974e1097 100644
--- a/README.md
+++ b/README.md
@@ -12,9 +12,10 @@ examples and guides.
[![Docker Pulls](https://img.shields.io/docker/pulls/prom/prometheus.svg?maxAge=604800)][hub]
[![Go Report Card](https://goreportcard.com/badge/github.com/prometheus/prometheus)](https://goreportcard.com/report/github.com/prometheus/prometheus)
[![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/486/badge)](https://bestpractices.coreinfrastructure.org/projects/486)
+[![OpenSSF Scorecard](https://api.securityscorecards.dev/projects/github.com/prometheus/prometheus/badge)](https://securityscorecards.dev/viewer/?uri=github.com/prometheus/prometheus)
+[![CLOMonitor](https://img.shields.io/endpoint?url=https://clomonitor.io/api/projects/cncf/prometheus/badge)](https://clomonitor.io/projects/cncf/prometheus)
[![Gitpod ready-to-code](https://img.shields.io/badge/Gitpod-ready--to--code-blue?logo=gitpod)](https://gitpod.io/#https://github.com/prometheus/prometheus)
[![Fuzzing Status](https://oss-fuzz-build-logs.storage.googleapis.com/badges/prometheus.svg)](https://bugs.chromium.org/p/oss-fuzz/issues/list?sort=-opened&can=1&q=proj:prometheus)
-[![OpenSSF Scorecard](https://api.securityscorecards.dev/projects/github.com/prometheus/prometheus/badge)](https://securityscorecards.dev/viewer/?uri=github.com/prometheus/prometheus)
diff --git a/SECURITY-INSIGHTS.yml b/SECURITY-INSIGHTS.yml
new file mode 100644
index 0000000000..009b356214
--- /dev/null
+++ b/SECURITY-INSIGHTS.yml
@@ -0,0 +1,48 @@
+header:
+ schema-version: '1.0.0'
+ expiration-date: '2025-07-30T01:00:00.000Z'
+ last-updated: '2024-07-30'
+ last-reviewed: '2024-07-30'
+ project-url: https://github.com/prometheus/prometheus
+ changelog: https://github.com/prometheus/prometheus/blob/main/CHANGELOG.md
+ license: https://github.com/prometheus/prometheus/blob/main/LICENSE
+project-lifecycle:
+ status: active
+ bug-fixes-only: false
+ core-maintainers:
+ - https://github.com/prometheus/prometheus/blob/main/MAINTAINERS.md
+contribution-policy:
+ accepts-pull-requests: true
+ accepts-automated-pull-requests: true
+dependencies:
+ third-party-packages: true
+ dependencies-lists:
+ - https://github.com/prometheus/prometheus/blob/main/go.mod
+ - https://github.com/prometheus/prometheus/blob/main/web/ui/package.json
+ env-dependencies-policy:
+ policy-url: https://github.com/prometheus/prometheus/blob/main/CONTRIBUTING.md#dependency-management
+distribution-points:
+ - https://github.com/prometheus/prometheus/releases
+documentation:
+ - https://prometheus.io/docs/introduction/overview/
+security-contacts:
+ - type: email
+ value: prometheus-team@googlegroups.com
+security-testing:
+ - tool-type: sca
+ tool-name: Dependabot
+ tool-version: latest
+ integration:
+ ad-hoc: false
+ ci: true
+ before-release: true
+ - tool-type: sast
+ tool-name: CodeQL
+ tool-version: latest
+ integration:
+ ad-hoc: false
+ ci: true
+ before-release: true
+vulnerability-reporting:
+ accepts-vulnerability-reports: true
+ security-policy: https://github.com/prometheus/prometheus/security/policy
From 7fab72a280f139170a14e6f6a21f6396fa02899e Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Wed, 31 Jul 2024 17:53:05 +1000
Subject: [PATCH 30/49] promqltest: add support for setting counter reset hint
on histogram samples (#14537)
* promqltest: add support for setting counter reset hint on histogram samples
Signed-off-by: Charles Korn
---
docs/configuration/unit_testing_rules.md | 4 +-
promql/parser/generated_parser.y | 17 +-
promql/parser/generated_parser.y.go | 797 ++++++++++++-----------
promql/parser/lex.go | 33 +-
promql/parser/parse.go | 22 +
promql/parser/parse_test.go | 75 ++-
6 files changed, 531 insertions(+), 417 deletions(-)
diff --git a/docs/configuration/unit_testing_rules.md b/docs/configuration/unit_testing_rules.md
index 163fcb91f1..7fc676a251 100644
--- a/docs/configuration/unit_testing_rules.md
+++ b/docs/configuration/unit_testing_rules.md
@@ -92,7 +92,7 @@ series:
#
# Native histogram notation:
# Native histograms can be used instead of floating point numbers using the following notation:
-# {{schema:1 sum:-0.3 count:3.1 z_bucket:7.1 z_bucket_w:0.05 buckets:[5.1 10 7] offset:-3 n_buckets:[4.1 5] n_offset:-5}}
+# {{schema:1 sum:-0.3 count:3.1 z_bucket:7.1 z_bucket_w:0.05 buckets:[5.1 10 7] offset:-3 n_buckets:[4.1 5] n_offset:-5 counter_reset_hint:gauge}}
# Native histograms support the same expanding notation as floating point numbers, i.e. 'axn', 'a+bxn' and 'a-bxn'.
# All properties are optional and default to 0. The order is not important. The following properties are supported:
# - schema (int):
@@ -119,6 +119,8 @@ series:
# Observation counts in negative buckets. Each represents an absolute count.
# - n_offset (int):
# The starting index of the first entry in the negative buckets.
+# - counter_reset_hint (one of 'unknown', 'reset', 'not_reset', or 'gauge'):
+# The counter reset hint associated with this histogram. Defaults to 'unknown' if not set.
values:
```
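For instance, a rule-test input series using the new field might look like this sketch (metric name and values invented for illustration):

```yaml
input_series:
  - series: 'example_latency_seconds'
    # Four identical native-histogram samples (the initial value plus three
    # repetitions), explicitly marked as gauge histograms so that no counter
    # reset detection is applied to them.
    values: '{{schema:0 sum:5 count:4 buckets:[2 2] counter_reset_hint:gauge}}x3'
```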
diff --git a/promql/parser/generated_parser.y b/promql/parser/generated_parser.y
index b99e67424f..b8e6aa373a 100644
--- a/promql/parser/generated_parser.y
+++ b/promql/parser/generated_parser.y
@@ -84,6 +84,7 @@ NEGATIVE_BUCKETS_DESC
ZERO_BUCKET_DESC
ZERO_BUCKET_WIDTH_DESC
CUSTOM_VALUES_DESC
+COUNTER_RESET_HINT_DESC
%token histogramDescEnd
// Operators.
@@ -149,6 +150,14 @@ START
END
%token preprocessorEnd
+// Counter reset hints.
+%token counterResetHintsStart
+%token <item>
+UNKNOWN_COUNTER_RESET
+COUNTER_RESET
+NOT_COUNTER_RESET
+GAUGE_TYPE
+%token counterResetHintsEnd
// Start symbols for the generated parser.
%token startSymbolsStart
@@ -163,7 +172,7 @@ START_METRIC_SELECTOR
// Type definitions for grammar rules.
%type <matchers> label_match_list
%type <matcher> label_matcher
-%type <item> aggregate_op grouping_label match_op maybe_label metric_identifier unary_op at_modifier_preprocessors string_identifier
+%type <item> aggregate_op grouping_label match_op maybe_label metric_identifier unary_op at_modifier_preprocessors string_identifier counter_reset_hint
%type <labels> label_set metric
%type <lblList> label_set_list
%type