diff --git a/model/textparse/nhcbparse.go b/model/textparse/nhcbparse.go index ea4941f2e2..e7cfcc028e 100644 --- a/model/textparse/nhcbparse.go +++ b/model/textparse/nhcbparse.go @@ -34,6 +34,7 @@ const ( stateStart collectionState = iota stateCollecting stateEmitting + stateInhibiting // Inhibiting NHCB, because there was an exponential histogram with the same labels. ) // The NHCBParser wraps a Parser and converts classic histograms to native @@ -97,9 +98,8 @@ type NHCBParser struct { // Remembers the last base histogram metric name (assuming it's // a classic histogram) so we can tell if the next float series // is part of the same classic histogram. - lastHistogramName string - lastHistogramLabelsHash uint64 - lastHistogramExponential bool + lastHistogramName string + lastHistogramLabelsHash uint64 // Reused buffer for hashing labels. hBuffer []byte } @@ -162,7 +162,7 @@ func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool { func (p *NHCBParser) CreatedTimestamp() int64 { switch p.state { - case stateStart: + case stateStart, stateInhibiting: if p.entry == EntrySeries || p.entry == EntryHistogram { return p.parser.CreatedTimestamp() } @@ -199,21 +199,34 @@ func (p *NHCBParser) Next() (Entry, error) { case EntrySeries: p.bytes, p.ts, p.value = p.parser.Series() p.parser.Labels(&p.lset) - // Check the label set to see if we can continue or need to emit the NHCB. var isNHCB bool - if p.compareLabels() { - // Labels differ. Check if we can emit the NHCB. - if p.processNHCB() { + switch p.state { + case stateCollecting: + if p.differentMetric() && p.processNHCB() { + // We are collecting classic series, but the next series + // has different type or labels. If we can convert what + // we have collected so far to NHCB, then we can return it. return EntryHistogram, nil } isNHCB = p.handleClassicHistogramSeries(p.lset) - } else { - // Labels are the same. Check if after an exponential histogram. - if p.lastHistogramExponential { - isNHCB = false - } else { + case stateInhibiting: + if p.differentMetric() { + // Next has different labels than the previous exponential + // histogram so we can start collecting classic histogram + // series. + p.state = stateStart isNHCB = p.handleClassicHistogramSeries(p.lset) + } else { + // Next has the same labels as the previous exponential + // histogram, so we are still in the inhibiting state and + // we should not convert to NHCB. + isNHCB = false } + case stateStart: + isNHCB = p.handleClassicHistogramSeries(p.lset) + default: + // This should not happen. + return EntryInvalid, errors.New("unexpected state in NHCBParser") } if isNHCB && !p.keepClassicHistograms { // Do not return the classic histogram series if it was converted to NHCB and we are not keeping classic histograms. @@ -221,6 +234,7 @@ func (p *NHCBParser) Next() (Entry, error) { } return p.entry, p.err case EntryHistogram: + p.state = stateInhibiting p.bytes, p.ts, p.h, p.fh = p.parser.Histogram() p.parser.Labels(&p.lset) p.storeExponentialLabels() @@ -235,10 +249,7 @@ func (p *NHCBParser) Next() (Entry, error) { } // Return true if labels have changed and we should emit the NHCB. -func (p *NHCBParser) compareLabels() bool { - if p.state != stateCollecting { - return false - } +func (p *NHCBParser) differentMetric() bool { if p.typ != model.MetricTypeHistogram { // Different metric type. return true @@ -257,13 +268,11 @@ func (p *NHCBParser) compareLabels() bool { func (p *NHCBParser) storeClassicLabels(name string) { p.lastHistogramName = name p.lastHistogramLabelsHash, _ = p.lset.HashWithoutLabels(p.hBuffer, labels.BucketLabel) - p.lastHistogramExponential = false } func (p *NHCBParser) storeExponentialLabels() { p.lastHistogramName = p.lset.Get(labels.MetricName) p.lastHistogramLabelsHash, _ = p.lset.HashWithoutLabels(p.hBuffer) - p.lastHistogramExponential = true } // handleClassicHistogramSeries collates the classic histogram series to be converted to NHCB diff --git a/model/textparse/nhcbparse_test.go b/model/textparse/nhcbparse_test.go index 167ffc8109..61ef1df2b1 100644 --- a/model/textparse/nhcbparse_test.go +++ b/model/textparse/nhcbparse_test.go @@ -981,3 +981,148 @@ something_bucket{a="b",le="+Inf"} 9 got := testParse(t, p) requireEntries(t, exp, got) } + +func TestNHCBParserResetLastExponential(t *testing.T) { + testMetricFamilies := []string{`name: "test_histogram1" +help: "Test histogram 1" +type: HISTOGRAM +metric: < + histogram: < + created_timestamp: < + seconds: 1 + nanos: 1 + > + sample_count: 175 + sample_sum: 0.0008280461746287094 + schema: 3 + zero_threshold: 2.938735877055719e-39 + zero_count: 2 + negative_span: < + offset: -162 + length: 1 + > + negative_span: < + offset: 23 + length: 4 + > + negative_delta: 1 + negative_delta: 3 + negative_delta: -2 + negative_delta: -1 + negative_delta: 1 + positive_span: < + offset: -161 + length: 1 + > + positive_span: < + offset: 8 + length: 3 + > + positive_delta: 1 + positive_delta: 2 + positive_delta: -1 + positive_delta: -1 + > + timestamp_ms: 1234568 +> +`, // Regression test to see that state resets after exponential native histogram. + `name: "test_histogram2" +help: "Test histogram 2" +type: HISTOGRAM +metric: < + histogram: < + created_timestamp: < + seconds: 1 + nanos: 1 + > + sample_count: 175 + sample_sum: 0.0008280461746287094 + bucket: < + cumulative_count: 2 + upper_bound: -0.0004899999999999998 + > + bucket: < + cumulative_count: 4 + upper_bound: -0.0003899999999999998 + > + bucket: < + cumulative_count: 16 + upper_bound: -0.0002899999999999998 + > + > + timestamp_ms: 1234568 +>`} + + varintBuf := make([]byte, binary.MaxVarintLen32) + buf := &bytes.Buffer{} + + for _, tmf := range testMetricFamilies { + pb := &dto.MetricFamily{} + // From text to proto message. + require.NoError(t, proto.UnmarshalText(tmf, pb)) + // From proto message to binary protobuf. + protoBuf, err := proto.Marshal(pb) + require.NoError(t, err) + + // Write first length, then binary protobuf. + varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf))) + buf.Write(varintBuf[:varintLength]) + buf.Write(protoBuf) + } + + exp := []parsedEntry{ + { + m: "test_histogram1", + help: "Test histogram 1", + }, + { + m: "test_histogram1", + typ: model.MetricTypeHistogram, + }, + // The parser should skip the series with non-cumulative buckets. + { + m: `test_histogram1`, + shs: &histogram.Histogram{ + Schema: 3, + Count: 175, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + ZeroCount: 2, + PositiveSpans: []histogram.Span{{Offset: -161, Length: 1}, {Offset: 8, Length: 3}}, + NegativeSpans: []histogram.Span{{Offset: -162, Length: 1}, {Offset: 23, Length: 4}}, + PositiveBuckets: []int64{1, 2, -1, -1}, + NegativeBuckets: []int64{1, 3, -2, -1, 1}, + }, + lset: labels.FromStrings("__name__", "test_histogram1"), + t: int64p(1234568), + ct: 1000, + }, + { + m: "test_histogram2", + help: "Test histogram 2", + }, + { + m: "test_histogram2", + typ: model.MetricTypeHistogram, + }, + { + m: "test_histogram2{}", + shs: &histogram.Histogram{ + Schema: histogram.CustomBucketsSchema, + Count: 175, + Sum: 0.0008280461746287094, + PositiveSpans: []histogram.Span{{Length: 4}}, + PositiveBuckets: []int64{2, 0, 10, 147}, + CustomValues: []float64{-0.0004899999999999998, -0.0003899999999999998, -0.0002899999999999998}, + }, + lset: labels.FromStrings("__name__", "test_histogram2"), + t: int64p(1234568), + ct: 1000, + }, + } + + p := NewProtobufParser(buf.Bytes(), false, false, labels.NewSymbolTable()) + p = NewNHCBParser(p, labels.NewSymbolTable(), false) + got := testParse(t, p) + requireEntries(t, exp, got) +}