From 0e3f35324b46010b0f6363d01e592eebad1a80ad Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 13 Jul 2023 14:16:10 +0200 Subject: [PATCH] scrape: Enable ingestion of multiple exemplars per sample This has become a requirement for native histograms, as a single histogram sample commonly has many buckets, so that providing many exemplars makes sense. Since OM text doesn't support native histograms yet, the test had to be expanded to also support protobuf test cases. Signed-off-by: beorn7 --- model/textparse/interface.go | 4 +- model/textparse/openmetricsparse.go | 8 +- scrape/scrape.go | 2 +- scrape/scrape_test.go | 144 ++++++++++++++++++++++++++-- tsdb/exemplar.go | 4 +- 5 files changed, 147 insertions(+), 15 deletions(-) diff --git a/model/textparse/interface.go b/model/textparse/interface.go index efa581410f..38903afc96 100644 --- a/model/textparse/interface.go +++ b/model/textparse/interface.go @@ -59,7 +59,9 @@ type Parser interface { Metric(l *labels.Labels) string // Exemplar writes the exemplar of the current sample into the passed - // exemplar. It returns if an exemplar exists or not. + // exemplar. It can be called repeatedly to retrieve multiple exemplars + // for the same sample. It returns false once all exemplars are + // retrieved (including the case where no exemplars exist at all). Exemplar(l *exemplar.Exemplar) bool // Next advances the parser to the next sample. It returns false if no diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index c17d40020a..e0833636f1 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -174,8 +174,10 @@ func (p *OpenMetricsParser) Metric(l *labels.Labels) string { return s } -// Exemplar writes the exemplar of the current sample into the passed -// exemplar. It returns the whether an exemplar exists. +// Exemplar writes the exemplar of the current sample into the passed exemplar. +// It returns whether an exemplar exists. As OpenMetrics only ever has one +// exemplar per sample, every call after the first (for the same sample) will +// always return false. func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool { if len(p.exemplar) == 0 { return false @@ -204,6 +206,8 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool { p.builder.Sort() e.Labels = p.builder.Labels() + // Wipe exemplar so that future calls return false. + p.exemplar = p.exemplar[:0] return true } diff --git a/scrape/scrape.go b/scrape/scrape.go index ec65c5ad9b..df729b4489 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1685,7 +1685,7 @@ loop: // number of samples remaining after relabeling. added++ - if hasExemplar := p.Exemplar(&e); hasExemplar { + for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) { if !e.HasTs { e.Ts = t } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 6c45c26b42..72e22fd0dd 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -17,6 +17,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/binary" "fmt" "io" "math" @@ -29,6 +30,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -39,6 +41,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/textparse" @@ -1980,15 +1983,18 @@ func TestScrapeLoopAppendExemplar(t *testing.T) { tests := []struct { title string scrapeText string + contentType string discoveryLabels []string - samples []sample + floats []sample + histograms []histogramSample exemplars []exemplar.Exemplar }{ { title: "Metric without exemplars", scrapeText: "metric_total{n=\"1\"} 0\n# EOF", + contentType: "application/openmetrics-text", discoveryLabels: []string{"n", "2"}, - samples: []sample{{ + floats: []sample{{ metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), v: 0, }}, @@ -1996,8 +2002,9 @@ func TestScrapeLoopAppendExemplar(t *testing.T) { { title: "Metric with exemplars", scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF", + contentType: "application/openmetrics-text", discoveryLabels: []string{"n", "2"}, - samples: []sample{{ + floats: []sample{{ metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), v: 0, }}, @@ -2008,8 +2015,9 @@ func TestScrapeLoopAppendExemplar(t *testing.T) { { title: "Metric with exemplars and TS", scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF", + contentType: "application/openmetrics-text", discoveryLabels: []string{"n", "2"}, - samples: []sample{{ + floats: []sample{{ metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), v: 0, }}, @@ -2022,7 +2030,8 @@ func TestScrapeLoopAppendExemplar(t *testing.T) { scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000 metric_total{n="2"} 2 # {t="2"} 2.0 20000 # EOF`, - samples: []sample{{ + contentType: "application/openmetrics-text", + floats: []sample{{ metric: labels.FromStrings("__name__", "metric_total", "n", "1"), v: 1, }, { @@ -2034,6 +2043,104 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000 {Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true}, }, }, + { + title: "Native histogram with two exemplars", + scrapeText: `name: "test_histogram" +help: "Test histogram with many buckets removed to keep it manageable in size." +type: HISTOGRAM +metric: < + histogram: < + sample_count: 175 + sample_sum: 0.0008280461746287094 + bucket: < + cumulative_count: 2 + upper_bound: -0.0004899999999999998 + > + bucket: < + cumulative_count: 4 + upper_bound: -0.0003899999999999998 + exemplar: < + label: < + name: "dummyID" + value: "59727" + > + value: -0.00039 + timestamp: < + seconds: 1625851155 + nanos: 146848499 + > + > + > + bucket: < + cumulative_count: 16 + upper_bound: -0.0002899999999999998 + exemplar: < + label: < + name: "dummyID" + value: "5617" + > + value: -0.00029 + > + > + schema: 3 + zero_threshold: 2.938735877055719e-39 + zero_count: 2 + negative_span: < + offset: -162 + length: 1 + > + negative_span: < + offset: 23 + length: 4 + > + negative_delta: 1 + negative_delta: 3 + negative_delta: -2 + negative_delta: -1 + negative_delta: 1 + positive_span: < + offset: -161 + length: 1 + > + positive_span: < + offset: 8 + length: 3 + > + positive_delta: 1 + positive_delta: 2 + positive_delta: -1 + positive_delta: -1 + > + timestamp_ms: 1234568 +> + +`, + contentType: "application/vnd.google.protobuf", + histograms: []histogramSample{{ + t: 1234568, + h: &histogram.Histogram{ + Count: 175, + ZeroCount: 2, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []int64{1, 2, -1, -1}, + NegativeBuckets: []int64{1, 3, -2, -1, 1}, + }, + }}, + exemplars: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true}, + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, Ts: 1234568, HasTs: false}, + }, + }, } for _, test := range tests { @@ -2069,8 +2176,8 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000 now := time.Now() - for i := range test.samples { - test.samples[i].t = timestamp.FromTime(now) + for i := range test.floats { + test.floats[i].t = timestamp.FromTime(now) } // We need to set the timestamp for expected exemplars that does not have a timestamp. @@ -2080,10 +2187,29 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000 } } - _, _, _, err := sl.append(app, []byte(test.scrapeText), "application/openmetrics-text", now) + buf := &bytes.Buffer{} + if test.contentType == "application/vnd.google.protobuf" { + // In case of protobuf, we have to create the binary representation. + pb := &dto.MetricFamily{} + // From text to proto message. + require.NoError(t, proto.UnmarshalText(test.scrapeText, pb)) + // From proto message to binary protobuf. + protoBuf, err := proto.Marshal(pb) + require.NoError(t, err) + + // Write first length, then binary protobuf. + varintBuf := binary.AppendUvarint(nil, uint64(len(protoBuf))) + buf.Write(varintBuf) + buf.Write(protoBuf) + } else { + buf.WriteString(test.scrapeText) + } + + _, _, _, err := sl.append(app, buf.Bytes(), test.contentType, now) require.NoError(t, err) require.NoError(t, app.Commit()) - require.Equal(t, test.samples, app.result) + require.Equal(t, test.floats, app.result) + require.Equal(t, test.histograms, app.resultHistograms) require.Equal(t, test.exemplars, app.resultExemplars) }) } diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index bf401da3ce..d4c0505cc1 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -365,8 +365,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp if prev := ce.exemplars[ce.nextIndex]; prev == nil { ce.exemplars[ce.nextIndex] = &circularBufferEntry{} } else { - // There exists exemplar already on this ce.nextIndex entry, drop it, to make place - // for others. + // There exists an exemplar already on this ce.nextIndex entry, + // drop it, to make place for others. var buf [1024]byte prevLabels := prev.ref.seriesLabels.Bytes(buf[:]) if prev.next == noExemplar {