From bdf547ae9c25fe95f755b53343bcaa6b3d6bbbd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Sat, 13 Sep 2025 16:25:21 +0200 Subject: [PATCH 1/5] fix(nativehistograms): validation should fail on unsupported schemas MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Histogram.Validate and FloatHistogram.Validate now return error on unsupported schemas. Scrape and remote-write handler reduces the schema to the maximum allowed if it is above the maximum, but below theoretical maximum of 52. For scrape the maximum is a configuration option, for remote-write it is 8. Note: OTLP endpont already does the reduction, without checking that it is below 52 as the spec does not specify a maximum. Signed-off-by: György Krajcsovits --- model/histogram/float_histogram.go | 7 +- model/histogram/generic.go | 13 ++- model/histogram/histogram.go | 7 +- model/histogram/histogram_test.go | 12 +++ scrape/target.go | 4 +- .../prometheusremotewrite/histograms.go | 10 +- storage/remote/write_handler.go | 21 ++-- storage/remote/write_handler_test.go | 97 +++++++++++++++++++ 8 files changed, 150 insertions(+), 21 deletions(-) diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 2b78c6d630..b0e512fbc5 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -798,7 +798,8 @@ func (h *FloatHistogram) AllReverseBucketIterator() BucketIterator[float64] { // create false positives here. func (h *FloatHistogram) Validate() error { var nCount, pCount float64 - if h.UsesCustomBuckets() { + switch { + case IsCustomBucketsSchema(h.Schema): if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil { return fmt.Errorf("custom buckets: %w", err) } @@ -814,7 +815,7 @@ func (h *FloatHistogram) Validate() error { if len(h.NegativeBuckets) > 0 { return errors.New("custom buckets: must not have negative buckets") } - } else { + case IsExponentialSchema(h.Schema): if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil { return fmt.Errorf("positive side: %w", err) } @@ -828,6 +829,8 @@ func (h *FloatHistogram) Validate() error { if h.CustomValues != nil { return errors.New("histogram with exponential schema must not have custom bounds") } + default: + return fmt.Errorf("schema %d: %w", h.Schema, ErrHistogramsInvalidSchema) } err := checkHistogramBuckets(h.PositiveBuckets, &pCount, false) if err != nil { diff --git a/model/histogram/generic.go b/model/histogram/generic.go index 90a94a5600..4c0940a1f6 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -21,9 +21,11 @@ import ( ) const ( - ExponentialSchemaMax int32 = 8 - ExponentialSchemaMin int32 = -4 - CustomBucketsSchema int32 = -53 + ExponentialSchemaMax int32 = 8 + ExponentialSchemaMaxReserved int32 = 52 + ExponentialSchemaMin int32 = -4 + ExponentialSchemaMinReserved int32 = -9 + CustomBucketsSchema int32 = -53 ) var ( @@ -37,6 +39,7 @@ var ( ErrHistogramCustomBucketsInfinite = errors.New("histogram custom bounds must be finite") ErrHistogramsIncompatibleSchema = errors.New("cannot apply this operation on histograms with a mix of exponential and custom bucket schemas") ErrHistogramsIncompatibleBounds = errors.New("cannot apply this operation on custom buckets histograms with different custom bounds") + ErrHistogramsInvalidSchema = errors.New("histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets") ) func IsCustomBucketsSchema(s int32) bool { @@ -47,6 +50,10 @@ func IsExponentialSchema(s int32) bool { return s >= ExponentialSchemaMin && s <= ExponentialSchemaMax } +func IsExponentialSchemaReserved(s int32) bool { + return s >= ExponentialSchemaMinReserved && s <= ExponentialSchemaMaxReserved +} + // BucketCount is a type constraint for the count in a bucket, which can be // float64 (for type FloatHistogram) or uint64 (for type Histogram). type BucketCount interface { diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index cfb63e6341..169be9a6ac 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -425,7 +425,8 @@ func resize[T any](items []T, n int) []T { // the total h.Count). func (h *Histogram) Validate() error { var nCount, pCount uint64 - if h.UsesCustomBuckets() { + switch { + case IsCustomBucketsSchema(h.Schema): if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil { return fmt.Errorf("custom buckets: %w", err) } @@ -441,7 +442,7 @@ func (h *Histogram) Validate() error { if len(h.NegativeBuckets) > 0 { return errors.New("custom buckets: must not have negative buckets") } - } else { + case IsExponentialSchema(h.Schema): if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil { return fmt.Errorf("positive side: %w", err) } @@ -455,6 +456,8 @@ func (h *Histogram) Validate() error { if h.CustomValues != nil { return errors.New("histogram with exponential schema must not have custom bounds") } + default: + return fmt.Errorf("schema %d: %w", h.Schema, ErrHistogramsInvalidSchema) } err := checkHistogramBuckets(h.PositiveBuckets, &pCount, true) if err != nil { diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index edc8663c94..35603bc01c 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -1565,6 +1565,18 @@ func TestHistogramValidation(t *testing.T) { CustomValues: []float64{1, 2, 3, 4, 5, 6, 7, 8}, }, }, + "schema too high": { + h: &Histogram{ + Schema: 10, + }, + errMsg: `schema 10: histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets`, + }, + "schema too low": { + h: &Histogram{ + Schema: -10, + }, + errMsg: `schema -10: histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets`, + }, } for testName, tc := range tests { diff --git a/scrape/target.go b/scrape/target.go index 73fed40498..0af2b8ba14 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -414,12 +414,12 @@ type maxSchemaAppender struct { func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if h != nil { - if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema { + if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema { h = h.ReduceResolution(app.maxSchema) } } if fh != nil { - if histogram.IsExponentialSchema(fh.Schema) && fh.Schema > app.maxSchema { + if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema { fh = fh.ReduceResolution(app.maxSchema) } } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index a694d2067a..0bc8a876e4 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -86,16 +86,16 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint, temporality pmetric.AggregationTemporality) (*histogram.Histogram, annotations.Annotations, error) { var annots annotations.Annotations scale := p.Scale() - if scale < -4 { + if scale < histogram.ExponentialSchemaMin { return nil, annots, fmt.Errorf("cannot convert exponential to native histogram."+ - " Scale must be >= -4, was %d", scale) + " Scale must be >= %d, was %d", histogram.ExponentialSchemaMin, scale) } var scaleDown int32 - if scale > 8 { - scaleDown = scale - 8 - scale = 8 + if scale > histogram.ExponentialSchemaMax { + scaleDown = scale - histogram.ExponentialSchemaMax + scale = histogram.ExponentialSchemaMax } pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 7681655e61..ad0a2a13e0 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -229,7 +229,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err samplesWithInvalidLabels := 0 samplesAppended := 0 - app := &timeLimitAppender{ + app := &remoteWriteAppender{ Appender: h.appendable.Appender(ctx), maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } @@ -344,7 +344,7 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist // NOTE(bwplotka): TSDB storage is NOT idempotent, so we don't allow "partial retry-able" errors. // Once we have 5xx type of error, we immediately stop and rollback all appends. func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ WriteResponseStats, errHTTPCode int, _ error) { - app := &timeLimitAppender{ + app := &remoteWriteAppender{ Appender: h.appendable.Appender(ctx), maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } @@ -616,7 +616,7 @@ type rwExporter struct { func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { otlpCfg := rw.config().OTLPConfig - app := &timeLimitAppender{ + app := &remoteWriteAppender{ Appender: rw.appendable.Appender(ctx), maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } @@ -719,13 +719,13 @@ func hasDelta(md pmetric.Metrics) bool { return false } -type timeLimitAppender struct { +type remoteWriteAppender struct { storage.Appender maxTime int64 } -func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { +func (app *remoteWriteAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { if t > app.maxTime { return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) } @@ -737,11 +737,18 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, return ref, nil } -func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { +func (app *remoteWriteAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if t > app.maxTime { return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) } + if h != nil && histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > histogram.ExponentialSchemaMax { + h = h.ReduceResolution(histogram.ExponentialSchemaMax) + } + if fh != nil && histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > histogram.ExponentialSchemaMax { + fh = fh.ReduceResolution(histogram.ExponentialSchemaMax) + } + ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh) if err != nil { return 0, err @@ -749,7 +756,7 @@ func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.La return ref, nil } -func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { +func (app *remoteWriteAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { if e.Ts > app.maxTime { return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 5631e80732..78cbcdccf7 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -1134,3 +1134,100 @@ func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, l labels.Labels m.samples = append(m.samples, mockSample{l, ct, 0}) return storage.SeriesRef(hash), nil } + +var ( + highSchemaHistogram = &histogram.Histogram{ + Schema: 10, + PositiveSpans: []histogram.Span{ + { + Offset: -1, + Length: 2, + }, + }, + PositiveBuckets: []int64{1, 2}, + NegativeSpans: []histogram.Span{ + { + Offset: 0, + Length: 1, + }, + }, + NegativeBuckets: []int64{1}, + } + reducedSchemaHistogram = &histogram.Histogram{ + Schema: 8, + PositiveSpans: []histogram.Span{ + { + Offset: 0, + Length: 1, + }, + }, + PositiveBuckets: []int64{4}, + NegativeSpans: []histogram.Span{ + { + Offset: 0, + Length: 1, + }, + }, + NegativeBuckets: []int64{1}, + } +) + +func TestHistogramsReduction(t *testing.T) { + for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} { + t.Run(string(protoMsg), func(t *testing.T) { + appendable := &mockAppendable{} + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{protoMsg}, false) + + var ( + err error + payload []byte + ) + + if protoMsg == config.RemoteWriteProtoMsgV1 { + payload, _, _, err = buildWriteRequest(nil, []prompb.TimeSeries{ + { + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric1"}}, + Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, highSchemaHistogram)}, + }, + { + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric2"}}, + Histograms: []prompb.Histogram{prompb.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))}, + }, + }, nil, nil, nil, nil, "snappy") + } else { + payload, _, _, err = buildV2WriteRequest(promslog.NewNopLogger(), []writev2.TimeSeries{ + { + LabelsRefs: []uint32{0, 1}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, highSchemaHistogram)}, + }, + { + LabelsRefs: []uint32{0, 2}, + Histograms: []writev2.Histogram{writev2.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))}, + }, + }, []string{"__name__", "test_metric1", "test_metric2"}, + nil, nil, nil, "snappy") + } + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(payload)) + require.NoError(t, err) + + req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[protoMsg]) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, resp.StatusCode) + require.Empty(t, body) + + require.Len(t, appendable.histograms, 2) + require.Equal(t, int64(1), appendable.histograms[0].t) + require.Equal(t, reducedSchemaHistogram, appendable.histograms[0].h) + require.Equal(t, int64(2), appendable.histograms[1].t) + require.Equal(t, reducedSchemaHistogram.ToFloat(nil), appendable.histograms[1].fh) + }) + } +} From 267be7dc20a32032481943cee2e188b505fb9a82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Thu, 18 Sep 2025 09:21:03 +0200 Subject: [PATCH 2/5] fix(chunkenc): error out when reading unknown histogram schemas from chunks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Otherwise higher level code like PromQL needs to constantly check if it can handle the samples. Signed-off-by: György Krajcsovits --- model/histogram/generic.go | 4 ++++ tsdb/chunkenc/float_histogram.go | 6 ++++++ tsdb/chunkenc/float_histogram_test.go | 30 +++++++++++++++++++++++++++ tsdb/chunkenc/histogram.go | 6 ++++++ tsdb/chunkenc/histogram_test.go | 30 +++++++++++++++++++++++++++ 5 files changed, 76 insertions(+) diff --git a/model/histogram/generic.go b/model/histogram/generic.go index 4c0940a1f6..b5aa19f9cb 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -54,6 +54,10 @@ func IsExponentialSchemaReserved(s int32) bool { return s >= ExponentialSchemaMinReserved && s <= ExponentialSchemaMaxReserved } +func IsValidSchema(s int32) bool { + return IsCustomBucketsSchema(s) || IsExponentialSchema(s) +} + // BucketCount is a type constraint for the count in a bucket, which can be // float64 (for type FloatHistogram) or uint64 (for type Histogram). type BucketCount interface { diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 7e9bf0085a..1626b45239 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -954,6 +954,12 @@ func (it *floatHistogramIterator) Next() ValueType { it.err = err return ValNone } + + if !histogram.IsValidSchema(schema) { + it.err = fmt.Errorf("invalid histogram schema %d", schema) + return ValNone + } + it.schema = schema it.zThreshold = zeroThreshold it.pSpans, it.nSpans = posSpans, negSpans diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index f715716b65..be5a2673e5 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -14,6 +14,7 @@ package chunkenc import ( + "fmt" "testing" "github.com/stretchr/testify/require" @@ -1462,3 +1463,32 @@ func TestFloatHistogramEmptyBucketsWithGaps(t *testing.T) { require.Equal(t, ValNone, it.Next()) require.NoError(t, it.Err()) } + +func TestFloatHistogramIteratorFailIfSchemaInValid(t *testing.T) { + for _, schema := range []int32{-101, 101} { + t.Run(fmt.Sprintf("schema %d", schema), func(t *testing.T) { + h := &histogram.FloatHistogram{ + Schema: schema, + Count: 10, + Sum: 15.0, + ZeroThreshold: 1e-100, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{1, 2, 3, 4}, + } + + c := NewFloatHistogramChunk() + app, err := c.Appender() + require.NoError(t, err) + + _, _, _, err = app.AppendFloatHistogram(nil, 1, h, false) + require.NoError(t, err) + + it := c.Iterator(nil) + require.Equal(t, ValNone, it.Next()) + require.EqualError(t, it.Err(), fmt.Sprintf("invalid histogram schema %d", schema)) + }) + } +} diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 4a8460fa85..deb52d83a8 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -1077,6 +1077,12 @@ func (it *histogramIterator) Next() ValueType { it.err = err return ValNone } + + if !histogram.IsValidSchema(schema) { + it.err = fmt.Errorf("invalid histogram schema %d", schema) + return ValNone + } + it.schema = schema it.zThreshold = zeroThreshold it.pSpans, it.nSpans = posSpans, negSpans diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 5c708faf5f..d2b1f593a3 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -14,6 +14,7 @@ package chunkenc import ( + "fmt" "testing" "github.com/stretchr/testify/require" @@ -1818,3 +1819,32 @@ func TestIntHistogramEmptyBucketsWithGaps(t *testing.T) { require.Equal(t, ValNone, it.Next()) require.NoError(t, it.Err()) } + +func TestHistogramIteratorFailIfSchemaInValid(t *testing.T) { + for _, schema := range []int32{-101, 101} { + t.Run(fmt.Sprintf("schema %d", schema), func(t *testing.T) { + h := &histogram.Histogram{ + Schema: schema, + Count: 10, + Sum: 15.0, + ZeroThreshold: 1e-100, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, 3, 4}, + } + + c := NewHistogramChunk() + app, err := c.Appender() + require.NoError(t, err) + + _, _, _, err = app.AppendHistogram(nil, 1, h, false) + require.NoError(t, err) + + it := c.Iterator(nil) + require.Equal(t, ValNone, it.Next()) + require.EqualError(t, it.Err(), fmt.Sprintf("invalid histogram schema %d", schema)) + }) + } +} From f0a297bb7c9dd218758958885a0a8fd0605cf0ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Thu, 18 Sep 2025 11:09:45 +0200 Subject: [PATCH 3/5] fix(remote): validate native histogram schema in remote read MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When remote read returns chunks, the validation is in tsdb/chunkenc. However when it returns samples, we need to modify the iterator to validate. Signed-off-by: György Krajcsovits --- storage/remote/codec.go | 40 ++++++++++++++++++++++++++++---- storage/remote/codec_test.go | 44 ++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index a798fc128c..2042e12199 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -388,6 +388,7 @@ type concreteSeriesIterator struct { histogramsCur int curValType chunkenc.ValueType series *concreteSeries + err error } func newConcreteSeriesIterator(series *concreteSeries) chunkenc.Iterator { @@ -404,10 +405,14 @@ func (c *concreteSeriesIterator) reset(series *concreteSeries) { c.histogramsCur = -1 c.curValType = chunkenc.ValNone c.series = series + c.err = nil } // Seek implements storage.SeriesIterator. func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { + if c.err != nil { + return chunkenc.ValNone + } if c.floatsCur == -1 { c.floatsCur = 0 } @@ -439,7 +444,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { if c.series.floats[c.floatsCur].Timestamp <= c.series.histograms[c.histogramsCur].Timestamp { c.curValType = chunkenc.ValFloat } else { - c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur]) + c.curValType = chunkenc.ValHistogram } // When the timestamps do not overlap the cursor for the non-selected sample type has advanced too // far; we decrement it back down here. @@ -453,11 +458,26 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { case c.floatsCur < len(c.series.floats): c.curValType = chunkenc.ValFloat case c.histogramsCur < len(c.series.histograms): - c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur]) + c.curValType = chunkenc.ValHistogram + } + if c.curValType == chunkenc.ValHistogram { + h := &c.series.histograms[c.histogramsCur] + c.curValType = getHistogramValType(h) + c.err = validateHistogramSchema(h) + } + if c.err != nil { + c.curValType = chunkenc.ValNone } return c.curValType } +func validateHistogramSchema(h *prompb.Histogram) error { + if histogram.IsValidSchema(h.Schema) { + return nil + } + return fmt.Errorf("invalid histogram schema %d", h.Schema) +} + func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType { if h.IsFloatHistogram() { return chunkenc.ValFloatHistogram @@ -504,6 +524,9 @@ const noTS = int64(math.MaxInt64) // Next implements chunkenc.Iterator. func (c *concreteSeriesIterator) Next() chunkenc.ValueType { + if c.err != nil { + return chunkenc.ValNone + } peekFloatTS := noTS if c.floatsCur+1 < len(c.series.floats) { peekFloatTS = c.series.floats[c.floatsCur+1].Timestamp @@ -532,12 +555,21 @@ func (c *concreteSeriesIterator) Next() chunkenc.ValueType { c.histogramsCur++ c.curValType = chunkenc.ValFloat } + + if c.curValType == chunkenc.ValHistogram { + h := &c.series.histograms[c.histogramsCur] + c.curValType = getHistogramValType(h) + c.err = validateHistogramSchema(h) + } + if c.err != nil { + c.curValType = chunkenc.ValNone + } return c.curValType } // Err implements chunkenc.Iterator. -func (*concreteSeriesIterator) Err() error { - return nil +func (c *concreteSeriesIterator) Err() error { + return c.err } // chunkedSeriesSet implements storage.SeriesSet. diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index cb54463796..396acd0a5f 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -548,6 +548,50 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { require.Equal(t, chunkenc.ValNone, it.Seek(1)) } +func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) { + for _, schema := range []int32{-100, 100} { + t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) { + h := prompb.FromIntHistogram(2, &testHistogram) + h.Schema = schema + fh := prompb.FromFloatHistogram(4, testHistogram.ToFloat(nil)) + fh.Schema = schema + series := &concreteSeries{ + labels: labels.FromStrings("foo", "bar"), + floats: []prompb.Sample{ + {Value: 1, Timestamp: 0}, + {Value: 2, Timestamp: 3}, + }, + histograms: []prompb.Histogram{ + h, + fh, + }, + } + it := series.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.Error(t, it.Err()) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValFloat, it.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.Error(t, it.Err()) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValNone, it.Seek(1)) + require.Error(t, it.Err()) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValFloat, it.Seek(3)) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.Error(t, it.Err()) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValNone, it.Seek(4)) + require.Error(t, it.Err()) + }) + } +} + func TestFromQueryResultWithDuplicates(t *testing.T) { ts1 := prompb.TimeSeries{ Labels: []prompb.Label{ From 5e6900558a80ce5de078f5a65a0778a828a40f2c Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Fri, 19 Sep 2025 08:58:27 +0200 Subject: [PATCH 4/5] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Björn Rabenstein Signed-off-by: George Krajcsovits --- model/histogram/generic.go | 2 +- storage/remote/codec.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/model/histogram/generic.go b/model/histogram/generic.go index b5aa19f9cb..cc1fbce685 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -39,7 +39,7 @@ var ( ErrHistogramCustomBucketsInfinite = errors.New("histogram custom bounds must be finite") ErrHistogramsIncompatibleSchema = errors.New("cannot apply this operation on histograms with a mix of exponential and custom bucket schemas") ErrHistogramsIncompatibleBounds = errors.New("cannot apply this operation on custom buckets histograms with different custom bounds") - ErrHistogramsInvalidSchema = errors.New("histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets") + ErrHistogramsInvalidSchema = fmt.Errorf("histogram has an invalid schema, which must be between %d and %d for exponential buckets, or %d for custom buckets", ExponentialSchemaMin, ExponentialSchemaMax, CustomBucketsSchema) ) func IsCustomBucketsSchema(s int32) bool { diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 2042e12199..87b7ff2290 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -475,7 +475,7 @@ func validateHistogramSchema(h *prompb.Histogram) error { if histogram.IsValidSchema(h.Schema) { return nil } - return fmt.Errorf("invalid histogram schema %d", h.Schema) + return return fmt.Errorf("%w, got schema %d", h.Schema, histogram.ErrHistogramsInvalidSchema) } func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType { From 5b39b79f5a41a25c70a9efd842ca21e55c888649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Fri, 19 Sep 2025 09:26:34 +0200 Subject: [PATCH 5/5] refactor error creation and tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: György Krajcsovits --- model/histogram/float_histogram.go | 2 +- model/histogram/generic.go | 4 ++++ model/histogram/histogram.go | 2 +- model/histogram/histogram_test.go | 4 ++-- storage/remote/codec.go | 2 +- tsdb/chunkenc/float_histogram.go | 2 +- tsdb/chunkenc/float_histogram_test.go | 2 +- tsdb/chunkenc/histogram.go | 2 +- tsdb/chunkenc/histogram_test.go | 2 +- 9 files changed, 13 insertions(+), 9 deletions(-) diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index b0e512fbc5..60d1575125 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -830,7 +830,7 @@ func (h *FloatHistogram) Validate() error { return errors.New("histogram with exponential schema must not have custom bounds") } default: - return fmt.Errorf("schema %d: %w", h.Schema, ErrHistogramsInvalidSchema) + return InvalidSchemaError(h.Schema) } err := checkHistogramBuckets(h.PositiveBuckets, &pCount, false) if err != nil { diff --git a/model/histogram/generic.go b/model/histogram/generic.go index cc1fbce685..1788da7fc1 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -42,6 +42,10 @@ var ( ErrHistogramsInvalidSchema = fmt.Errorf("histogram has an invalid schema, which must be between %d and %d for exponential buckets, or %d for custom buckets", ExponentialSchemaMin, ExponentialSchemaMax, CustomBucketsSchema) ) +func InvalidSchemaError(s int32) error { + return fmt.Errorf("%w, got schema %d", ErrHistogramsInvalidSchema, s) +} + func IsCustomBucketsSchema(s int32) bool { return s == CustomBucketsSchema } diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 169be9a6ac..ca5260c75e 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -457,7 +457,7 @@ func (h *Histogram) Validate() error { return errors.New("histogram with exponential schema must not have custom bounds") } default: - return fmt.Errorf("schema %d: %w", h.Schema, ErrHistogramsInvalidSchema) + return InvalidSchemaError(h.Schema) } err := checkHistogramBuckets(h.PositiveBuckets, &pCount, true) if err != nil { diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index 35603bc01c..52a3392154 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -1569,13 +1569,13 @@ func TestHistogramValidation(t *testing.T) { h: &Histogram{ Schema: 10, }, - errMsg: `schema 10: histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets`, + errMsg: `histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets, got schema 10`, }, "schema too low": { h: &Histogram{ Schema: -10, }, - errMsg: `schema -10: histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets`, + errMsg: `histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets, got schema -10`, }, } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 87b7ff2290..25e5ba04b6 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -475,7 +475,7 @@ func validateHistogramSchema(h *prompb.Histogram) error { if histogram.IsValidSchema(h.Schema) { return nil } - return return fmt.Errorf("%w, got schema %d", h.Schema, histogram.ErrHistogramsInvalidSchema) + return histogram.InvalidSchemaError(h.Schema) } func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType { diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 1626b45239..13faf9961a 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -956,7 +956,7 @@ func (it *floatHistogramIterator) Next() ValueType { } if !histogram.IsValidSchema(schema) { - it.err = fmt.Errorf("invalid histogram schema %d", schema) + it.err = histogram.InvalidSchemaError(schema) return ValNone } diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index be5a2673e5..a9813d2c64 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -1488,7 +1488,7 @@ func TestFloatHistogramIteratorFailIfSchemaInValid(t *testing.T) { it := c.Iterator(nil) require.Equal(t, ValNone, it.Next()) - require.EqualError(t, it.Err(), fmt.Sprintf("invalid histogram schema %d", schema)) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramsInvalidSchema) }) } } diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index deb52d83a8..194b67962f 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -1079,7 +1079,7 @@ func (it *histogramIterator) Next() ValueType { } if !histogram.IsValidSchema(schema) { - it.err = fmt.Errorf("invalid histogram schema %d", schema) + it.err = histogram.InvalidSchemaError(schema) return ValNone } diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index d2b1f593a3..b191960d89 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -1844,7 +1844,7 @@ func TestHistogramIteratorFailIfSchemaInValid(t *testing.T) { it := c.Iterator(nil) require.Equal(t, ValNone, it.Next()) - require.EqualError(t, it.Err(), fmt.Sprintf("invalid histogram schema %d", schema)) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramsInvalidSchema) }) } }