Merge pull request #16720 from krajorama/krajo/fix-nhcb-after-exponential

fix(nhcb): not converting to NHCB after exponential native histogram encountered
This commit is contained in:
George Krajcsovits 2025-06-12 15:52:35 +02:00 committed by GitHub
commit c9d638fd8f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 173 additions and 19 deletions

View File

@ -34,6 +34,7 @@ const (
stateStart collectionState = iota
stateCollecting
stateEmitting
stateInhibiting // Inhibiting NHCB, because there was an exponential histogram with the same labels.
)
// The NHCBParser wraps a Parser and converts classic histograms to native
@ -97,9 +98,8 @@ type NHCBParser struct {
// Remembers the last base histogram metric name (assuming it's
// a classic histogram) so we can tell if the next float series
// is part of the same classic histogram.
lastHistogramName string
lastHistogramLabelsHash uint64
lastHistogramExponential bool
lastHistogramName string
lastHistogramLabelsHash uint64
// Reused buffer for hashing labels.
hBuffer []byte
}
@ -162,7 +162,7 @@ func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool {
func (p *NHCBParser) CreatedTimestamp() int64 {
switch p.state {
case stateStart:
case stateStart, stateInhibiting:
if p.entry == EntrySeries || p.entry == EntryHistogram {
return p.parser.CreatedTimestamp()
}
@ -199,21 +199,34 @@ func (p *NHCBParser) Next() (Entry, error) {
case EntrySeries:
p.bytes, p.ts, p.value = p.parser.Series()
p.parser.Labels(&p.lset)
// Check the label set to see if we can continue or need to emit the NHCB.
var isNHCB bool
if p.compareLabels() {
// Labels differ. Check if we can emit the NHCB.
if p.processNHCB() {
switch p.state {
case stateCollecting:
if p.differentMetric() && p.processNHCB() {
// We are collecting classic series, but the next series
// has different type or labels. If we can convert what
// we have collected so far to NHCB, then we can return it.
return EntryHistogram, nil
}
isNHCB = p.handleClassicHistogramSeries(p.lset)
} else {
// Labels are the same. Check if after an exponential histogram.
if p.lastHistogramExponential {
isNHCB = false
} else {
case stateInhibiting:
if p.differentMetric() {
// Next has different labels than the previous exponential
// histogram so we can start collecting classic histogram
// series.
p.state = stateStart
isNHCB = p.handleClassicHistogramSeries(p.lset)
} else {
// Next has the same labels as the previous exponential
// histogram, so we are still in the inhibiting state and
// we should not convert to NHCB.
isNHCB = false
}
case stateStart:
isNHCB = p.handleClassicHistogramSeries(p.lset)
default:
// This should not happen.
return EntryInvalid, errors.New("unexpected state in NHCBParser")
}
if isNHCB && !p.keepClassicHistograms {
// Do not return the classic histogram series if it was converted to NHCB and we are not keeping classic histograms.
@ -221,6 +234,7 @@ func (p *NHCBParser) Next() (Entry, error) {
}
return p.entry, p.err
case EntryHistogram:
p.state = stateInhibiting
p.bytes, p.ts, p.h, p.fh = p.parser.Histogram()
p.parser.Labels(&p.lset)
p.storeExponentialLabels()
@ -235,10 +249,7 @@ func (p *NHCBParser) Next() (Entry, error) {
}
// Return true if labels have changed and we should emit the NHCB.
func (p *NHCBParser) compareLabels() bool {
if p.state != stateCollecting {
return false
}
func (p *NHCBParser) differentMetric() bool {
if p.typ != model.MetricTypeHistogram {
// Different metric type.
return true
@ -257,13 +268,11 @@ func (p *NHCBParser) compareLabels() bool {
func (p *NHCBParser) storeClassicLabels(name string) {
p.lastHistogramName = name
p.lastHistogramLabelsHash, _ = p.lset.HashWithoutLabels(p.hBuffer, labels.BucketLabel)
p.lastHistogramExponential = false
}
func (p *NHCBParser) storeExponentialLabels() {
p.lastHistogramName = p.lset.Get(labels.MetricName)
p.lastHistogramLabelsHash, _ = p.lset.HashWithoutLabels(p.hBuffer)
p.lastHistogramExponential = true
}
// handleClassicHistogramSeries collates the classic histogram series to be converted to NHCB

View File

@ -981,3 +981,148 @@ something_bucket{a="b",le="+Inf"} 9
got := testParse(t, p)
requireEntries(t, exp, got)
}
func TestNHCBParserResetLastExponential(t *testing.T) {
testMetricFamilies := []string{`name: "test_histogram1"
help: "Test histogram 1"
type: HISTOGRAM
metric: <
histogram: <
created_timestamp: <
seconds: 1
nanos: 1
>
sample_count: 175
sample_sum: 0.0008280461746287094
schema: 3
zero_threshold: 2.938735877055719e-39
zero_count: 2
negative_span: <
offset: -162
length: 1
>
negative_span: <
offset: 23
length: 4
>
negative_delta: 1
negative_delta: 3
negative_delta: -2
negative_delta: -1
negative_delta: 1
positive_span: <
offset: -161
length: 1
>
positive_span: <
offset: 8
length: 3
>
positive_delta: 1
positive_delta: 2
positive_delta: -1
positive_delta: -1
>
timestamp_ms: 1234568
>
`, // Regression test to see that state resets after exponential native histogram.
`name: "test_histogram2"
help: "Test histogram 2"
type: HISTOGRAM
metric: <
histogram: <
created_timestamp: <
seconds: 1
nanos: 1
>
sample_count: 175
sample_sum: 0.0008280461746287094
bucket: <
cumulative_count: 2
upper_bound: -0.0004899999999999998
>
bucket: <
cumulative_count: 4
upper_bound: -0.0003899999999999998
>
bucket: <
cumulative_count: 16
upper_bound: -0.0002899999999999998
>
>
timestamp_ms: 1234568
>`}
varintBuf := make([]byte, binary.MaxVarintLen32)
buf := &bytes.Buffer{}
for _, tmf := range testMetricFamilies {
pb := &dto.MetricFamily{}
// From text to proto message.
require.NoError(t, proto.UnmarshalText(tmf, pb))
// From proto message to binary protobuf.
protoBuf, err := proto.Marshal(pb)
require.NoError(t, err)
// Write first length, then binary protobuf.
varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))
buf.Write(varintBuf[:varintLength])
buf.Write(protoBuf)
}
exp := []parsedEntry{
{
m: "test_histogram1",
help: "Test histogram 1",
},
{
m: "test_histogram1",
typ: model.MetricTypeHistogram,
},
// The parser should skip the series with non-cumulative buckets.
{
m: `test_histogram1`,
shs: &histogram.Histogram{
Schema: 3,
Count: 175,
Sum: 0.0008280461746287094,
ZeroThreshold: 2.938735877055719e-39,
ZeroCount: 2,
PositiveSpans: []histogram.Span{{Offset: -161, Length: 1}, {Offset: 8, Length: 3}},
NegativeSpans: []histogram.Span{{Offset: -162, Length: 1}, {Offset: 23, Length: 4}},
PositiveBuckets: []int64{1, 2, -1, -1},
NegativeBuckets: []int64{1, 3, -2, -1, 1},
},
lset: labels.FromStrings("__name__", "test_histogram1"),
t: int64p(1234568),
ct: 1000,
},
{
m: "test_histogram2",
help: "Test histogram 2",
},
{
m: "test_histogram2",
typ: model.MetricTypeHistogram,
},
{
m: "test_histogram2{}",
shs: &histogram.Histogram{
Schema: histogram.CustomBucketsSchema,
Count: 175,
Sum: 0.0008280461746287094,
PositiveSpans: []histogram.Span{{Length: 4}},
PositiveBuckets: []int64{2, 0, 10, 147},
CustomValues: []float64{-0.0004899999999999998, -0.0003899999999999998, -0.0002899999999999998},
},
lset: labels.FromStrings("__name__", "test_histogram2"),
t: int64p(1234568),
ct: 1000,
},
}
p := NewProtobufParser(buf.Bytes(), false, false, labels.NewSymbolTable())
p = NewNHCBParser(p, labels.NewSymbolTable(), false)
got := testParse(t, p)
requireEntries(t, exp, got)
}