mirror of
https://github.com/prometheus/prometheus.git
synced 2025-10-29 15:31:00 +01:00
Merge pull request #17189 from prometheus/krajo/native-histogram-schema-validation
fix(nativehistograms): validation should fail on unsupported schemas
This commit is contained in:
commit
112f91803c
@ -798,7 +798,8 @@ func (h *FloatHistogram) AllReverseBucketIterator() BucketIterator[float64] {
|
|||||||
// create false positives here.
|
// create false positives here.
|
||||||
func (h *FloatHistogram) Validate() error {
|
func (h *FloatHistogram) Validate() error {
|
||||||
var nCount, pCount float64
|
var nCount, pCount float64
|
||||||
if h.UsesCustomBuckets() {
|
switch {
|
||||||
|
case IsCustomBucketsSchema(h.Schema):
|
||||||
if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
||||||
return fmt.Errorf("custom buckets: %w", err)
|
return fmt.Errorf("custom buckets: %w", err)
|
||||||
}
|
}
|
||||||
@ -814,7 +815,7 @@ func (h *FloatHistogram) Validate() error {
|
|||||||
if len(h.NegativeBuckets) > 0 {
|
if len(h.NegativeBuckets) > 0 {
|
||||||
return ErrHistogramCustomBucketsNegBuckets
|
return ErrHistogramCustomBucketsNegBuckets
|
||||||
}
|
}
|
||||||
} else {
|
case IsExponentialSchema(h.Schema):
|
||||||
if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
||||||
return fmt.Errorf("positive side: %w", err)
|
return fmt.Errorf("positive side: %w", err)
|
||||||
}
|
}
|
||||||
@ -828,6 +829,8 @@ func (h *FloatHistogram) Validate() error {
|
|||||||
if h.CustomValues != nil {
|
if h.CustomValues != nil {
|
||||||
return ErrHistogramExpSchemaCustomBounds
|
return ErrHistogramExpSchemaCustomBounds
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
return InvalidSchemaError(h.Schema)
|
||||||
}
|
}
|
||||||
err := checkHistogramBuckets(h.PositiveBuckets, &pCount, false)
|
err := checkHistogramBuckets(h.PositiveBuckets, &pCount, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -21,9 +21,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ExponentialSchemaMax int32 = 8
|
ExponentialSchemaMax int32 = 8
|
||||||
ExponentialSchemaMin int32 = -4
|
ExponentialSchemaMaxReserved int32 = 52
|
||||||
CustomBucketsSchema int32 = -53
|
ExponentialSchemaMin int32 = -4
|
||||||
|
ExponentialSchemaMinReserved int32 = -9
|
||||||
|
CustomBucketsSchema int32 = -53
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -42,8 +44,13 @@ var (
|
|||||||
ErrHistogramCustomBucketsNegSpans = errors.New("custom buckets: must not have negative spans")
|
ErrHistogramCustomBucketsNegSpans = errors.New("custom buckets: must not have negative spans")
|
||||||
ErrHistogramCustomBucketsNegBuckets = errors.New("custom buckets: must not have negative buckets")
|
ErrHistogramCustomBucketsNegBuckets = errors.New("custom buckets: must not have negative buckets")
|
||||||
ErrHistogramExpSchemaCustomBounds = errors.New("histogram with exponential schema must not have custom bounds")
|
ErrHistogramExpSchemaCustomBounds = errors.New("histogram with exponential schema must not have custom bounds")
|
||||||
|
ErrHistogramsInvalidSchema = fmt.Errorf("histogram has an invalid schema, which must be between %d and %d for exponential buckets, or %d for custom buckets", ExponentialSchemaMin, ExponentialSchemaMax, CustomBucketsSchema)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func InvalidSchemaError(s int32) error {
|
||||||
|
return fmt.Errorf("%w, got schema %d", ErrHistogramsInvalidSchema, s)
|
||||||
|
}
|
||||||
|
|
||||||
func IsCustomBucketsSchema(s int32) bool {
|
func IsCustomBucketsSchema(s int32) bool {
|
||||||
return s == CustomBucketsSchema
|
return s == CustomBucketsSchema
|
||||||
}
|
}
|
||||||
@ -52,6 +59,14 @@ func IsExponentialSchema(s int32) bool {
|
|||||||
return s >= ExponentialSchemaMin && s <= ExponentialSchemaMax
|
return s >= ExponentialSchemaMin && s <= ExponentialSchemaMax
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func IsExponentialSchemaReserved(s int32) bool {
|
||||||
|
return s >= ExponentialSchemaMinReserved && s <= ExponentialSchemaMaxReserved
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsValidSchema(s int32) bool {
|
||||||
|
return IsCustomBucketsSchema(s) || IsExponentialSchema(s)
|
||||||
|
}
|
||||||
|
|
||||||
// BucketCount is a type constraint for the count in a bucket, which can be
|
// BucketCount is a type constraint for the count in a bucket, which can be
|
||||||
// float64 (for type FloatHistogram) or uint64 (for type Histogram).
|
// float64 (for type FloatHistogram) or uint64 (for type Histogram).
|
||||||
type BucketCount interface {
|
type BucketCount interface {
|
||||||
|
|||||||
@ -424,7 +424,8 @@ func resize[T any](items []T, n int) []T {
|
|||||||
// the total h.Count).
|
// the total h.Count).
|
||||||
func (h *Histogram) Validate() error {
|
func (h *Histogram) Validate() error {
|
||||||
var nCount, pCount uint64
|
var nCount, pCount uint64
|
||||||
if h.UsesCustomBuckets() {
|
switch {
|
||||||
|
case IsCustomBucketsSchema(h.Schema):
|
||||||
if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
||||||
return fmt.Errorf("custom buckets: %w", err)
|
return fmt.Errorf("custom buckets: %w", err)
|
||||||
}
|
}
|
||||||
@ -440,7 +441,7 @@ func (h *Histogram) Validate() error {
|
|||||||
if len(h.NegativeBuckets) > 0 {
|
if len(h.NegativeBuckets) > 0 {
|
||||||
return ErrHistogramCustomBucketsNegBuckets
|
return ErrHistogramCustomBucketsNegBuckets
|
||||||
}
|
}
|
||||||
} else {
|
case IsExponentialSchema(h.Schema):
|
||||||
if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
||||||
return fmt.Errorf("positive side: %w", err)
|
return fmt.Errorf("positive side: %w", err)
|
||||||
}
|
}
|
||||||
@ -454,6 +455,8 @@ func (h *Histogram) Validate() error {
|
|||||||
if h.CustomValues != nil {
|
if h.CustomValues != nil {
|
||||||
return ErrHistogramExpSchemaCustomBounds
|
return ErrHistogramExpSchemaCustomBounds
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
return InvalidSchemaError(h.Schema)
|
||||||
}
|
}
|
||||||
err := checkHistogramBuckets(h.PositiveBuckets, &pCount, true)
|
err := checkHistogramBuckets(h.PositiveBuckets, &pCount, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -1565,6 +1565,18 @@ func TestHistogramValidation(t *testing.T) {
|
|||||||
CustomValues: []float64{1, 2, 3, 4, 5, 6, 7, 8},
|
CustomValues: []float64{1, 2, 3, 4, 5, 6, 7, 8},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"schema too high": {
|
||||||
|
h: &Histogram{
|
||||||
|
Schema: 10,
|
||||||
|
},
|
||||||
|
errMsg: `histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets, got schema 10`,
|
||||||
|
},
|
||||||
|
"schema too low": {
|
||||||
|
h: &Histogram{
|
||||||
|
Schema: -10,
|
||||||
|
},
|
||||||
|
errMsg: `histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets, got schema -10`,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for testName, tc := range tests {
|
for testName, tc := range tests {
|
||||||
|
|||||||
@ -414,12 +414,12 @@ type maxSchemaAppender struct {
|
|||||||
|
|
||||||
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
if h != nil {
|
if h != nil {
|
||||||
if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema {
|
if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema {
|
||||||
h = h.ReduceResolution(app.maxSchema)
|
h = h.ReduceResolution(app.maxSchema)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if fh != nil {
|
if fh != nil {
|
||||||
if histogram.IsExponentialSchema(fh.Schema) && fh.Schema > app.maxSchema {
|
if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema {
|
||||||
fh = fh.ReduceResolution(app.maxSchema)
|
fh = fh.ReduceResolution(app.maxSchema)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -388,6 +388,7 @@ type concreteSeriesIterator struct {
|
|||||||
histogramsCur int
|
histogramsCur int
|
||||||
curValType chunkenc.ValueType
|
curValType chunkenc.ValueType
|
||||||
series *concreteSeries
|
series *concreteSeries
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConcreteSeriesIterator(series *concreteSeries) chunkenc.Iterator {
|
func newConcreteSeriesIterator(series *concreteSeries) chunkenc.Iterator {
|
||||||
@ -404,10 +405,14 @@ func (c *concreteSeriesIterator) reset(series *concreteSeries) {
|
|||||||
c.histogramsCur = -1
|
c.histogramsCur = -1
|
||||||
c.curValType = chunkenc.ValNone
|
c.curValType = chunkenc.ValNone
|
||||||
c.series = series
|
c.series = series
|
||||||
|
c.err = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements storage.SeriesIterator.
|
// Seek implements storage.SeriesIterator.
|
||||||
func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
||||||
|
if c.err != nil {
|
||||||
|
return chunkenc.ValNone
|
||||||
|
}
|
||||||
if c.floatsCur == -1 {
|
if c.floatsCur == -1 {
|
||||||
c.floatsCur = 0
|
c.floatsCur = 0
|
||||||
}
|
}
|
||||||
@ -439,7 +444,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
|||||||
if c.series.floats[c.floatsCur].Timestamp <= c.series.histograms[c.histogramsCur].Timestamp {
|
if c.series.floats[c.floatsCur].Timestamp <= c.series.histograms[c.histogramsCur].Timestamp {
|
||||||
c.curValType = chunkenc.ValFloat
|
c.curValType = chunkenc.ValFloat
|
||||||
} else {
|
} else {
|
||||||
c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur])
|
c.curValType = chunkenc.ValHistogram
|
||||||
}
|
}
|
||||||
// When the timestamps do not overlap the cursor for the non-selected sample type has advanced too
|
// When the timestamps do not overlap the cursor for the non-selected sample type has advanced too
|
||||||
// far; we decrement it back down here.
|
// far; we decrement it back down here.
|
||||||
@ -453,11 +458,26 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
|||||||
case c.floatsCur < len(c.series.floats):
|
case c.floatsCur < len(c.series.floats):
|
||||||
c.curValType = chunkenc.ValFloat
|
c.curValType = chunkenc.ValFloat
|
||||||
case c.histogramsCur < len(c.series.histograms):
|
case c.histogramsCur < len(c.series.histograms):
|
||||||
c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur])
|
c.curValType = chunkenc.ValHistogram
|
||||||
|
}
|
||||||
|
if c.curValType == chunkenc.ValHistogram {
|
||||||
|
h := &c.series.histograms[c.histogramsCur]
|
||||||
|
c.curValType = getHistogramValType(h)
|
||||||
|
c.err = validateHistogramSchema(h)
|
||||||
|
}
|
||||||
|
if c.err != nil {
|
||||||
|
c.curValType = chunkenc.ValNone
|
||||||
}
|
}
|
||||||
return c.curValType
|
return c.curValType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateHistogramSchema(h *prompb.Histogram) error {
|
||||||
|
if histogram.IsValidSchema(h.Schema) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return histogram.InvalidSchemaError(h.Schema)
|
||||||
|
}
|
||||||
|
|
||||||
func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType {
|
func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType {
|
||||||
if h.IsFloatHistogram() {
|
if h.IsFloatHistogram() {
|
||||||
return chunkenc.ValFloatHistogram
|
return chunkenc.ValFloatHistogram
|
||||||
@ -504,6 +524,9 @@ const noTS = int64(math.MaxInt64)
|
|||||||
|
|
||||||
// Next implements chunkenc.Iterator.
|
// Next implements chunkenc.Iterator.
|
||||||
func (c *concreteSeriesIterator) Next() chunkenc.ValueType {
|
func (c *concreteSeriesIterator) Next() chunkenc.ValueType {
|
||||||
|
if c.err != nil {
|
||||||
|
return chunkenc.ValNone
|
||||||
|
}
|
||||||
peekFloatTS := noTS
|
peekFloatTS := noTS
|
||||||
if c.floatsCur+1 < len(c.series.floats) {
|
if c.floatsCur+1 < len(c.series.floats) {
|
||||||
peekFloatTS = c.series.floats[c.floatsCur+1].Timestamp
|
peekFloatTS = c.series.floats[c.floatsCur+1].Timestamp
|
||||||
@ -532,12 +555,21 @@ func (c *concreteSeriesIterator) Next() chunkenc.ValueType {
|
|||||||
c.histogramsCur++
|
c.histogramsCur++
|
||||||
c.curValType = chunkenc.ValFloat
|
c.curValType = chunkenc.ValFloat
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.curValType == chunkenc.ValHistogram {
|
||||||
|
h := &c.series.histograms[c.histogramsCur]
|
||||||
|
c.curValType = getHistogramValType(h)
|
||||||
|
c.err = validateHistogramSchema(h)
|
||||||
|
}
|
||||||
|
if c.err != nil {
|
||||||
|
c.curValType = chunkenc.ValNone
|
||||||
|
}
|
||||||
return c.curValType
|
return c.curValType
|
||||||
}
|
}
|
||||||
|
|
||||||
// Err implements chunkenc.Iterator.
|
// Err implements chunkenc.Iterator.
|
||||||
func (*concreteSeriesIterator) Err() error {
|
func (c *concreteSeriesIterator) Err() error {
|
||||||
return nil
|
return c.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// chunkedSeriesSet implements storage.SeriesSet.
|
// chunkedSeriesSet implements storage.SeriesSet.
|
||||||
|
|||||||
@ -548,6 +548,50 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
|
|||||||
require.Equal(t, chunkenc.ValNone, it.Seek(1))
|
require.Equal(t, chunkenc.ValNone, it.Seek(1))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) {
|
||||||
|
for _, schema := range []int32{-100, 100} {
|
||||||
|
t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) {
|
||||||
|
h := prompb.FromIntHistogram(2, &testHistogram)
|
||||||
|
h.Schema = schema
|
||||||
|
fh := prompb.FromFloatHistogram(4, testHistogram.ToFloat(nil))
|
||||||
|
fh.Schema = schema
|
||||||
|
series := &concreteSeries{
|
||||||
|
labels: labels.FromStrings("foo", "bar"),
|
||||||
|
floats: []prompb.Sample{
|
||||||
|
{Value: 1, Timestamp: 0},
|
||||||
|
{Value: 2, Timestamp: 3},
|
||||||
|
},
|
||||||
|
histograms: []prompb.Histogram{
|
||||||
|
h,
|
||||||
|
fh,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
it := series.Iterator(nil)
|
||||||
|
require.Equal(t, chunkenc.ValFloat, it.Next())
|
||||||
|
require.Equal(t, chunkenc.ValNone, it.Next())
|
||||||
|
require.Error(t, it.Err())
|
||||||
|
|
||||||
|
it = series.Iterator(it)
|
||||||
|
require.Equal(t, chunkenc.ValFloat, it.Next())
|
||||||
|
require.Equal(t, chunkenc.ValNone, it.Next())
|
||||||
|
require.Error(t, it.Err())
|
||||||
|
|
||||||
|
it = series.Iterator(it)
|
||||||
|
require.Equal(t, chunkenc.ValNone, it.Seek(1))
|
||||||
|
require.Error(t, it.Err())
|
||||||
|
|
||||||
|
it = series.Iterator(it)
|
||||||
|
require.Equal(t, chunkenc.ValFloat, it.Seek(3))
|
||||||
|
require.Equal(t, chunkenc.ValNone, it.Next())
|
||||||
|
require.Error(t, it.Err())
|
||||||
|
|
||||||
|
it = series.Iterator(it)
|
||||||
|
require.Equal(t, chunkenc.ValNone, it.Seek(4))
|
||||||
|
require.Error(t, it.Err())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestFromQueryResultWithDuplicates(t *testing.T) {
|
func TestFromQueryResultWithDuplicates(t *testing.T) {
|
||||||
ts1 := prompb.TimeSeries{
|
ts1 := prompb.TimeSeries{
|
||||||
Labels: []prompb.Label{
|
Labels: []prompb.Label{
|
||||||
|
|||||||
@ -86,16 +86,16 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
|
|||||||
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint, temporality pmetric.AggregationTemporality) (*histogram.Histogram, annotations.Annotations, error) {
|
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint, temporality pmetric.AggregationTemporality) (*histogram.Histogram, annotations.Annotations, error) {
|
||||||
var annots annotations.Annotations
|
var annots annotations.Annotations
|
||||||
scale := p.Scale()
|
scale := p.Scale()
|
||||||
if scale < -4 {
|
if scale < histogram.ExponentialSchemaMin {
|
||||||
return nil, annots,
|
return nil, annots,
|
||||||
fmt.Errorf("cannot convert exponential to native histogram."+
|
fmt.Errorf("cannot convert exponential to native histogram."+
|
||||||
" Scale must be >= -4, was %d", scale)
|
" Scale must be >= %d, was %d", histogram.ExponentialSchemaMin, scale)
|
||||||
}
|
}
|
||||||
|
|
||||||
var scaleDown int32
|
var scaleDown int32
|
||||||
if scale > 8 {
|
if scale > histogram.ExponentialSchemaMax {
|
||||||
scaleDown = scale - 8
|
scaleDown = scale - histogram.ExponentialSchemaMax
|
||||||
scale = 8
|
scale = histogram.ExponentialSchemaMax
|
||||||
}
|
}
|
||||||
|
|
||||||
pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true)
|
pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true)
|
||||||
|
|||||||
@ -250,7 +250,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
|||||||
samplesWithInvalidLabels := 0
|
samplesWithInvalidLabels := 0
|
||||||
samplesAppended := 0
|
samplesAppended := 0
|
||||||
|
|
||||||
app := &timeLimitAppender{
|
app := &remoteWriteAppender{
|
||||||
Appender: h.appendable.Appender(ctx),
|
Appender: h.appendable.Appender(ctx),
|
||||||
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||||
}
|
}
|
||||||
@ -365,7 +365,7 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist
|
|||||||
// NOTE(bwplotka): TSDB storage is NOT idempotent, so we don't allow "partial retry-able" errors.
|
// NOTE(bwplotka): TSDB storage is NOT idempotent, so we don't allow "partial retry-able" errors.
|
||||||
// Once we have 5xx type of error, we immediately stop and rollback all appends.
|
// Once we have 5xx type of error, we immediately stop and rollback all appends.
|
||||||
func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ WriteResponseStats, errHTTPCode int, _ error) {
|
func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ WriteResponseStats, errHTTPCode int, _ error) {
|
||||||
app := &timeLimitAppender{
|
app := &remoteWriteAppender{
|
||||||
Appender: h.appendable.Appender(ctx),
|
Appender: h.appendable.Appender(ctx),
|
||||||
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||||
}
|
}
|
||||||
@ -642,7 +642,7 @@ type rwExporter struct {
|
|||||||
|
|
||||||
func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
|
func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
|
||||||
otlpCfg := rw.config().OTLPConfig
|
otlpCfg := rw.config().OTLPConfig
|
||||||
app := &timeLimitAppender{
|
app := &remoteWriteAppender{
|
||||||
Appender: rw.appendable.Appender(ctx),
|
Appender: rw.appendable.Appender(ctx),
|
||||||
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||||
}
|
}
|
||||||
@ -745,13 +745,13 @@ func hasDelta(md pmetric.Metrics) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
type timeLimitAppender struct {
|
type remoteWriteAppender struct {
|
||||||
storage.Appender
|
storage.Appender
|
||||||
|
|
||||||
maxTime int64
|
maxTime int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
func (app *remoteWriteAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
||||||
if t > app.maxTime {
|
if t > app.maxTime {
|
||||||
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
||||||
}
|
}
|
||||||
@ -763,11 +763,18 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels,
|
|||||||
return ref, nil
|
return ref, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
func (app *remoteWriteAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
if t > app.maxTime {
|
if t > app.maxTime {
|
||||||
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if h != nil && histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > histogram.ExponentialSchemaMax {
|
||||||
|
h = h.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||||
|
}
|
||||||
|
if fh != nil && histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > histogram.ExponentialSchemaMax {
|
||||||
|
fh = fh.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||||
|
}
|
||||||
|
|
||||||
ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh)
|
ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -775,7 +782,7 @@ func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.La
|
|||||||
return ref, nil
|
return ref, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
func (app *remoteWriteAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||||
if e.Ts > app.maxTime {
|
if e.Ts > app.maxTime {
|
||||||
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1222,3 +1222,100 @@ func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, l labels.Labels
|
|||||||
m.samples = append(m.samples, mockSample{l, ct, 0})
|
m.samples = append(m.samples, mockSample{l, ct, 0})
|
||||||
return storage.SeriesRef(hash), nil
|
return storage.SeriesRef(hash), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
highSchemaHistogram = &histogram.Histogram{
|
||||||
|
Schema: 10,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{
|
||||||
|
Offset: -1,
|
||||||
|
Length: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{1, 2},
|
||||||
|
NegativeSpans: []histogram.Span{
|
||||||
|
{
|
||||||
|
Offset: 0,
|
||||||
|
Length: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
NegativeBuckets: []int64{1},
|
||||||
|
}
|
||||||
|
reducedSchemaHistogram = &histogram.Histogram{
|
||||||
|
Schema: 8,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{
|
||||||
|
Offset: 0,
|
||||||
|
Length: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{4},
|
||||||
|
NegativeSpans: []histogram.Span{
|
||||||
|
{
|
||||||
|
Offset: 0,
|
||||||
|
Length: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
NegativeBuckets: []int64{1},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHistogramsReduction(t *testing.T) {
|
||||||
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
||||||
|
t.Run(string(protoMsg), func(t *testing.T) {
|
||||||
|
appendable := &mockAppendable{}
|
||||||
|
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{protoMsg}, false)
|
||||||
|
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
payload []byte
|
||||||
|
)
|
||||||
|
|
||||||
|
if protoMsg == config.RemoteWriteProtoMsgV1 {
|
||||||
|
payload, _, _, err = buildWriteRequest(nil, []prompb.TimeSeries{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric1"}},
|
||||||
|
Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, highSchemaHistogram)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric2"}},
|
||||||
|
Histograms: []prompb.Histogram{prompb.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))},
|
||||||
|
},
|
||||||
|
}, nil, nil, nil, nil, "snappy")
|
||||||
|
} else {
|
||||||
|
payload, _, _, err = buildV2WriteRequest(promslog.NewNopLogger(), []writev2.TimeSeries{
|
||||||
|
{
|
||||||
|
LabelsRefs: []uint32{0, 1},
|
||||||
|
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, highSchemaHistogram)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
LabelsRefs: []uint32{0, 2},
|
||||||
|
Histograms: []writev2.Histogram{writev2.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))},
|
||||||
|
},
|
||||||
|
}, []string{"__name__", "test_metric1", "test_metric2"},
|
||||||
|
nil, nil, nil, "snappy")
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("", "", bytes.NewReader(payload))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[protoMsg])
|
||||||
|
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(recorder, req)
|
||||||
|
|
||||||
|
resp := recorder.Result()
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||||
|
require.Empty(t, body)
|
||||||
|
|
||||||
|
require.Len(t, appendable.histograms, 2)
|
||||||
|
require.Equal(t, int64(1), appendable.histograms[0].t)
|
||||||
|
require.Equal(t, reducedSchemaHistogram, appendable.histograms[0].h)
|
||||||
|
require.Equal(t, int64(2), appendable.histograms[1].t)
|
||||||
|
require.Equal(t, reducedSchemaHistogram.ToFloat(nil), appendable.histograms[1].fh)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -954,6 +954,12 @@ func (it *floatHistogramIterator) Next() ValueType {
|
|||||||
it.err = err
|
it.err = err
|
||||||
return ValNone
|
return ValNone
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !histogram.IsValidSchema(schema) {
|
||||||
|
it.err = histogram.InvalidSchemaError(schema)
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
|
||||||
it.schema = schema
|
it.schema = schema
|
||||||
it.zThreshold = zeroThreshold
|
it.zThreshold = zeroThreshold
|
||||||
it.pSpans, it.nSpans = posSpans, negSpans
|
it.pSpans, it.nSpans = posSpans, negSpans
|
||||||
|
|||||||
@ -14,6 +14,7 @@
|
|||||||
package chunkenc
|
package chunkenc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -1462,3 +1463,32 @@ func TestFloatHistogramEmptyBucketsWithGaps(t *testing.T) {
|
|||||||
require.Equal(t, ValNone, it.Next())
|
require.Equal(t, ValNone, it.Next())
|
||||||
require.NoError(t, it.Err())
|
require.NoError(t, it.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFloatHistogramIteratorFailIfSchemaInValid(t *testing.T) {
|
||||||
|
for _, schema := range []int32{-101, 101} {
|
||||||
|
t.Run(fmt.Sprintf("schema %d", schema), func(t *testing.T) {
|
||||||
|
h := &histogram.FloatHistogram{
|
||||||
|
Schema: schema,
|
||||||
|
Count: 10,
|
||||||
|
Sum: 15.0,
|
||||||
|
ZeroThreshold: 1e-100,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []float64{1, 2, 3, 4},
|
||||||
|
}
|
||||||
|
|
||||||
|
c := NewFloatHistogramChunk()
|
||||||
|
app, err := c.Appender()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, _, _, err = app.AppendFloatHistogram(nil, 1, h, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
it := c.Iterator(nil)
|
||||||
|
require.Equal(t, ValNone, it.Next())
|
||||||
|
require.ErrorIs(t, it.Err(), histogram.ErrHistogramsInvalidSchema)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -1077,6 +1077,12 @@ func (it *histogramIterator) Next() ValueType {
|
|||||||
it.err = err
|
it.err = err
|
||||||
return ValNone
|
return ValNone
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !histogram.IsValidSchema(schema) {
|
||||||
|
it.err = histogram.InvalidSchemaError(schema)
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
|
||||||
it.schema = schema
|
it.schema = schema
|
||||||
it.zThreshold = zeroThreshold
|
it.zThreshold = zeroThreshold
|
||||||
it.pSpans, it.nSpans = posSpans, negSpans
|
it.pSpans, it.nSpans = posSpans, negSpans
|
||||||
|
|||||||
@ -14,6 +14,7 @@
|
|||||||
package chunkenc
|
package chunkenc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -1818,3 +1819,32 @@ func TestIntHistogramEmptyBucketsWithGaps(t *testing.T) {
|
|||||||
require.Equal(t, ValNone, it.Next())
|
require.Equal(t, ValNone, it.Next())
|
||||||
require.NoError(t, it.Err())
|
require.NoError(t, it.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHistogramIteratorFailIfSchemaInValid(t *testing.T) {
|
||||||
|
for _, schema := range []int32{-101, 101} {
|
||||||
|
t.Run(fmt.Sprintf("schema %d", schema), func(t *testing.T) {
|
||||||
|
h := &histogram.Histogram{
|
||||||
|
Schema: schema,
|
||||||
|
Count: 10,
|
||||||
|
Sum: 15.0,
|
||||||
|
ZeroThreshold: 1e-100,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{1, 2, 3, 4},
|
||||||
|
}
|
||||||
|
|
||||||
|
c := NewHistogramChunk()
|
||||||
|
app, err := c.Appender()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, _, _, err = app.AppendHistogram(nil, 1, h, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
it := c.Iterator(nil)
|
||||||
|
require.Equal(t, ValNone, it.Next())
|
||||||
|
require.ErrorIs(t, it.Err(), histogram.ErrHistogramsInvalidSchema)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user