mirror of
https://github.com/prometheus/prometheus.git
synced 2025-09-21 13:51:00 +02:00
* fix(textparse): implement NHCB parsing in ProtoBuf parser directly The NHCB conversion does some validation, but we can only return error from Parser.Next() not Parser.Histogram(). So the conversion needs to happen in Next(). There are 2 cases: 1. "always_scrape_classic_histograms" is enabled, in which case we convert after returning the classic series. This is to be consistent with the PromParser text parser, which collects NHCB while spitting out classic series; then returns the NHCB. 2. "always_scrape_classic_histograms" is disabled. In which case we never return the classic series. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> * refactor(textparse): skip classic series instead of adding NHCB around Do not return the first classic series from the EntryType state, switch to EntrySeries. This means we need to start the histogram field state from -3 , not -2. In EntrySeries, skip classic series if needed. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> * reuse nhcb converter Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> * test(textparse/nhcb): test corrupting metric family name NHCB parse doesn't always copy the metric name from the underlying parser. When called via HELP, UNIT, the string is directly referenced which means that the read-ahead of NHCB can corrupt it. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
398 lines
12 KiB
Go
398 lines
12 KiB
Go
// Copyright 2024 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package textparse
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"math"
|
|
"strconv"
|
|
|
|
"github.com/prometheus/common/model"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/util/convertnhcb"
|
|
)
|
|
|
|
type collectionState int
|
|
|
|
const (
|
|
stateStart collectionState = iota
|
|
stateCollecting
|
|
stateEmitting
|
|
stateInhibiting // Inhibiting NHCB, because there was an exponential histogram with the same labels.
|
|
)
|
|
|
|
// The NHCBParser wraps a Parser and converts classic histograms to native
|
|
// histograms with custom buckets.
|
|
//
|
|
// Since Parser interface is line based, this parser needs to keep track
|
|
// of the last classic histogram series it saw to collate them into a
|
|
// single native histogram.
|
|
//
|
|
// Note:
|
|
// - Only series that have the histogram metadata type are considered for
|
|
// conversion.
|
|
// - The classic series are also returned if keepClassicHistograms is true.
|
|
type NHCBParser struct {
|
|
// The parser we're wrapping.
|
|
parser Parser
|
|
// Option to keep classic histograms along with converted histograms.
|
|
keepClassicHistograms bool
|
|
|
|
// Labels builder.
|
|
builder labels.ScratchBuilder
|
|
|
|
// State of the parser.
|
|
state collectionState
|
|
|
|
// Caches the values from the underlying parser.
|
|
// For Series and Histogram.
|
|
bytes []byte
|
|
ts *int64
|
|
value float64
|
|
h *histogram.Histogram
|
|
fh *histogram.FloatHistogram
|
|
// For Metric.
|
|
lset labels.Labels
|
|
// For Type.
|
|
bName []byte
|
|
typ model.MetricType
|
|
|
|
// Caches the entry itself if we are inserting a converted NHCB
|
|
// halfway through.
|
|
entry Entry
|
|
err error
|
|
|
|
// Caches the values and metric for the inserted converted NHCB.
|
|
bytesNHCB []byte
|
|
hNHCB *histogram.Histogram
|
|
fhNHCB *histogram.FloatHistogram
|
|
lsetNHCB labels.Labels
|
|
exemplars []exemplar.Exemplar
|
|
ctNHCB int64
|
|
metricStringNHCB string
|
|
|
|
// Collates values from the classic histogram series to build
|
|
// the converted histogram later.
|
|
tempLsetNHCB labels.Labels
|
|
tempNHCB convertnhcb.TempHistogram
|
|
tempExemplars []exemplar.Exemplar
|
|
tempExemplarCount int
|
|
tempCT int64
|
|
|
|
// Remembers the last base histogram metric name (assuming it's
|
|
// a classic histogram) so we can tell if the next float series
|
|
// is part of the same classic histogram.
|
|
lastHistogramName string
|
|
lastHistogramLabelsHash uint64
|
|
// Reused buffer for hashing labels.
|
|
hBuffer []byte
|
|
}
|
|
|
|
func NewNHCBParser(p Parser, st *labels.SymbolTable, keepClassicHistograms bool) Parser {
|
|
return &NHCBParser{
|
|
parser: p,
|
|
keepClassicHistograms: keepClassicHistograms,
|
|
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
|
|
tempNHCB: convertnhcb.NewTempHistogram(),
|
|
}
|
|
}
|
|
|
|
func (p *NHCBParser) Series() ([]byte, *int64, float64) {
|
|
return p.bytes, p.ts, p.value
|
|
}
|
|
|
|
func (p *NHCBParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
|
|
if p.state == stateEmitting {
|
|
return p.bytesNHCB, p.ts, p.hNHCB, p.fhNHCB
|
|
}
|
|
return p.bytes, p.ts, p.h, p.fh
|
|
}
|
|
|
|
func (p *NHCBParser) Help() ([]byte, []byte) {
|
|
return p.parser.Help()
|
|
}
|
|
|
|
func (p *NHCBParser) Type() ([]byte, model.MetricType) {
|
|
return p.bName, p.typ
|
|
}
|
|
|
|
func (p *NHCBParser) Unit() ([]byte, []byte) {
|
|
return p.parser.Unit()
|
|
}
|
|
|
|
func (p *NHCBParser) Comment() []byte {
|
|
return p.parser.Comment()
|
|
}
|
|
|
|
func (p *NHCBParser) Labels(l *labels.Labels) {
|
|
if p.state == stateEmitting {
|
|
*l = p.lsetNHCB
|
|
return
|
|
}
|
|
*l = p.lset
|
|
}
|
|
|
|
func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool {
|
|
if p.state == stateEmitting {
|
|
if len(p.exemplars) == 0 {
|
|
return false
|
|
}
|
|
*ex = p.exemplars[0]
|
|
p.exemplars = p.exemplars[1:]
|
|
return true
|
|
}
|
|
return p.parser.Exemplar(ex)
|
|
}
|
|
|
|
func (p *NHCBParser) CreatedTimestamp() int64 {
|
|
switch p.state {
|
|
case stateStart, stateInhibiting:
|
|
if p.entry == EntrySeries || p.entry == EntryHistogram {
|
|
return p.parser.CreatedTimestamp()
|
|
}
|
|
case stateCollecting:
|
|
return p.tempCT
|
|
case stateEmitting:
|
|
return p.ctNHCB
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (p *NHCBParser) Next() (Entry, error) {
|
|
for {
|
|
if p.state == stateEmitting {
|
|
p.state = stateStart
|
|
if p.entry == EntrySeries {
|
|
isNHCB := p.handleClassicHistogramSeries(p.lset)
|
|
if isNHCB && !p.keepClassicHistograms {
|
|
// Do not return the classic histogram series if it was converted to NHCB and we are not keeping classic histograms.
|
|
continue
|
|
}
|
|
}
|
|
return p.entry, p.err
|
|
}
|
|
|
|
p.entry, p.err = p.parser.Next()
|
|
if p.err != nil {
|
|
if errors.Is(p.err, io.EOF) && p.processNHCB() {
|
|
return EntryHistogram, nil
|
|
}
|
|
return EntryInvalid, p.err
|
|
}
|
|
switch p.entry {
|
|
case EntrySeries:
|
|
p.bytes, p.ts, p.value = p.parser.Series()
|
|
p.parser.Labels(&p.lset)
|
|
var isNHCB bool
|
|
switch p.state {
|
|
case stateCollecting:
|
|
if p.differentMetric() && p.processNHCB() {
|
|
// We are collecting classic series, but the next series
|
|
// has different type or labels. If we can convert what
|
|
// we have collected so far to NHCB, then we can return it.
|
|
return EntryHistogram, nil
|
|
}
|
|
isNHCB = p.handleClassicHistogramSeries(p.lset)
|
|
case stateInhibiting:
|
|
if p.differentMetric() {
|
|
// Next has different labels than the previous exponential
|
|
// histogram so we can start collecting classic histogram
|
|
// series.
|
|
p.state = stateStart
|
|
isNHCB = p.handleClassicHistogramSeries(p.lset)
|
|
} else {
|
|
// Next has the same labels as the previous exponential
|
|
// histogram, so we are still in the inhibiting state and
|
|
// we should not convert to NHCB.
|
|
isNHCB = false
|
|
}
|
|
case stateStart:
|
|
isNHCB = p.handleClassicHistogramSeries(p.lset)
|
|
default:
|
|
// This should not happen.
|
|
return EntryInvalid, errors.New("unexpected state in NHCBParser")
|
|
}
|
|
if isNHCB && !p.keepClassicHistograms {
|
|
// Do not return the classic histogram series if it was converted to NHCB and we are not keeping classic histograms.
|
|
continue
|
|
}
|
|
return p.entry, p.err
|
|
case EntryHistogram:
|
|
p.state = stateInhibiting
|
|
p.bytes, p.ts, p.h, p.fh = p.parser.Histogram()
|
|
p.parser.Labels(&p.lset)
|
|
p.storeExponentialLabels()
|
|
case EntryType:
|
|
p.bName, p.typ = p.parser.Type()
|
|
}
|
|
if p.processNHCB() {
|
|
return EntryHistogram, nil
|
|
}
|
|
return p.entry, p.err
|
|
}
|
|
}
|
|
|
|
// Return true if labels have changed and we should emit the NHCB.
|
|
func (p *NHCBParser) differentMetric() bool {
|
|
if p.typ != model.MetricTypeHistogram {
|
|
// Different metric type.
|
|
return true
|
|
}
|
|
_, name := convertnhcb.GetHistogramMetricBaseName(p.lset.Get(labels.MetricName))
|
|
if p.lastHistogramName != name {
|
|
// Different metric name.
|
|
return true
|
|
}
|
|
nextHash, _ := p.lset.HashWithoutLabels(p.hBuffer, labels.BucketLabel)
|
|
// Different label values.
|
|
return p.lastHistogramLabelsHash != nextHash
|
|
}
|
|
|
|
// Save the label set of the classic histogram without suffix and bucket `le` label.
|
|
func (p *NHCBParser) storeClassicLabels(name string) {
|
|
p.lastHistogramName = name
|
|
p.lastHistogramLabelsHash, _ = p.lset.HashWithoutLabels(p.hBuffer, labels.BucketLabel)
|
|
}
|
|
|
|
func (p *NHCBParser) storeExponentialLabels() {
|
|
p.lastHistogramName = p.lset.Get(labels.MetricName)
|
|
p.lastHistogramLabelsHash, _ = p.lset.HashWithoutLabels(p.hBuffer)
|
|
}
|
|
|
|
// handleClassicHistogramSeries collates the classic histogram series to be converted to NHCB
|
|
// if it is actually a classic histogram series (and not a normal float series) and if there
|
|
// isn't already a native histogram with the same name (assuming it is always processed
|
|
// right before the classic histograms) and returns true if the collation was done.
|
|
func (p *NHCBParser) handleClassicHistogramSeries(lset labels.Labels) bool {
|
|
if p.typ != model.MetricTypeHistogram {
|
|
return false
|
|
}
|
|
mName := lset.Get(labels.MetricName)
|
|
// Sanity check to ensure that the TYPE metadata entry name is the same as the base name.
|
|
suffixType, name := convertnhcb.GetHistogramMetricBaseName(mName)
|
|
if name != string(p.bName) {
|
|
return false
|
|
}
|
|
switch suffixType {
|
|
case convertnhcb.SuffixBucket:
|
|
if !lset.Has(labels.BucketLabel) {
|
|
// This should not really happen.
|
|
return false
|
|
}
|
|
le, err := strconv.ParseFloat(lset.Get(labels.BucketLabel), 64)
|
|
if err == nil && !math.IsNaN(le) {
|
|
p.processClassicHistogramSeries(lset, name, func(hist *convertnhcb.TempHistogram) {
|
|
_ = hist.SetBucketCount(le, p.value)
|
|
})
|
|
return true
|
|
}
|
|
case convertnhcb.SuffixCount:
|
|
p.processClassicHistogramSeries(lset, name, func(hist *convertnhcb.TempHistogram) {
|
|
_ = hist.SetCount(p.value)
|
|
})
|
|
return true
|
|
case convertnhcb.SuffixSum:
|
|
p.processClassicHistogramSeries(lset, name, func(hist *convertnhcb.TempHistogram) {
|
|
_ = hist.SetSum(p.value)
|
|
})
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p *NHCBParser) processClassicHistogramSeries(lset labels.Labels, name string, updateHist func(*convertnhcb.TempHistogram)) {
|
|
if p.state != stateCollecting {
|
|
p.storeClassicLabels(name)
|
|
p.tempCT = p.parser.CreatedTimestamp()
|
|
p.state = stateCollecting
|
|
p.tempLsetNHCB = convertnhcb.GetHistogramMetricBase(lset, name)
|
|
}
|
|
p.storeExemplars()
|
|
updateHist(&p.tempNHCB)
|
|
}
|
|
|
|
func (p *NHCBParser) storeExemplars() {
|
|
for ex := p.nextExemplarPtr(); p.parser.Exemplar(ex); ex = p.nextExemplarPtr() {
|
|
p.tempExemplarCount++
|
|
}
|
|
}
|
|
|
|
func (p *NHCBParser) nextExemplarPtr() *exemplar.Exemplar {
|
|
switch {
|
|
case p.tempExemplarCount == len(p.tempExemplars)-1:
|
|
// Reuse the previously allocated exemplar, it was not filled up.
|
|
case len(p.tempExemplars) == cap(p.tempExemplars):
|
|
// Let the runtime grow the slice.
|
|
p.tempExemplars = append(p.tempExemplars, exemplar.Exemplar{})
|
|
default:
|
|
// Take the next element into use.
|
|
p.tempExemplars = p.tempExemplars[:len(p.tempExemplars)+1]
|
|
}
|
|
return &p.tempExemplars[len(p.tempExemplars)-1]
|
|
}
|
|
|
|
func (p *NHCBParser) swapExemplars() {
|
|
p.exemplars = p.tempExemplars[:p.tempExemplarCount]
|
|
p.tempExemplars = p.tempExemplars[:0]
|
|
}
|
|
|
|
// processNHCB converts the collated classic histogram series to NHCB and caches the info
|
|
// to be returned to callers. Retruns true if the conversion was successful.
|
|
func (p *NHCBParser) processNHCB() bool {
|
|
if p.state != stateCollecting {
|
|
return false
|
|
}
|
|
h, fh, err := p.tempNHCB.Convert()
|
|
if err == nil {
|
|
if h != nil {
|
|
if err := h.Validate(); err != nil {
|
|
return false
|
|
}
|
|
p.hNHCB = h
|
|
p.fhNHCB = nil
|
|
} else if fh != nil {
|
|
if err := fh.Validate(); err != nil {
|
|
return false
|
|
}
|
|
p.hNHCB = nil
|
|
p.fhNHCB = fh
|
|
}
|
|
|
|
lblsWithMetricName := p.tempLsetNHCB.DropMetricName()
|
|
// Ensure we return `metric` instead of `metric{}` for name only
|
|
// series, for consistency with wrapped parsers.
|
|
if lblsWithMetricName.IsEmpty() {
|
|
p.metricStringNHCB = p.tempLsetNHCB.Get(labels.MetricName)
|
|
} else {
|
|
p.metricStringNHCB = p.tempLsetNHCB.Get(labels.MetricName) + lblsWithMetricName.StringNoSpace()
|
|
}
|
|
|
|
p.bytesNHCB = []byte(p.metricStringNHCB)
|
|
p.lsetNHCB = p.tempLsetNHCB
|
|
p.swapExemplars()
|
|
p.ctNHCB = p.tempCT
|
|
p.state = stateEmitting
|
|
} else {
|
|
p.state = stateStart
|
|
}
|
|
p.tempNHCB.Reset()
|
|
p.tempExemplarCount = 0
|
|
p.tempCT = 0
|
|
return err == nil
|
|
}
|