NOTE(review): line structure restored from a whitespace-collapsed copy of this
patch. Indentation follows gofmt, and blank context lines were placed to satisfy
each hunk's line counts; confirm with `git apply --check` against the target tree.
NOTE(review): the last generic.go hunk gains a two-line doc comment on
IsExponentialSchemaReserved, so its new-side count is 12 and the post-image hash
on that file's index line no longer matches the resulting blob.

diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go
index 2b78c6d630..b0e512fbc5 100644
--- a/model/histogram/float_histogram.go
+++ b/model/histogram/float_histogram.go
@@ -798,7 +798,8 @@ func (h *FloatHistogram) AllReverseBucketIterator() BucketIterator[float64] {
 // create false positives here.
 func (h *FloatHistogram) Validate() error {
 	var nCount, pCount float64
-	if h.UsesCustomBuckets() {
+	switch {
+	case IsCustomBucketsSchema(h.Schema):
 		if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
 			return fmt.Errorf("custom buckets: %w", err)
 		}
@@ -814,7 +815,7 @@ func (h *FloatHistogram) Validate() error {
 		if len(h.NegativeBuckets) > 0 {
 			return errors.New("custom buckets: must not have negative buckets")
 		}
-	} else {
+	case IsExponentialSchema(h.Schema):
 		if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
 			return fmt.Errorf("positive side: %w", err)
 		}
@@ -828,6 +829,8 @@ func (h *FloatHistogram) Validate() error {
 		if h.CustomValues != nil {
 			return errors.New("histogram with exponential schema must not have custom bounds")
 		}
+	default:
+		return fmt.Errorf("schema %d: %w", h.Schema, ErrHistogramsInvalidSchema)
 	}
 	err := checkHistogramBuckets(h.PositiveBuckets, &pCount, false)
 	if err != nil {
diff --git a/model/histogram/generic.go b/model/histogram/generic.go
index 90a94a5600..4c0940a1f6 100644
--- a/model/histogram/generic.go
+++ b/model/histogram/generic.go
@@ -21,9 +21,11 @@ import (
 )
 
 const (
-	ExponentialSchemaMax int32 = 8
-	ExponentialSchemaMin int32 = -4
-	CustomBucketsSchema  int32 = -53
+	ExponentialSchemaMax         int32 = 8
+	ExponentialSchemaMaxReserved int32 = 52
+	ExponentialSchemaMin         int32 = -4
+	ExponentialSchemaMinReserved int32 = -9
+	CustomBucketsSchema          int32 = -53
 )
 
 var (
@@ -37,6 +39,7 @@ var (
 	ErrHistogramCustomBucketsInfinite = errors.New("histogram custom bounds must be finite")
 	ErrHistogramsIncompatibleSchema   = errors.New("cannot apply this operation on histograms with a mix of exponential and custom bucket schemas")
 	ErrHistogramsIncompatibleBounds   = errors.New("cannot apply this operation on custom buckets histograms with different custom bounds")
+	ErrHistogramsInvalidSchema        = errors.New("histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets")
 )
 
 func IsCustomBucketsSchema(s int32) bool {
@@ -47,6 +50,12 @@ func IsExponentialSchema(s int32) bool {
 	return s >= ExponentialSchemaMin && s <= ExponentialSchemaMax
 }
 
+// IsExponentialSchemaReserved reports whether s lies in the inclusive range
+// [ExponentialSchemaMinReserved, ExponentialSchemaMaxReserved].
+func IsExponentialSchemaReserved(s int32) bool {
+	return s >= ExponentialSchemaMinReserved && s <= ExponentialSchemaMaxReserved
+}
+
 // BucketCount is a type constraint for the count in a bucket, which can be
 // float64 (for type FloatHistogram) or uint64 (for type Histogram).
 type BucketCount interface {
diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go
index cfb63e6341..169be9a6ac 100644
--- a/model/histogram/histogram.go
+++ b/model/histogram/histogram.go
@@ -425,7 +425,8 @@ func resize[T any](items []T, n int) []T {
 // the total h.Count).
 func (h *Histogram) Validate() error {
 	var nCount, pCount uint64
-	if h.UsesCustomBuckets() {
+	switch {
+	case IsCustomBucketsSchema(h.Schema):
 		if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
 			return fmt.Errorf("custom buckets: %w", err)
 		}
@@ -441,7 +442,7 @@ func (h *Histogram) Validate() error {
 		if len(h.NegativeBuckets) > 0 {
 			return errors.New("custom buckets: must not have negative buckets")
 		}
-	} else {
+	case IsExponentialSchema(h.Schema):
 		if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
 			return fmt.Errorf("positive side: %w", err)
 		}
@@ -455,6 +456,8 @@ func (h *Histogram) Validate() error {
 		if h.CustomValues != nil {
 			return errors.New("histogram with exponential schema must not have custom bounds")
 		}
+	default:
+		return fmt.Errorf("schema %d: %w", h.Schema, ErrHistogramsInvalidSchema)
 	}
 	err := checkHistogramBuckets(h.PositiveBuckets, &pCount, true)
 	if err != nil {
diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go
index edc8663c94..35603bc01c 100644
--- a/model/histogram/histogram_test.go
+++ b/model/histogram/histogram_test.go
@@ -1565,6 +1565,18 @@ func TestHistogramValidation(t *testing.T) {
 				CustomValues: []float64{1, 2, 3, 4, 5, 6, 7, 8},
 			},
 		},
+		"schema too high": {
+			h: &Histogram{
+				Schema: 10,
+			},
+			errMsg: `schema 10: histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets`,
+		},
+		"schema too low": {
+			h: &Histogram{
+				Schema: -10,
+			},
+			errMsg: `schema -10: histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets`,
+		},
 	}
 
 	for testName, tc := range tests {
diff --git a/scrape/target.go b/scrape/target.go
index 73fed40498..0af2b8ba14 100644
--- a/scrape/target.go
+++ b/scrape/target.go
@@ -414,12 +414,12 @@ type maxSchemaAppender struct {
 
 func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
 	if h != nil {
-		if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema {
+		if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema {
 			h = h.ReduceResolution(app.maxSchema)
 		}
 	}
 	if fh != nil {
-		if histogram.IsExponentialSchema(fh.Schema) && fh.Schema > app.maxSchema {
+		if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema {
 			fh = fh.ReduceResolution(app.maxSchema)
 		}
 	}
diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go
index a694d2067a..0bc8a876e4 100644
--- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go
+++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go
@@ -86,16 +86,16 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
 func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint, temporality pmetric.AggregationTemporality) (*histogram.Histogram, annotations.Annotations, error) {
 	var annots annotations.Annotations
 	scale := p.Scale()
-	if scale < -4 {
+	if scale < histogram.ExponentialSchemaMin {
 		return nil, annots,
 			fmt.Errorf("cannot convert exponential to native histogram."+
-				" Scale must be >= -4, was %d", scale)
+				" Scale must be >= %d, was %d", histogram.ExponentialSchemaMin, scale)
 	}
 
 	var scaleDown int32
-	if scale > 8 {
-		scaleDown = scale - 8
-		scale = 8
+	if scale > histogram.ExponentialSchemaMax {
+		scaleDown = scale - histogram.ExponentialSchemaMax
+		scale = histogram.ExponentialSchemaMax
 	}
 
 	pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true)
diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go
index 7681655e61..ad0a2a13e0 100644
--- a/storage/remote/write_handler.go
+++ b/storage/remote/write_handler.go
@@ -229,7 +229,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
 	samplesWithInvalidLabels := 0
 	samplesAppended := 0
 
-	app := &timeLimitAppender{
+	app := &remoteWriteAppender{
 		Appender: h.appendable.Appender(ctx),
 		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
 	}
@@ -344,7 +344,7 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist
 // NOTE(bwplotka): TSDB storage is NOT idempotent, so we don't allow "partial retry-able" errors.
 // Once we have 5xx type of error, we immediately stop and rollback all appends.
 func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ WriteResponseStats, errHTTPCode int, _ error) {
-	app := &timeLimitAppender{
+	app := &remoteWriteAppender{
 		Appender: h.appendable.Appender(ctx),
 		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
 	}
@@ -616,7 +616,7 @@ type rwExporter struct {
 
 func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
 	otlpCfg := rw.config().OTLPConfig
-	app := &timeLimitAppender{
+	app := &remoteWriteAppender{
 		Appender: rw.appendable.Appender(ctx),
 		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
 	}
@@ -719,13 +719,13 @@ func hasDelta(md pmetric.Metrics) bool {
 	return false
 }
 
-type timeLimitAppender struct {
+type remoteWriteAppender struct {
 	storage.Appender
 
 	maxTime int64
 }
 
-func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
+func (app *remoteWriteAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
 	if t > app.maxTime {
 		return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
 	}
@@ -737,11 +737,18 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels,
 	return ref, nil
 }
 
-func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
+func (app *remoteWriteAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
 	if t > app.maxTime {
 		return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
 	}
 
+	if h != nil && histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > histogram.ExponentialSchemaMax {
+		h = h.ReduceResolution(histogram.ExponentialSchemaMax)
+	}
+	if fh != nil && histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > histogram.ExponentialSchemaMax {
+		fh = fh.ReduceResolution(histogram.ExponentialSchemaMax)
+	}
+
 	ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh)
 	if err != nil {
 		return 0, err
@@ -749,7 +756,7 @@ func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.La
 	return ref, nil
 }
 
-func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
+func (app *remoteWriteAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
 	if e.Ts > app.maxTime {
 		return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
 	}
diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go
index 5631e80732..78cbcdccf7 100644
--- a/storage/remote/write_handler_test.go
+++ b/storage/remote/write_handler_test.go
@@ -1134,3 +1134,100 @@ func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, l labels.Labels
 	m.samples = append(m.samples, mockSample{l, ct, 0})
 	return storage.SeriesRef(hash), nil
 }
+
+var (
+	highSchemaHistogram = &histogram.Histogram{
+		Schema: 10,
+		PositiveSpans: []histogram.Span{
+			{
+				Offset: -1,
+				Length: 2,
+			},
+		},
+		PositiveBuckets: []int64{1, 2},
+		NegativeSpans: []histogram.Span{
+			{
+				Offset: 0,
+				Length: 1,
+			},
+		},
+		NegativeBuckets: []int64{1},
+	}
+	reducedSchemaHistogram = &histogram.Histogram{
+		Schema: 8,
+		PositiveSpans: []histogram.Span{
+			{
+				Offset: 0,
+				Length: 1,
+			},
+		},
+		PositiveBuckets: []int64{4},
+		NegativeSpans: []histogram.Span{
+			{
+				Offset: 0,
+				Length: 1,
+			},
+		},
+		NegativeBuckets: []int64{1},
+	}
+)
+
+func TestHistogramsReduction(t *testing.T) {
+	for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
+		t.Run(string(protoMsg), func(t *testing.T) {
+			appendable := &mockAppendable{}
+			handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{protoMsg}, false)
+
+			var (
+				err     error
+				payload []byte
+			)
+
+			if protoMsg == config.RemoteWriteProtoMsgV1 {
+				payload, _, _, err = buildWriteRequest(nil, []prompb.TimeSeries{
+					{
+						Labels:     []prompb.Label{{Name: "__name__", Value: "test_metric1"}},
+						Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, highSchemaHistogram)},
+					},
+					{
+						Labels:     []prompb.Label{{Name: "__name__", Value: "test_metric2"}},
+						Histograms: []prompb.Histogram{prompb.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))},
+					},
+				}, nil, nil, nil, nil, "snappy")
+			} else {
+				payload, _, _, err = buildV2WriteRequest(promslog.NewNopLogger(), []writev2.TimeSeries{
+					{
+						LabelsRefs: []uint32{0, 1},
+						Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, highSchemaHistogram)},
+					},
+					{
+						LabelsRefs: []uint32{0, 2},
+						Histograms: []writev2.Histogram{writev2.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))},
+					},
+				}, []string{"__name__", "test_metric1", "test_metric2"},
+					nil, nil, nil, "snappy")
+			}
+			require.NoError(t, err)
+
+			req, err := http.NewRequest("", "", bytes.NewReader(payload))
+			require.NoError(t, err)
+
+			req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[protoMsg])
+
+			recorder := httptest.NewRecorder()
+			handler.ServeHTTP(recorder, req)
+
+			resp := recorder.Result()
+			body, err := io.ReadAll(resp.Body)
+			require.NoError(t, err)
+			require.Equal(t, http.StatusNoContent, resp.StatusCode)
+			require.Empty(t, body)
+
+			require.Len(t, appendable.histograms, 2)
+			require.Equal(t, int64(1), appendable.histograms[0].t)
+			require.Equal(t, reducedSchemaHistogram, appendable.histograms[0].h)
+			require.Equal(t, int64(2), appendable.histograms[1].t)
+			require.Equal(t, reducedSchemaHistogram.ToFloat(nil), appendable.histograms[1].fh)
+		})
+	}
+}