mirror of
https://github.com/prometheus/prometheus.git
synced 2025-09-21 13:51:00 +02:00
feat(remote): reduce resolution of native histograms on remote read
If a sample read through remote read has too high resolution, reduce it to the maximum allowed. This is a slow path, but we only expect it to happen if the server side is newer version that allows higher resolution. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
46d15c2626
commit
3e96d902e0
@ -472,7 +472,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func validateHistogramSchema(h *prompb.Histogram) error {
|
func validateHistogramSchema(h *prompb.Histogram) error {
|
||||||
if histogram.IsValidSchema(h.Schema) {
|
if histogram.IsAcceptibleSchema(h.Schema) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("invalid histogram schema %d", h.Schema)
|
return fmt.Errorf("invalid histogram schema %d", h.Schema)
|
||||||
@ -500,14 +500,28 @@ func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *hist
|
|||||||
panic("iterator is not on an integer histogram sample")
|
panic("iterator is not on an integer histogram sample")
|
||||||
}
|
}
|
||||||
h := c.series.histograms[c.histogramsCur]
|
h := c.series.histograms[c.histogramsCur]
|
||||||
return h.Timestamp, h.ToIntHistogram()
|
mh := h.ToIntHistogram()
|
||||||
|
if mh.Schema > histogram.ExponentialSchemaMax && mh.Schema <= histogram.ExponentialSchemaMaxReserved {
|
||||||
|
// This is a very slow path, but it should only happen if the
|
||||||
|
// sample is from a newer Prometheus version that supports higher
|
||||||
|
// resolution.
|
||||||
|
mh.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||||
|
}
|
||||||
|
return h.Timestamp, mh
|
||||||
}
|
}
|
||||||
|
|
||||||
// AtFloatHistogram implements chunkenc.Iterator.
|
// AtFloatHistogram implements chunkenc.Iterator.
|
||||||
func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||||
if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram {
|
if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram {
|
||||||
fh := c.series.histograms[c.histogramsCur]
|
fh := c.series.histograms[c.histogramsCur]
|
||||||
return fh.Timestamp, fh.ToFloatHistogram() // integer will be auto-converted.
|
mfh := fh.ToFloatHistogram() // integer will be auto-converted.
|
||||||
|
if mfh.Schema > histogram.ExponentialSchemaMax && mfh.Schema <= histogram.ExponentialSchemaMaxReserved {
|
||||||
|
// This is a very slow path, but it should only happen if the
|
||||||
|
// sample is from a newer Prometheus version that supports higher
|
||||||
|
// resolution.
|
||||||
|
mfh.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||||
|
}
|
||||||
|
return fh.Timestamp, mfh
|
||||||
}
|
}
|
||||||
panic("iterator is not on a histogram sample")
|
panic("iterator is not on a histogram sample")
|
||||||
}
|
}
|
||||||
|
@ -592,6 +592,34 @@ func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcreteSeriesIterator_ReducesHighResolutionHistograms(t *testing.T) {
|
||||||
|
for _, schema := range []int32{9, 52} {
|
||||||
|
t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) {
|
||||||
|
h := testHistogram.Copy()
|
||||||
|
h.Schema = schema
|
||||||
|
fh := h.ToFloat(nil)
|
||||||
|
series := &concreteSeries{
|
||||||
|
labels: labels.FromStrings("foo", "bar"),
|
||||||
|
histograms: []prompb.Histogram{
|
||||||
|
prompb.FromIntHistogram(1, h),
|
||||||
|
prompb.FromFloatHistogram(2, fh),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
it := series.Iterator(nil)
|
||||||
|
require.Equal(t, chunkenc.ValHistogram, it.Next())
|
||||||
|
_, gotH := it.AtHistogram(nil)
|
||||||
|
require.Equal(t, histogram.ExponentialSchemaMax, gotH.Schema)
|
||||||
|
_, gotFH := it.AtFloatHistogram(nil)
|
||||||
|
require.Equal(t, histogram.ExponentialSchemaMax, gotFH.Schema)
|
||||||
|
require.Equal(t, chunkenc.ValFloatHistogram, it.Next())
|
||||||
|
_, gotFH = it.AtFloatHistogram(nil)
|
||||||
|
require.Equal(t, histogram.ExponentialSchemaMax, gotFH.Schema)
|
||||||
|
require.Equal(t, chunkenc.ValNone, it.Next())
|
||||||
|
require.NoError(t, it.Err())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestFromQueryResultWithDuplicates(t *testing.T) {
|
func TestFromQueryResultWithDuplicates(t *testing.T) {
|
||||||
ts1 := prompb.TimeSeries{
|
ts1 := prompb.TimeSeries{
|
||||||
Labels: []prompb.Label{
|
Labels: []prompb.Label{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user