From bd159f63e70acb34fea835d46b88650cab8236aa Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 13 Jan 2023 16:00:39 -0700 Subject: [PATCH] Initial support for returning histograms with Arrow Signed-off-by: Chris Marchbanks --- web/api/v1/api.go | 132 ++++++++++++++++++++++++++++++++--------- web/api/v1/api_test.go | 62 ++++++++++++++++--- 2 files changed, 156 insertions(+), 38 deletions(-) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index e46d770d41..d156c02f06 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -16,6 +16,7 @@ package v1 import ( "context" "fmt" + "io" "math" "math/rand" "net" @@ -1565,49 +1566,122 @@ func (api *API) tryArrowResponse(w http.ResponseWriter, r *http.Request, data in } matrix := result.Result.(promql.Matrix) - // Currently histograms are not handled with Arrow, just adding a binary - // type here halves the performance of Arrow. In the future we could - // consider building a custom type in Arrow for histograms. - for _, series := range matrix { - if len(series.Points) > 0 && series.Points[0].H != nil { - return fmt.Errorf("arrow not implemented for native histograms") - } - } - w.Header().Set("Content-Type", mimeTypeArrowStream) w.WriteHeader(http.StatusOK) pool := memory.NewGoAllocator() + for _, series := range matrix { + var err error + if len(series.Points) > 0 && series.Points[0].H != nil { + err = api.writeArrowHistogram(w, pool, series) + } else { + err = api.writeArrow(w, pool, series) + } + if err != nil { + level.Error(api.logger).Log("msg", "error writing arrow response", "err", err) + } + } + + return nil +} + +func (api *API) writeArrow(w io.Writer, pool memory.Allocator, series promql.Series) error { fields := []arrow.Field{ {Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}}, {Name: "v", Type: arrow.PrimitiveTypes.Float64}, } - for _, series := range matrix { - metadata := arrow.MetadataFrom(series.Metric.Map()) - seriesSchema := arrow.NewSchema(fields, &metadata) - b := array.NewRecordBuilder(pool, seriesSchema) - b.Reserve(len(series.Points)) + metadata := arrow.MetadataFrom(series.Metric.Map()) + seriesSchema := arrow.NewSchema(fields, &metadata) + b := array.NewRecordBuilder(pool, seriesSchema) + defer b.Release() + b.Reserve(len(series.Points)) - writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(seriesSchema)) + writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(seriesSchema)) + defer writer.Close() - for _, point := range series.Points { - // Since we reserve enough data for all points above we can use UnsafeAppend. - b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T)) - b.Field(1).(*array.Float64Builder).UnsafeAppend(point.V) - } - rec := b.NewRecord() - if err := writer.Write(rec); err != nil { - level.Error(api.logger).Log("msg", "error writing arrow response", "err", err) - return nil - } + for _, point := range series.Points { + // Since we reserve enough data for all points above we can use UnsafeAppend. + b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T)) + b.Field(1).(*array.Float64Builder).UnsafeAppend(point.V) + } + rec := b.NewRecord() + defer rec.Release() + return writer.Write(rec) +} - writer.Close() - rec.Release() - b.Release() +func (api *API) writeArrowHistogram(w io.Writer, pool memory.Allocator, series promql.Series) error { + metadata := arrow.MetadataFrom(series.Metric.Map()) + spanFields := []arrow.Field{ + {Name: "offset", Type: arrow.PrimitiveTypes.Int32}, + {Name: "length", Type: arrow.PrimitiveTypes.Uint32}, + } + fields := []arrow.Field{ + {Name: "t", Type: &arrow.TimestampType{Unit: arrow.Millisecond}}, + {Name: "counterResetHint", Type: arrow.PrimitiveTypes.Uint8}, + {Name: "schema", Type: arrow.PrimitiveTypes.Int32}, + {Name: "zeroThreshold", Type: arrow.PrimitiveTypes.Float64}, + {Name: "zeroCount", Type: arrow.PrimitiveTypes.Float64}, + {Name: "count", Type: arrow.PrimitiveTypes.Float64}, + {Name: "sum", Type: arrow.PrimitiveTypes.Float64}, + // Spans are flattened into two separate lists for ease of Arrow encoding. + {Name: "positiveSpans", Type: arrow.ListOf(arrow.StructOf(spanFields...))}, + {Name: "negativeSpans", Type: arrow.ListOf(arrow.StructOf(spanFields...))}, + {Name: "positiveBuckets", Type: arrow.ListOf(arrow.PrimitiveTypes.Float64)}, + {Name: "negativeBuckets", Type: arrow.ListOf(arrow.PrimitiveTypes.Float64)}, } - return nil + schema := arrow.NewSchema(fields, &metadata) + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + b.Reserve(len(series.Points)) + writer := ipc.NewWriter(w, ipc.WithAllocator(pool), ipc.WithSchema(schema)) + defer writer.Close() + + for _, point := range series.Points { + // Since we reserve enough data for all points above we can use UnsafeAppend. + b.Field(0).(*array.TimestampBuilder).UnsafeAppend(arrow.Timestamp(point.T)) + b.Field(1).(*array.Uint8Builder).UnsafeAppend(uint8(point.H.CounterResetHint)) + b.Field(2).(*array.Int32Builder).UnsafeAppend(point.H.Schema) + b.Field(3).(*array.Float64Builder).UnsafeAppend(point.H.ZeroThreshold) + b.Field(4).(*array.Float64Builder).UnsafeAppend(point.H.ZeroCount) + b.Field(5).(*array.Float64Builder).UnsafeAppend(point.H.Count) + b.Field(6).(*array.Float64Builder).UnsafeAppend(point.H.Sum) + + positiveSpanBuilder := b.Field(7).(*array.ListBuilder) + positiveSpanBuilder.Append(true) + + positiveSpanValueBuilder := positiveSpanBuilder.ValueBuilder().(*array.StructBuilder) + positiveSpanValueBuilder.Reserve(len(point.H.PositiveSpans)) + for _, span := range point.H.PositiveSpans { + positiveSpanValueBuilder.FieldBuilder(0).(*array.Int32Builder).UnsafeAppend(span.Offset) + positiveSpanValueBuilder.FieldBuilder(1).(*array.Uint32Builder).UnsafeAppend(span.Length) + positiveSpanValueBuilder.Append(true) + } + + negativeSpanBuilder := b.Field(8).(*array.ListBuilder) + negativeSpanBuilder.Append(true) + + negativeSpanValueBuilder := negativeSpanBuilder.ValueBuilder().(*array.StructBuilder) + negativeSpanValueBuilder.Reserve(len(point.H.NegativeSpans)) + for _, span := range point.H.NegativeSpans { + negativeSpanValueBuilder.FieldBuilder(0).(*array.Int32Builder).UnsafeAppend(span.Offset) + negativeSpanValueBuilder.FieldBuilder(1).(*array.Uint32Builder).UnsafeAppend(span.Length) + negativeSpanValueBuilder.Append(true) + } + + positiveBucketsBuilder := b.Field(9).(*array.ListBuilder) + positiveBucketsBuilder.Append(true) + positiveBucketsBuilder.ValueBuilder().(*array.Float64Builder).AppendValues(point.H.PositiveBuckets, nil) + + negativeBucketsBuilder := b.Field(10).(*array.ListBuilder) + negativeBucketsBuilder.Append(true) + negativeBucketsBuilder.ValueBuilder().(*array.Float64Builder).AppendValues(point.H.NegativeBuckets, nil) + + } + rec := b.NewRecord() + defer rec.Release() + return writer.Write(rec) } func (api *API) respond(w http.ResponseWriter, r *http.Request, data interface{}, warnings storage.Warnings) { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 233b364b72..fbf7463e27 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -3244,15 +3244,7 @@ func arrowToJSONResponse(reader io.Reader) ([]byte, error) { if series.Metric == nil { series.Metric = labelsFromArrowMetadata(schema.Metadata()) } - rec := r.Record() - for i := 0; i < int(rec.NumRows()); i++ { - t := rec.Column(0).(*array.Timestamp).Value(i) - v := rec.Column(1).(*array.Float64).Value(i) - series.Points = append(series.Points, promql.Point{ - T: int64(t), - V: v, - }) - } + series.Points = pointsFromRecord(r.Record()) } data = append(data, series) } @@ -3277,6 +3269,58 @@ func labelsFromArrowMetadata(metadata arrow.Metadata) labels.Labels { return labels.FromStrings(strs...) } +func pointsFromRecord(rec array.Record) []promql.Point { + points := make([]promql.Point, 0, rec.NumRows()) + isHistogram := !rec.Schema().HasField("v") + for i := 0; i < int(rec.NumRows()); i++ { + t := rec.Column(0).(*array.Timestamp).Value(i) + + if isHistogram { + h := &histogram.FloatHistogram{ + CounterResetHint: histogram.CounterResetHint(rec.Column(1).(*array.Uint8).Value(i)), + Schema: rec.Column(2).(*array.Int32).Value(i), + ZeroThreshold: rec.Column(3).(*array.Float64).Value(i), + ZeroCount: rec.Column(4).(*array.Float64).Value(i), + Count: rec.Column(5).(*array.Float64).Value(i), + Sum: rec.Column(6).(*array.Float64).Value(i), + PositiveSpans: spans(rec.Column(7).(*array.List), i), + NegativeSpans: spans(rec.Column(8).(*array.List), i), + PositiveBuckets: buckets(rec.Column(9).(*array.List), i), + NegativeBuckets: buckets(rec.Column(10).(*array.List), i), + } + + points = append(points, promql.Point{ + T: int64(t), + H: h, + }) + } else { + v := rec.Column(1).(*array.Float64).Value(i) + points = append(points, promql.Point{ + T: int64(t), + V: v, + }) + } + } + + return points +} + +func spans(encoded *array.List, i int) []histogram.Span { + spans := make([]histogram.Span, 0, encoded.ListValues().Len()) + for i := 0; i < encoded.ListValues().Len(); i++ { + spans = append(spans, histogram.Span{ + Offset: encoded.ListValues().(*array.Struct).Field(0).(*array.Int32).Value(i), + Length: encoded.ListValues().(*array.Struct).Field(1).(*array.Uint32).Value(i), + }) + } + return spans +} + +func buckets(encoded *array.List, i int) []float64 { + offsets := encoded.Offsets() + return encoded.ListValues().(*array.Float64).Float64Values()[offsets[i]:offsets[i+1]] +} + func TestTSDBStatus(t *testing.T) { tsdb := &fakeDB{} tsdbStatusAPI := func(api *API) apiFunc { return api.serveTSDBStatus }