From 08f3ddb86469297219087468218bbb3455cf7512 Mon Sep 17 00:00:00 2001 From: Levi Harrison Date: Thu, 14 Jul 2022 09:13:12 -0400 Subject: [PATCH] Sparse histogram remote-write support (#11001) --- config/config.go | 13 +- docs/configuration/configuration.md | 3 + .../example_write_adapter/server.go | 5 + documentation/examples/remote_storage/go.mod | 8 +- documentation/examples/remote_storage/go.sum | 5 + storage/remote/codec.go | 58 ++++ storage/remote/codec_test.go | 29 +- storage/remote/queue_manager.go | 312 +++++++++++++----- storage/remote/queue_manager_test.go | 141 ++++++-- storage/remote/write.go | 12 +- storage/remote/write_handler.go | 14 + storage/remote/write_handler_test.go | 56 +++- tsdb/wal/watcher.go | 69 +++- tsdb/wal/watcher_test.go | 42 ++- 14 files changed, 600 insertions(+), 167 deletions(-) diff --git a/config/config.go b/config/config.go index ce17803f97..782028cb76 100644 --- a/config/config.go +++ b/config/config.go @@ -748,12 +748,13 @@ func CheckTargetAddress(address model.LabelValue) error { // RemoteWriteConfig is the configuration for writing to remote storage. type RemoteWriteConfig struct { - URL *config.URL `yaml:"url"` - RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` - Headers map[string]string `yaml:"headers,omitempty"` - WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"` - Name string `yaml:"name,omitempty"` - SendExemplars bool `yaml:"send_exemplars,omitempty"` + URL *config.URL `yaml:"url"` + RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` + Headers map[string]string `yaml:"headers,omitempty"` + WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"` + Name string `yaml:"name,omitempty"` + SendExemplars bool `yaml:"send_exemplars,omitempty"` + SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index c525109fdd..b9238ba97f 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -3045,6 +3045,9 @@ write_relabel_configs: # Enables sending of exemplars over remote write. Note that exemplar storage itself must be enabled for exemplars to be scraped in the first place. [ send_exemplars: | default = false ] +# Enables sending of native histograms, also known as sparse histograms, over remote write. +[ send_native_histograms: | default = false ] + # Sets the `Authorization` header on every remote write request with the # configured username and password. # password and password_file are mutually exclusive. 
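The two hunks above wire the opt-in flag end to end: a `SendNativeHistograms` field on `RemoteWriteConfig` plus its documentation entry, defaulting to false just like `send_exemplars`. A minimal standalone sketch of how such a flag round-trips through YAML (the struct below is a trimmed stand-in for `RemoteWriteConfig`, not Prometheus's actual config loader, and the URL is a placeholder):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// miniRemoteWriteConfig is a reduced stand-in for config.RemoteWriteConfig,
// keeping only the fields this patch touches.
type miniRemoteWriteConfig struct {
	URL                  string `yaml:"url"`
	SendExemplars        bool   `yaml:"send_exemplars,omitempty"`
	SendNativeHistograms bool   `yaml:"send_native_histograms,omitempty"`
}

func main() {
	raw := `
url: http://localhost:9201/write
send_exemplars: true
send_native_histograms: true
`
	var cfg miniRemoteWriteConfig
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	// Omitting either key leaves the flag false, so existing
	// remote-write configurations are unaffected.
	fmt.Printf("%+v\n", cfg)
}
```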
diff --git a/documentation/examples/remote_storage/example_write_adapter/server.go b/documentation/examples/remote_storage/example_write_adapter/server.go index dc2bd0e70a..8de7acfe68 100644 --- a/documentation/examples/remote_storage/example_write_adapter/server.go +++ b/documentation/examples/remote_storage/example_write_adapter/server.go @@ -49,6 +49,11 @@ func main() { } fmt.Printf("\tExemplar: %+v %f %d\n", m, e.Value, e.Timestamp) } + + for _, hp := range ts.Histograms { + h := remote.HistogramProtoToHistogram(hp) + fmt.Printf("\tHistogram: %s\n", (&h).String()) + } } }) diff --git a/documentation/examples/remote_storage/go.mod b/documentation/examples/remote_storage/go.mod index 2c80a60b70..49664c58f4 100644 --- a/documentation/examples/remote_storage/go.mod +++ b/documentation/examples/remote_storage/go.mod @@ -16,7 +16,7 @@ require ( require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect - github.com/aws/aws-sdk-go v1.44.20 // indirect + github.com/aws/aws-sdk-go v1.44.47 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -46,9 +46,9 @@ require ( go.uber.org/goleak v1.1.12 // indirect golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect - golang.org/x/sys v0.0.0-20220624220833-87e55d714810 // indirect + golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect + golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03 // indirect @@ -58,7 +58,7 @@ require ( ) require ( - github.com/prometheus/prometheus v0.36.2 + github.com/prometheus/prometheus v0.37.0-rc.0.0.20220713192720-53982c3562b0 golang.org/x/oauth2 v0.0.0-20220630143837-2104d58473e0 // indirect ) diff --git a/documentation/examples/remote_storage/go.sum b/documentation/examples/remote_storage/go.sum index 76dd06383e..c3c744c43e 100644 --- a/documentation/examples/remote_storage/go.sum +++ b/documentation/examples/remote_storage/go.sum @@ -174,6 +174,7 @@ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z github.com/aws/aws-sdk-go v1.43.11/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.44.20 h1:nllTRN24EfhDSeKsNbIc6HoC8Ogd2NCJTRB8l84kDlM= github.com/aws/aws-sdk-go v1.44.20/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= +github.com/aws/aws-sdk-go v1.44.47/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM= @@ -1022,6 +1023,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/prometheus v0.0.0-20200609090129-a6600f564e3c/go.mod h1:S5n0C6tSgdnwWshBUceRx5G1OsjLv/EeZ9t3wIfEtsY= github.com/prometheus/prometheus v0.36.2 h1:ZMqiEKdamv/YgI/7V5WtQGWbwEerCsXJ26CZgeXDUXM= github.com/prometheus/prometheus v0.36.2/go.mod 
h1:GBcYMr17Nr2/iDIrWmiy9wC5GKl0NOQ5R9XynB1HAG8= +github.com/prometheus/prometheus v0.37.0-rc.0.0.20220713192720-53982c3562b0 h1:bdQvFWZ0EM5n6ditW8wtXWy3RMB19vhNiI5TWlgBYdM= +github.com/prometheus/prometheus v0.37.0-rc.0.0.20220713192720-53982c3562b0/go.mod h1:y+uCk/SdO73g9bMtjCZbejjmcjY4X+xLuKN7cBor5UE= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= @@ -1519,6 +1522,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220624220833-87e55d714810 h1:rHZQSjJdAI4Xf5Qzeh2bBc5YJIkPFVM6oDtMFYmgws0= golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= @@ -1544,6 +1548,7 @@ golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs= golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 6c8eef2944..9286bc2261 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -502,6 +502,64 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar { } } +func HistogramProtoToHistogram(hp prompb.Histogram) histogram.Histogram { + h := histogram.Histogram{ + Schema: hp.Schema, + ZeroThreshold: hp.ZeroThreshold, + ZeroCount: hp.GetZeroCountInt(), + Count: hp.GetCountInt(), + Sum: hp.Sum, + PositiveBuckets: hp.PositiveBuckets.GetDelta(), + NegativeBuckets: hp.NegativeBuckets.GetDelta(), + } + + if hp.PositiveBuckets != nil { + h.PositiveSpans = spansProtoToSpans(hp.PositiveBuckets.Span) + } + if hp.NegativeBuckets != nil { + h.NegativeSpans = spansProtoToSpans(hp.NegativeBuckets.Span) + } + + return h +} + +func spansProtoToSpans(s []*prompb.Buckets_Span) []histogram.Span { + spans := make([]histogram.Span, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} + +func histogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram { + return prompb.Histogram{ + Count: &prompb.Histogram_CountInt{CountInt: 
h.Count}, + Sum: h.Sum, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}, + NegativeBuckets: &prompb.Buckets{ + Span: spansToSpansProto(h.NegativeSpans), + Delta: h.NegativeBuckets, + }, + PositiveBuckets: &prompb.Buckets{ + Span: spansToSpansProto(h.PositiveSpans), + Delta: h.PositiveBuckets, + }, + Timestamp: timestamp, + } +} + +func spansToSpansProto(s []histogram.Span) []*prompb.Buckets_Span { + spans := make([]*prompb.Buckets_Span, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = &prompb.Buckets_Span{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} + // LabelProtosToMetric unpack a []*prompb.Label to a model.Metric func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { metric := make(model.Metric, len(labelPairs)) diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 4d59087e80..5366577336 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/prompb" @@ -27,6 +28,18 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" ) +var testHistogram = histogram.Histogram{ + Schema: 2, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 0, + Sum: 20, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + PositiveBuckets: []int64{1}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{-1}, +} + var writeRequestFixture = &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { @@ -37,8 +50,9 @@ var writeRequestFixture = &prompb.WriteRequest{ {Name: "d", Value: "e"}, {Name: "foo", Value: "bar"}, }, - Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}}, + Histograms: []prompb.Histogram{histogramToHistogramProto(0, &testHistogram)}, }, { Labels: []prompb.Label{ @@ -48,8 +62,9 @@ var writeRequestFixture = &prompb.WriteRequest{ {Name: "d", Value: "e"}, {Name: "foo", Value: "bar"}, }, - Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, - Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}}, + Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}}, + Histograms: []prompb.Histogram{histogramToHistogramProto(1, &testHistogram)}, }, }, } @@ -349,3 +364,9 @@ func TestDecodeWriteRequest(t *testing.T) { require.NoError(t, err) require.Equal(t, writeRequestFixture, actual) } + +func TestNilHistogramProto(t *testing.T) { + // This function will panic if it improperly handles nil + // values, causing the test to fail.
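`histogramToHistogramProto` and `HistogramProtoToHistogram` are inverses for the integer histograms exercised here, which is what lets the fixture above build its expected protos directly from `testHistogram`. A sketch of that round-trip property as a test (the test name is illustrative; the helpers and fixture are the ones defined in this patch):

```go
func TestHistogramProtoRoundTrip(t *testing.T) {
	// Encode the fixture at an arbitrary timestamp, then decode it again.
	hp := histogramToHistogramProto(42, &testHistogram)
	got := HistogramProtoToHistogram(hp)

	// Every field of histogram.Histogram survives the round trip; the
	// timestamp is carried outside the histogram itself.
	require.Equal(t, testHistogram, got)
}
```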
+ HistogramProtoToHistogram(prompb.Histogram{}) +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 420a2d11ae..faf27257af 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -32,6 +32,7 @@ import ( "go.uber.org/atomic" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/prompb" @@ -54,30 +55,35 @@ const ( type queueManagerMetrics struct { reg prometheus.Registerer - samplesTotal prometheus.Counter - exemplarsTotal prometheus.Counter - metadataTotal prometheus.Counter - failedSamplesTotal prometheus.Counter - failedExemplarsTotal prometheus.Counter - failedMetadataTotal prometheus.Counter - retriedSamplesTotal prometheus.Counter - retriedExemplarsTotal prometheus.Counter - retriedMetadataTotal prometheus.Counter - droppedSamplesTotal prometheus.Counter - droppedExemplarsTotal prometheus.Counter - enqueueRetriesTotal prometheus.Counter - sentBatchDuration prometheus.Histogram - highestSentTimestamp *maxTimestamp - pendingSamples prometheus.Gauge - pendingExemplars prometheus.Gauge - shardCapacity prometheus.Gauge - numShards prometheus.Gauge - maxNumShards prometheus.Gauge - minNumShards prometheus.Gauge - desiredNumShards prometheus.Gauge - sentBytesTotal prometheus.Counter - metadataBytesTotal prometheus.Counter - maxSamplesPerSend prometheus.Gauge + samplesTotal prometheus.Counter + exemplarsTotal prometheus.Counter + histogramsTotal prometheus.Counter + metadataTotal prometheus.Counter + failedSamplesTotal prometheus.Counter + failedExemplarsTotal prometheus.Counter + failedHistogramsTotal prometheus.Counter + failedMetadataTotal prometheus.Counter + retriedSamplesTotal prometheus.Counter + retriedExemplarsTotal prometheus.Counter + retriedHistogramsTotal prometheus.Counter + retriedMetadataTotal prometheus.Counter + droppedSamplesTotal prometheus.Counter + droppedExemplarsTotal prometheus.Counter + droppedHistogramsTotal prometheus.Counter + enqueueRetriesTotal prometheus.Counter + sentBatchDuration prometheus.Histogram + highestSentTimestamp *maxTimestamp + pendingSamples prometheus.Gauge + pendingExemplars prometheus.Gauge + pendingHistograms prometheus.Gauge + shardCapacity prometheus.Gauge + numShards prometheus.Gauge + maxNumShards prometheus.Gauge + minNumShards prometheus.Gauge + desiredNumShards prometheus.Gauge + sentBytesTotal prometheus.Counter + metadataBytesTotal prometheus.Counter + maxSamplesPerSend prometheus.Gauge } func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics { @@ -103,6 +109,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "Total number of exemplars sent to remote storage.", ConstLabels: constLabels, }) + m.histogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "histograms_total", + Help: "Total number of histograms sent to remote storage.", + ConstLabels: constLabels, + }) m.metadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -124,6 +137,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "Total number of exemplars which failed on send to remote storage, non-recoverable errors.", ConstLabels: constLabels, }) + m.failedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{ 
+ Namespace: namespace, + Subsystem: subsystem, + Name: "histograms_failed_total", + Help: "Total number of histograms which failed on send to remote storage, non-recoverable errors.", + ConstLabels: constLabels, + }) m.failedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -145,6 +165,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "Total number of exemplars which failed on send to remote storage but were retried because the send error was recoverable.", ConstLabels: constLabels, }) + m.retriedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "histograms_retried_total", + Help: "Total number of histograms which failed on send to remote storage but were retried because the send error was recoverable.", + ConstLabels: constLabels, + }) m.retriedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -166,6 +193,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", ConstLabels: constLabels, }) + m.droppedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "histograms_dropped_total", + Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", + ConstLabels: constLabels, + }) m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -204,6 +238,13 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "The number of exemplars pending in the queues shards to be sent to the remote storage.", ConstLabels: constLabels, }) + m.pendingHistograms = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "histograms_pending", + Help: "The number of histograms pending in the queues shards to be sent to the remote storage.", + ConstLabels: constLabels, + }) m.shardCapacity = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, @@ -269,20 +310,25 @@ func (m *queueManagerMetrics) register() { m.reg.MustRegister( m.samplesTotal, m.exemplarsTotal, + m.histogramsTotal, m.metadataTotal, m.failedSamplesTotal, m.failedExemplarsTotal, + m.failedHistogramsTotal, m.failedMetadataTotal, m.retriedSamplesTotal, m.retriedExemplarsTotal, + m.retriedHistogramsTotal, m.retriedMetadataTotal, m.droppedSamplesTotal, m.droppedExemplarsTotal, + m.droppedHistogramsTotal, m.enqueueRetriesTotal, m.sentBatchDuration, m.highestSentTimestamp, m.pendingSamples, m.pendingExemplars, + m.pendingHistograms, m.shardCapacity, m.numShards, m.maxNumShards, @@ -299,20 +345,25 @@ func (m *queueManagerMetrics) unregister() { if m.reg != nil { m.reg.Unregister(m.samplesTotal) m.reg.Unregister(m.exemplarsTotal) + m.reg.Unregister(m.histogramsTotal) m.reg.Unregister(m.metadataTotal) m.reg.Unregister(m.failedSamplesTotal) m.reg.Unregister(m.failedExemplarsTotal) + m.reg.Unregister(m.failedHistogramsTotal) m.reg.Unregister(m.failedMetadataTotal) m.reg.Unregister(m.retriedSamplesTotal) m.reg.Unregister(m.retriedExemplarsTotal) + 
m.reg.Unregister(m.retriedHistogramsTotal) m.reg.Unregister(m.retriedMetadataTotal) m.reg.Unregister(m.droppedSamplesTotal) m.reg.Unregister(m.droppedExemplarsTotal) + m.reg.Unregister(m.droppedHistogramsTotal) m.reg.Unregister(m.enqueueRetriesTotal) m.reg.Unregister(m.sentBatchDuration) m.reg.Unregister(m.highestSentTimestamp) m.reg.Unregister(m.pendingSamples) m.reg.Unregister(m.pendingExemplars) + m.reg.Unregister(m.pendingHistograms) m.reg.Unregister(m.shardCapacity) m.reg.Unregister(m.numShards) m.reg.Unregister(m.maxNumShards) @@ -341,15 +392,16 @@ type WriteClient interface { type QueueManager struct { lastSendTimestamp atomic.Int64 - logger log.Logger - flushDeadline time.Duration - cfg config.QueueConfig - mcfg config.MetadataConfig - externalLabels labels.Labels - relabelConfigs []*relabel.Config - sendExemplars bool - watcher *wal.Watcher - metadataWatcher *MetadataWatcher + logger log.Logger + flushDeadline time.Duration + cfg config.QueueConfig + mcfg config.MetadataConfig + externalLabels labels.Labels + relabelConfigs []*relabel.Config + sendExemplars bool + sendNativeHistograms bool + watcher *wal.Watcher + metadataWatcher *MetadataWatcher clientMtx sync.RWMutex storeClient WriteClient @@ -396,6 +448,7 @@ func NewQueueManager( highestRecvTimestamp *maxTimestamp, sm ReadyScrapeManager, enableExemplarRemoteWrite bool, + enableNativeHistogramRemoteWrite bool, ) *QueueManager { if logger == nil { logger = log.NewNopLogger() @@ -403,14 +456,15 @@ func NewQueueManager( logger = log.With(logger, remoteName, client.Name(), endpoint, client.Endpoint()) t := &QueueManager{ - logger: logger, - flushDeadline: flushDeadline, - cfg: cfg, - mcfg: mCfg, - externalLabels: externalLabels, - relabelConfigs: relabelConfigs, - storeClient: client, - sendExemplars: enableExemplarRemoteWrite, + logger: logger, + flushDeadline: flushDeadline, + cfg: cfg, + mcfg: mCfg, + externalLabels: externalLabels, + relabelConfigs: relabelConfigs, + storeClient: client, + sendExemplars: enableExemplarRemoteWrite, + sendNativeHistograms: enableNativeHistogramRemoteWrite, seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), @@ -430,7 +484,7 @@ func NewQueueManager( highestRecvTimestamp: highestRecvTimestamp, } - t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite) + t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite) if t.mcfg.Send { t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) } @@ -538,11 +592,11 @@ outer: return false default: } - if t.shards.enqueue(s.Ref, sampleOrExemplar{ + if t.shards.enqueue(s.Ref, timeSeries{ seriesLabels: lbls, timestamp: s.T, value: s.V, - isSample: true, + sType: tSample, }) { continue outer } @@ -588,11 +642,59 @@ outer: return false default: } - if t.shards.enqueue(e.Ref, sampleOrExemplar{ + if t.shards.enqueue(e.Ref, timeSeries{ seriesLabels: lbls, timestamp: e.T, value: e.V, exemplarLabels: e.Labels, + sType: tExemplar, + }) { + continue outer + } + + t.metrics.enqueueRetriesTotal.Inc() + time.Sleep(time.Duration(backoff)) + backoff = backoff * 2 + if backoff > t.cfg.MaxBackoff { + backoff = t.cfg.MaxBackoff + } + } + } + return true +} + +func (t *QueueManager) AppendHistograms(histograms []record.RefHistogram) bool { + if !t.sendNativeHistograms { + return true + } + +outer: + for _, h := range 
histograms { + t.seriesMtx.Lock() + lbls, ok := t.seriesLabels[h.Ref] + if !ok { + t.metrics.droppedHistogramsTotal.Inc() + t.dataDropped.incr(1) + if _, ok := t.droppedSeries[h.Ref]; !ok { + level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) + } + t.seriesMtx.Unlock() + continue + } + t.seriesMtx.Unlock() + + backoff := model.Duration(5 * time.Millisecond) + for { + select { + case <-t.quit: + return false + default: + } + if t.shards.enqueue(h.Ref, timeSeries{ + seriesLabels: lbls, + timestamp: h.T, + histogram: h.H, + sType: tHistogram, }) { continue outer } @@ -921,8 +1023,9 @@ type shards struct { qm *QueueManager queues []*queue // So we can accurately track how many of each are lost during shard shutdowns. - enqueuedSamples atomic.Int64 - enqueuedExemplars atomic.Int64 + enqueuedSamples atomic.Int64 + enqueuedExemplars atomic.Int64 + enqueuedHistograms atomic.Int64 // Emulate a wait group with a channel and an atomic int, as you // cannot select on a wait group. @@ -934,9 +1037,10 @@ type shards struct { // Hard shutdown context is used to terminate outgoing HTTP connections // after giving them a chance to terminate. - hardShutdown context.CancelFunc - samplesDroppedOnHardShutdown atomic.Uint32 - exemplarsDroppedOnHardShutdown atomic.Uint32 + hardShutdown context.CancelFunc + samplesDroppedOnHardShutdown atomic.Uint32 + exemplarsDroppedOnHardShutdown atomic.Uint32 + histogramsDroppedOnHardShutdown atomic.Uint32 } // start the shards; must be called before any call to enqueue. @@ -961,8 +1065,10 @@ func (s *shards) start(n int) { s.done = make(chan struct{}) s.enqueuedSamples.Store(0) s.enqueuedExemplars.Store(0) + s.enqueuedHistograms.Store(0) s.samplesDroppedOnHardShutdown.Store(0) s.exemplarsDroppedOnHardShutdown.Store(0) + s.histogramsDroppedOnHardShutdown.Store(0) for i := 0; i < n; i++ { go s.runShard(hardShutdownCtx, i, newQueues[i]) } @@ -1008,7 +1114,7 @@ func (s *shards) stop() { // retry. A shard is full when its configured capacity has been reached, // specifically, when s.queues[shard] has filled its batchQueue channel and the // partial batch has also been filled. -func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { +func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool { s.mtx.RLock() defer s.mtx.RUnlock() @@ -1021,12 +1127,16 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { if !appended { return false } - if data.isSample { + switch data.sType { + case tSample: s.qm.metrics.pendingSamples.Inc() s.enqueuedSamples.Inc() - } else { + case tExemplar: s.qm.metrics.pendingExemplars.Inc() s.enqueuedExemplars.Inc() + case tHistogram: + s.qm.metrics.pendingHistograms.Inc() + s.enqueuedHistograms.Inc() } return true } @@ -1035,24 +1145,34 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { type queue struct { // batchMtx covers operations appending to or publishing the partial batch. batchMtx sync.Mutex - batch []sampleOrExemplar - batchQueue chan []sampleOrExemplar + batch []timeSeries + batchQueue chan []timeSeries // Since we know there are a limited number of batches out, using a stack // is easy and safe so a sync.Pool is not necessary. // poolMtx covers adding and removing batches from the batchPool. 
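The enqueue retry loops in `Append`, `AppendExemplars`, and the new `AppendHistograms` all share the same policy: start at 5ms, double after each failed enqueue, and clamp at `cfg.MaxBackoff`. A self-contained sketch of that capped doubling (plain `time.Duration` here rather than `model.Duration`, and the 100ms cap is a made-up value for illustration):

```go
package main

import (
	"fmt"
	"time"
)

// nextBackoff doubles the current delay and clamps it at limit, the same
// policy the enqueue retry loops apply starting from 5ms.
func nextBackoff(cur, limit time.Duration) time.Duration {
	cur *= 2
	if cur > limit {
		cur = limit
	}
	return cur
}

func main() {
	backoff, limit := 5*time.Millisecond, 100*time.Millisecond
	for i := 0; i < 6; i++ {
		fmt.Println(backoff)
		backoff = nextBackoff(backoff, limit)
	}
	// Prints 5ms, 10ms, 20ms, 40ms, 80ms, 100ms.
}
```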
poolMtx sync.Mutex - batchPool [][]sampleOrExemplar + batchPool [][]timeSeries } -type sampleOrExemplar struct { +type timeSeries struct { seriesLabels labels.Labels value float64 + histogram *histogram.Histogram timestamp int64 exemplarLabels labels.Labels - isSample bool + // The type of series: sample, exemplar, or histogram. + sType seriesType } +type seriesType int + +const ( + tSample seriesType = iota + tExemplar + tHistogram +) + func newQueue(batchSize, capacity int) *queue { batches := capacity / batchSize // Always create an unbuffered channel even if capacity is configured to be @@ -1061,17 +1181,17 @@ func newQueue(batchSize, capacity int) *queue { batches = 1 } return &queue{ - batch: make([]sampleOrExemplar, 0, batchSize), - batchQueue: make(chan []sampleOrExemplar, batches), + batch: make([]timeSeries, 0, batchSize), + batchQueue: make(chan []timeSeries, batches), // batchPool should have capacity for everything in the channel + 1 for // the batch being processed. - batchPool: make([][]sampleOrExemplar, 0, batches+1), + batchPool: make([][]timeSeries, 0, batches+1), } } -// Append the sampleOrExemplar to the buffered batch. Returns false if it +// Append the timeSeries to the buffered batch. Returns false if it // cannot be added and must be retried. -func (q *queue) Append(datum sampleOrExemplar) bool { +func (q *queue) Append(datum timeSeries) bool { q.batchMtx.Lock() defer q.batchMtx.Unlock() q.batch = append(q.batch, datum) @@ -1089,12 +1209,12 @@ func (q *queue) Append(datum sampleOrExemplar) bool { return true } -func (q *queue) Chan() <-chan []sampleOrExemplar { +func (q *queue) Chan() <-chan []timeSeries { return q.batchQueue } // Batch returns the current batch and allocates a new batch. -func (q *queue) Batch() []sampleOrExemplar { +func (q *queue) Batch() []timeSeries { q.batchMtx.Lock() defer q.batchMtx.Unlock() @@ -1109,7 +1229,7 @@ func (q *queue) Batch() []sampleOrExemplar { } // ReturnForReuse adds the batch buffer back to the internal pool. -func (q *queue) ReturnForReuse(batch []sampleOrExemplar) { +func (q *queue) ReturnForReuse(batch []timeSeries) { q.poolMtx.Lock() defer q.poolMtx.Unlock() if len(q.batchPool) < cap(q.batchPool) { @@ -1149,7 +1269,7 @@ func (q *queue) tryEnqueueingBatch(done <-chan struct{}) bool { } } -func (q *queue) newBatch(capacity int) []sampleOrExemplar { +func (q *queue) newBatch(capacity int) []timeSeries { q.poolMtx.Lock() defer q.poolMtx.Unlock() batches := len(q.batchPool) @@ -1158,7 +1278,7 @@ func (q *queue) newBatch(capacity int) []sampleOrExemplar { q.batchPool = q.batchPool[:batches-1] return batch } - return make([]sampleOrExemplar, 0, capacity) + return make([]timeSeries, 0, capacity) } func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { @@ -1209,22 +1329,26 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { // Remove them from pending and mark them as failed. 
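Replacing the old `isSample` boolean with the `seriesType` enum is what lets a single queue carry three payload kinds and dispatch per entry. A reduced sketch of the same tagged-union dispatch in isolation (the counting helper is illustrative, not from the patch):

```go
package main

import "fmt"

type seriesType int

const (
	tSample seriesType = iota
	tExemplar
	tHistogram
)

type timeSeries struct {
	sType seriesType
	value float64
}

// countByType performs the per-type dispatch that enqueue and
// populateTimeSeries apply to each batch entry.
func countByType(batch []timeSeries) (samples, exemplars, histograms int) {
	for _, d := range batch {
		switch d.sType {
		case tSample:
			samples++
		case tExemplar:
			exemplars++
		case tHistogram:
			histograms++
		}
	}
	return
}

func main() {
	batch := []timeSeries{{sType: tSample}, {sType: tHistogram}, {sType: tHistogram}}
	s, e, h := countByType(batch)
	fmt.Println(s, e, h) // 1 0 2
}
```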
droppedSamples := int(s.enqueuedSamples.Load()) droppedExemplars := int(s.enqueuedExemplars.Load()) + droppedHistograms := int(s.enqueuedHistograms.Load()) s.qm.metrics.pendingSamples.Sub(float64(droppedSamples)) s.qm.metrics.pendingExemplars.Sub(float64(droppedExemplars)) + s.qm.metrics.pendingHistograms.Sub(float64(droppedHistograms)) s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples)) s.qm.metrics.failedExemplarsTotal.Add(float64(droppedExemplars)) + s.qm.metrics.failedHistogramsTotal.Add(float64(droppedHistograms)) s.samplesDroppedOnHardShutdown.Add(uint32(droppedSamples)) s.exemplarsDroppedOnHardShutdown.Add(uint32(droppedExemplars)) + s.histogramsDroppedOnHardShutdown.Add(uint32(droppedHistograms)) return case batch, ok := <-batchQueue: if !ok { return } - nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) + nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData) queue.ReturnForReuse(batch) - n := nPendingSamples + nPendingExemplars - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) + n := nPendingSamples + nPendingExemplars + nPendingHistograms + s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) stop() timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1232,10 +1356,10 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { case <-timer.C: batch := queue.Batch() if len(batch) > 0 { - nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) - n := nPendingSamples + nPendingExemplars + nPendingSamples, nPendingExemplars, nPendingHistograms := s.populateTimeSeries(batch, pendingData) + n := nPendingSamples + nPendingExemplars + nPendingHistograms level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) + s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1243,43 +1367,51 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } } -func (s *shards) populateTimeSeries(batch []sampleOrExemplar, pendingData []prompb.TimeSeries) (int, int) { - var nPendingSamples, nPendingExemplars int +func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int) { + var nPendingSamples, nPendingExemplars, nPendingHistograms int for nPending, d := range batch { pendingData[nPending].Samples = pendingData[nPending].Samples[:0] if s.qm.sendExemplars { pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0] } + if s.qm.sendNativeHistograms { + pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0] + } + // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // stop reading from the queue. This makes it safe to reference pendingSamples by index. 
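The `[:0]` truncations at the top of `populateTimeSeries` reuse each `pendingData` entry's backing arrays between batches rather than reallocating, now for histograms as well. A tiny standalone sketch of that slice-reuse idiom (assumed sizes, purely illustrative):

```go
package main

import "fmt"

func main() {
	// Truncating to length zero keeps the backing array, so per-batch
	// buffers like pendingData[i].Samples can be refilled without a
	// fresh allocation on every batch.
	buf := make([]int, 0, 4)
	for batch := 0; batch < 2; batch++ {
		buf = buf[:0]
		for v := 0; v < 3; v++ {
			buf = append(buf, batch*10+v)
		}
		fmt.Println(buf, cap(buf)) // capacity stays 4 across batches
	}
}
```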
- if d.isSample { - pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + switch d.sType { + case tSample: pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ Value: d.value, Timestamp: d.timestamp, }) nPendingSamples++ - } else { - pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + case tExemplar: pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{ Labels: labelsToLabelsProto(d.exemplarLabels, nil), Value: d.value, Timestamp: d.timestamp, }) nPendingExemplars++ + case tHistogram: + pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, histogramToHistogramProto(d.timestamp, d.histogram)) + nPendingHistograms++ } } - return nPendingSamples, nPendingExemplars + return nPendingSamples, nPendingExemplars, nPendingHistograms } -func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, pBuf, buf) + err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf) if err != nil { level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) + s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount)) } // These counters are used to calculate the dynamic sharding, and as such @@ -1287,16 +1419,18 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s s.qm.dataOut.incr(int64(len(samples))) s.qm.dataOutDuration.incr(int64(time.Since(begin))) s.qm.lastSendTimestamp.Store(time.Now().Unix()) - // Pending samples/exemplars also should be subtracted as an error means + // Pending samples/exemplars/histograms also should be subtracted as an error means // they will not be retried. s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) + s.qm.metrics.pendingHistograms.Sub(float64(histogramCount)) s.enqueuedSamples.Sub(int64(sampleCount)) s.enqueuedExemplars.Sub(int64(exemplarCount)) + s.enqueuedHistograms.Sub(int64(histogramCount)) } // sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount int, pBuf *proto.Buffer, buf *[]byte) error { +func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error { // Build the WriteRequest with no metadata. 
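As the `buildWriteRequest` hunk further below shows, the request builder now also folds the first histogram timestamp into its `highest` tracking. A reduced sketch of that scan (trimmed stand-in types; only the first element of each slice is inspected, matching the original code):

```go
package main

import "fmt"

type stamped struct{ Timestamp int64 }

type series struct {
	Samples    []stamped
	Exemplars  []stamped
	Histograms []stamped
}

// highestTimestamp mirrors the scan in buildWriteRequest across all
// three payload kinds.
func highestTimestamp(tss []series) int64 {
	var highest int64
	for _, ts := range tss {
		if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
			highest = ts.Samples[0].Timestamp
		}
		if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest {
			highest = ts.Exemplars[0].Timestamp
		}
		if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
			highest = ts.Histograms[0].Timestamp
		}
	}
	return highest
}

func main() {
	fmt.Println(highestTimestamp([]series{{
		Samples:    []stamped{{3}},
		Histograms: []stamped{{7}},
	}})) // 7
}
```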
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) if err != nil { @@ -1326,10 +1460,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti if exemplarCount > 0 { span.SetAttributes(attribute.Int("exemplars", exemplarCount)) } + if histogramCount > 0 { + span.SetAttributes(attribute.Int("histograms", histogramCount)) + } begin := time.Now() s.qm.metrics.samplesTotal.Add(float64(sampleCount)) s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount)) + s.qm.metrics.histogramsTotal.Add(float64(histogramCount)) err := s.qm.client().Store(ctx, *buf) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) @@ -1344,6 +1482,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti onRetry := func() { s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount)) + s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount)) } err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) @@ -1420,6 +1559,9 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { highest = ts.Exemplars[0].Timestamp } + if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { + highest = ts.Histograms[0].Timestamp + } } req := &prompb.WriteRequest{ diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index aec783c4db..f6259862e8 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -36,6 +36,7 @@ import ( "go.uber.org/atomic" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/timestamp" @@ -60,13 +61,15 @@ func newHighestTimestampMetric() *maxTimestamp { func TestSampleDelivery(t *testing.T) { testcases := []struct { - name string - samples bool - exemplars bool + name string + samples bool + exemplars bool + histograms bool }{ - {samples: true, exemplars: false, name: "samples only"}, - {samples: true, exemplars: true, name: "both samples and exemplars"}, - {samples: false, exemplars: true, name: "exemplars only"}, + {samples: true, exemplars: false, histograms: false, name: "samples only"}, + {samples: true, exemplars: true, histograms: true, name: "samples, exemplars, and histograms"}, + {samples: false, exemplars: true, histograms: false, name: "exemplars only"}, + {samples: false, exemplars: false, histograms: true, name: "histograms only"}, } // Let's create an even number of send batches so we don't run into the @@ -86,6 +89,7 @@ func TestSampleDelivery(t *testing.T) { writeConfig := baseRemoteWriteConfig("http://test-storage.com") writeConfig.QueueConfig = queueConfig writeConfig.SendExemplars = true + writeConfig.SendNativeHistograms = true conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, @@ -97,9 +101,10 @@ func TestSampleDelivery(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { var ( - series []record.RefSeries - samples []record.RefSample - exemplars []record.RefExemplar + series []record.RefSeries + samples []record.RefSample + exemplars []record.RefExemplar + histograms []record.RefHistogram ) // Generates same series in both cases. 
@@ -109,6 +114,9 @@ func TestSampleDelivery(t *testing.T) { if tc.exemplars { exemplars, series = createExemplars(n, n) } + if tc.histograms { + histograms, series = createHistograms(n, n) + } // Apply new config. queueConfig.Capacity = len(samples) @@ -126,15 +134,19 @@ func TestSampleDelivery(t *testing.T) { // Send first half of data. c.expectSamples(samples[:len(samples)/2], series) c.expectExemplars(exemplars[:len(exemplars)/2], series) + c.expectHistograms(histograms[:len(histograms)/2], series) qm.Append(samples[:len(samples)/2]) qm.AppendExemplars(exemplars[:len(exemplars)/2]) + qm.AppendHistograms(histograms[:len(histograms)/2]) c.waitForExpectedData(t) // Send second half of data. c.expectSamples(samples[len(samples)/2:], series) c.expectExemplars(exemplars[len(exemplars)/2:], series) + c.expectHistograms(histograms[len(histograms)/2:], series) qm.Append(samples[len(samples)/2:]) qm.AppendExemplars(exemplars[len(exemplars)/2:]) + qm.AppendHistograms(histograms[len(histograms)/2:]) c.waitForExpectedData(t) }) } @@ -149,7 +161,7 @@ func TestMetadataDelivery(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.Start() defer m.Stop() @@ -188,7 +200,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -230,7 +242,7 @@ func TestSampleDeliveryOrder(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.StoreSeries(series, 0) m.Start() @@ -250,7 +262,7 @@ func TestShutdown(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -288,7 +300,7 @@ func TestSeriesReset(t *testing.T) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := 
newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -317,7 +329,7 @@ func TestReshard(t *testing.T) { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.StoreSeries(series, 0) m.Start() @@ -353,7 +365,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.Start() h.Unlock() h.Lock() @@ -388,7 +400,7 @@ func TestReshardPartialBatch(t *testing.T) { cfg.BatchSendDeadline = model.Duration(batchSendDeadline) metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.StoreSeries(series, 0) m.Start() @@ -433,7 +445,7 @@ func TestQueueFilledDeadlock(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -460,7 +472,7 @@ func TestReleaseNoninternedString(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.Start() defer m.Stop() @@ -512,7 +524,7 @@ func TestShouldReshard(t *testing.T) { for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") client := NewTestWriteClient() - 
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) @@ -571,6 +583,37 @@ func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []recor return exemplars, series } +func createHistograms(numSamples, numSeries int) ([]record.RefHistogram, []record.RefSeries) { + histograms := make([]record.RefHistogram, 0, numSamples) + series := make([]record.RefSeries, 0, numSeries) + for i := 0; i < numSeries; i++ { + name := fmt.Sprintf("test_metric_%d", i) + for j := 0; j < numSamples; j++ { + h := record.RefHistogram{ + Ref: chunks.HeadSeriesRef(i), + T: int64(j), + H: &histogram.Histogram{ + Schema: 2, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 2, + Sum: 0, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + PositiveBuckets: []int64{int64(i) + 1}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{int64(-i) - 1}, + }, + } + histograms = append(histograms, h) + } + series = append(series, record.RefSeries{ + Ref: chunks.HeadSeriesRef(i), + Labels: labels.Labels{{Name: "__name__", Value: name}}, + }) + } + return histograms, series +} + func getSeriesNameFromRef(r record.RefSeries) string { for _, l := range r.Labels { if l.Name == "__name__" { @@ -581,16 +624,18 @@ func getSeriesNameFromRef(r record.RefSeries) string { } type TestWriteClient struct { - receivedSamples map[string][]prompb.Sample - expectedSamples map[string][]prompb.Sample - receivedExemplars map[string][]prompb.Exemplar - expectedExemplars map[string][]prompb.Exemplar - receivedMetadata map[string][]prompb.MetricMetadata - writesReceived int - withWaitGroup bool - wg sync.WaitGroup - mtx sync.Mutex - buf []byte + receivedSamples map[string][]prompb.Sample + expectedSamples map[string][]prompb.Sample + receivedExemplars map[string][]prompb.Exemplar + expectedExemplars map[string][]prompb.Exemplar + receivedHistograms map[string][]prompb.Histogram + expectedHistograms map[string][]prompb.Histogram + receivedMetadata map[string][]prompb.MetricMetadata + writesReceived int + withWaitGroup bool + wg sync.WaitGroup + mtx sync.Mutex + buf []byte } func NewTestWriteClient() *TestWriteClient { @@ -644,6 +689,23 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco c.wg.Add(len(ss)) } +func (c *TestWriteClient) expectHistograms(hh []record.RefHistogram, series []record.RefSeries) { + if !c.withWaitGroup { + return + } + c.mtx.Lock() + defer c.mtx.Unlock() + + c.expectedHistograms = map[string][]prompb.Histogram{} + c.receivedHistograms = map[string][]prompb.Histogram{} + + for _, h := range hh { + seriesName := getSeriesNameFromRef(series[h.Ref]) + c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], histogramToHistogramProto(h.T, h.H)) + } + c.wg.Add(len(hh)) +} + func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { if !c.withWaitGroup { return @@ -657,6 +719,9 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { for ts, expectedExemplar := range c.expectedExemplars { require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts) } + for ts, expectedHistogram := range 
c.expectedHistograms { + require.Equal(tb, expectedHistogram, c.receivedHistograms[ts], ts) + } } func (c *TestWriteClient) Store(_ context.Context, req []byte) error { @@ -676,7 +741,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error { if err := proto.Unmarshal(reqBuf, &reqProto); err != nil { return err } - count := 0 for _, ts := range reqProto.Timeseries { var seriesName string @@ -695,6 +759,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error { count++ c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex) } + + for _, histogram := range ts.Histograms { + count++ + c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram) + } } if c.withWaitGroup { c.wg.Add(-count) @@ -791,7 +860,7 @@ func BenchmarkSampleSend(b *testing.B) { dir := b.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) m.StoreSeries(series, 0) // These should be received by the client. @@ -837,7 +906,7 @@ func BenchmarkStartup(b *testing.B) { c := NewTestBlockedWriteClient() m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), - cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false) + cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() @@ -913,7 +982,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) // Need to start the queue manager so the proper metrics are initialized. 
// However we can stop it right away since we don't need to do any actual @@ -990,7 +1059,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) for _, tc := range []struct { name string @@ -1181,7 +1250,7 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) { batchSize := 10 queue := newQueue(batchSize, capacity) for i := 0; i < capacity+batchSize; i++ { - queue.Append(sampleOrExemplar{}) + queue.Append(timeSeries{}) } done := make(chan struct{}) diff --git a/storage/remote/write.go b/storage/remote/write.go index 2faff35204..63a454e3dc 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -45,6 +45,12 @@ var ( Name: "exemplars_in_total", Help: "Exemplars in to remote storage, compare to exemplars out for queue managers.", }) + histogramsIn = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "histograms_in_total", + Help: "Histograms in to remote storage, compare to histograms out for queue managers.", + }) ) // WriteStorage represents all the remote write storage. @@ -188,6 +194,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.highestTimestamp, rws.scraper, rwConf.SendExemplars, + rwConf.SendNativeHistograms, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) @@ -270,7 +277,7 @@ func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, return 0, nil } -func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, ts int64, _ *histogram.Histogram) (storage.SeriesRef, error) { +func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, ts int64, h *histogram.Histogram) (storage.SeriesRef, error) { t.histograms++ if ts > t.highestTimestamp { t.highestTimestamp = ts @@ -280,10 +287,11 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { - t.writeStorage.samplesIn.incr(t.samples + t.exemplars) + t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) samplesIn.Add(float64(t.samples)) exemplarsIn.Add(float64(t.exemplars)) + histogramsIn.Add(float64(t.histograms)) t.highestRecvTimestamp.Set(float64(t.highestTimestamp / 1000)) return nil } diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 6493622239..226b5ac434 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -117,6 +117,20 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) } } + + for _, hp := range ts.Histograms { + hs := HistogramProtoToHistogram(hp) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, &hs) + if err != nil { + unwrappedErr := errors.Unwrap(err) + // Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is + // a note indicating its inclusion in the future.
+ if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { + level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) + } + return err + } + } } if outOfOrderExemplarErrs > 0 { diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 99936073d0..8eacc27c1f 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -50,6 +50,7 @@ func TestRemoteWriteHandler(t *testing.T) { i := 0 j := 0 + k := 0 for _, ts := range writeRequestFixture.Timeseries { labels := labelProtosToLabels(ts.Labels) for _, s := range ts.Samples { @@ -62,6 +63,12 @@ func TestRemoteWriteHandler(t *testing.T) { require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) j++ } + + for _, hp := range ts.Histograms { + h := HistogramProtoToHistogram(hp) + require.Equal(t, mockHistogram{labels, hp.Timestamp, &h}, appendable.histograms[k]) + k++ + } } } @@ -113,6 +120,28 @@ func TestOutOfOrderExemplar(t *testing.T) { require.Equal(t, http.StatusNoContent, resp.StatusCode) } +func TestOutOfOrderHistogram(t *testing.T) { + buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Histograms: []prompb.Histogram{histogramToHistogramProto(0, &testHistogram)}, + }}, nil, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + latestHistogram: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + func TestCommitErr(t *testing.T) { buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) require.NoError(t, err) @@ -136,11 +165,13 @@ func TestCommitErr(t *testing.T) { } type mockAppendable struct { - latestSample int64 - samples []mockSample - latestExemplar int64 - exemplars []mockExemplar - commitErr error + latestSample int64 + samples []mockSample + latestExemplar int64 + exemplars []mockExemplar + latestHistogram int64 + histograms []mockHistogram + commitErr error } type mockSample struct { @@ -156,6 +187,12 @@ type mockExemplar struct { v float64 } +type mockHistogram struct { + l labels.Labels + t int64 + h *histogram.Histogram +} + func (m *mockAppendable) Appender(_ context.Context) storage.Appender { return m } @@ -188,7 +225,12 @@ func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e return 0, nil } -func (*mockAppendable) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { - // TODO(beorn7): Noop until we implement sparse histograms over remote write. 
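The handler's histogram path mirrors the existing sample path: unwrap one layer of error context, then classify with `errors.Is` so out-of-order, out-of-bounds, and duplicate appends surface as client errors rather than server errors. A standalone sketch of that unwrap-then-classify pattern (the sentinel below is a stand-in for `storage.ErrOutOfOrderSample`):

```go
package main

import (
	"errors"
	"fmt"
)

var errOutOfOrder = errors.New("out of order sample")

// classify unwraps one layer of context, then matches against a known
// sentinel with errors.Is, as the write handler does.
func classify(err error) string {
	if errors.Is(errors.Unwrap(err), errOutOfOrder) {
		return "client error (400)"
	}
	return "internal error (500)"
}

func main() {
	wrapped := fmt.Errorf("append failed: %w", errOutOfOrder)
	fmt.Println(classify(wrapped)) // client error (400)
}
```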
+func (m *mockAppendable) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) {
+	if t < m.latestHistogram {
+		return 0, storage.ErrOutOfOrderSample
+	}
+
+	m.latestHistogram = t
+	m.histograms = append(m.histograms, mockHistogram{l, t, h})
 	return 0, nil
 }
diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go
index 7b026b4ea8..7188f8b1d2 100644
--- a/tsdb/wal/watcher.go
+++ b/tsdb/wal/watcher.go
@@ -49,6 +49,7 @@ type WriteTo interface {
 	// Once returned, the WAL Watcher will not attempt to pass that data again.
 	Append([]record.RefSample) bool
 	AppendExemplars([]record.RefExemplar) bool
+	AppendHistograms([]record.RefHistogram) bool
 	StoreSeries([]record.RefSeries, int)

 	// Next two methods are intended for garbage-collection: first we call
@@ -74,6 +75,7 @@ type Watcher struct {
 	walDir         string
 	lastCheckpoint string
 	sendExemplars  bool
+	sendHistograms bool

 	metrics       *WatcherMetrics
 	readerMetrics *LiveReaderMetrics
@@ -144,18 +146,19 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
 }

 // NewWatcher creates a new WAL watcher for a given WriteTo.
-func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars bool) *Watcher {
+func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	return &Watcher{
-		logger:        logger,
-		writer:        writer,
-		metrics:       metrics,
-		readerMetrics: readerMetrics,
-		walDir:        path.Join(dir, "wal"),
-		name:          name,
-		sendExemplars: sendExemplars,
+		logger:         logger,
+		writer:         writer,
+		metrics:        metrics,
+		readerMetrics:  readerMetrics,
+		walDir:         path.Join(dir, "wal"),
+		name:           name,
+		sendExemplars:  sendExemplars,
+		sendHistograms: sendHistograms,

 		quit: make(chan struct{}),
 		done: make(chan struct{}),
@@ -473,11 +476,13 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
 // Also used with readCheckpoint - implements segmentReadFn.
 func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 	var (
-		dec       record.Decoder
-		series    []record.RefSeries
-		samples   []record.RefSample
-		send      []record.RefSample
-		exemplars []record.RefExemplar
+		dec              record.Decoder
+		series           []record.RefSeries
+		samples          []record.RefSample
+		samplesToSend    []record.RefSample
+		exemplars        []record.RefExemplar
+		histograms       []record.RefHistogram
+		histogramsToSend []record.RefHistogram
 	)
 	for r.Next() && !isClosed(w.quit) {
 		rec := r.Record()
@@ -510,12 +515,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 						duration := time.Since(w.startTime)
 						level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
 					}
-					send = append(send, s)
+					samplesToSend = append(samplesToSend, s)
 				}
 			}
-			if len(send) > 0 {
-				w.writer.Append(send)
-				send = send[:0]
+			if len(samplesToSend) > 0 {
+				w.writer.Append(samplesToSend)
+				samplesToSend = samplesToSend[:0]
 			}

 		case record.Exemplars:
@@ -535,6 +540,34 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 			}
 			w.writer.AppendExemplars(exemplars)

+		case record.Histograms:
+			// Skip if experimental "histograms over remote write" is not enabled.
+			if !w.sendHistograms {
+				break
+			}
+			if !tail {
+				break
+			}
+			histograms, err := dec.Histograms(rec, histograms[:0])
+			if err != nil {
+				w.recordDecodeFailsMetric.Inc()
+				return err
+			}
+			for _, h := range histograms {
+				if h.T > w.startTimestamp {
+					if !w.sendSamples {
+						w.sendSamples = true
+						duration := time.Since(w.startTime)
+						level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
+					}
+					histogramsToSend = append(histogramsToSend, h)
+				}
+			}
+			if len(histogramsToSend) > 0 {
+				w.writer.AppendHistograms(histogramsToSend)
+				histogramsToSend = histogramsToSend[:0]
+			}
+
 		case record.Tombstones:

 		default:
@@ -589,6 +622,8 @@ func recordType(rt record.Type) string {
 		return "series"
 	case record.Samples:
 		return "samples"
+	case record.Histograms:
+		return "histograms"
 	case record.Tombstones:
 		return "tombstones"
 	default:
diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go
index 1c76ea585b..829ae1741a 100644
--- a/tsdb/wal/watcher_test.go
+++ b/tsdb/wal/watcher_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"

+	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/record"
@@ -53,6 +54,7 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) {
 type writeToMock struct {
 	samplesAppended      int
 	exemplarsAppended    int
+	histogramsAppended   int
 	seriesLock           sync.Mutex
 	seriesSegmentIndexes map[chunks.HeadSeriesRef]int
 }
@@ -67,6 +69,11 @@ func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
 	return true
 }

+func (wtm *writeToMock) AppendHistograms(h []record.RefHistogram) bool {
+	wtm.histogramsAppended += len(h)
+	return true
+}
+
 func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
 	wtm.UpdateSeriesSegment(series, index)
 }
@@ -108,6 +115,7 @@ func TestTailSamples(t *testing.T) {
 	const seriesCount = 10
 	const samplesCount = 250
 	const exemplarsCount = 25
+	const histogramsCount = 50
 	for _, compress := range []bool{false, true} {
 		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
 			now := time.Now()
@@ -160,6 +168,26 @@ func TestTailSamples(t *testing.T) {
 				}, nil)
 				require.NoError(t, w.Log(exemplar))
 			}
+
+			for j := 0; j < histogramsCount; j++ {
+				inner := rand.Intn(ref + 1)
+				histogram := enc.Histograms([]record.RefHistogram{{
+					Ref: chunks.HeadSeriesRef(inner),
+					T:   now.UnixNano() + 1,
+					H: &histogram.Histogram{
+						Schema:          2,
+						ZeroThreshold:   1e-128,
+						ZeroCount:       0,
+						Count:           2,
+						Sum:             0,
+						PositiveSpans:   []histogram.Span{{Offset: 0, Length: 1}},
+						PositiveBuckets: []int64{int64(i) + 1},
+						NegativeSpans:   []histogram.Span{{Offset: 0, Length: 1}},
+						NegativeBuckets: []int64{int64(-i) - 1},
+					},
+				}}, nil)
+				require.NoError(t, w.Log(histogram))
+			}
 		}

 		// Start read after checkpoint, no more data written.
@@ -167,7 +195,7 @@
 			require.NoError(t, err)

 			wt := newWriteToMock()
-			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true)
+			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true)
 			watcher.SetStartTime(now)

 			// Set the Watcher's metrics so they're not nil pointers.
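Editor's note on the WriteTo change above: the interface gains AppendHistograms, so every downstream implementation must grow that method, and NewWatcher takes a second trailing boolean. A minimal sketch of an adapted consumer follows; countingWriteTo is illustrative and not part of this patch, and the UpdateSeriesSegment/SeriesReset signatures are assumed from the existing mocks rather than shown in this diff.

package main

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/tsdb/wal"
)

// countingWriteTo is an illustrative WriteTo implementation; relative to the
// pre-patch interface, only the AppendHistograms method is new.
type countingWriteTo struct {
	samples, exemplars, histograms int
}

func (w *countingWriteTo) Append(s []record.RefSample) bool {
	w.samples += len(s)
	return true
}

func (w *countingWriteTo) AppendExemplars(e []record.RefExemplar) bool {
	w.exemplars += len(e)
	return true
}

// AppendHistograms receives decoded record.Histograms entries; as with Append
// and AppendExemplars, returning false signals the data was not consumed.
func (w *countingWriteTo) AppendHistograms(h []record.RefHistogram) bool {
	w.histograms += len(h)
	return true
}

func (w *countingWriteTo) StoreSeries(s []record.RefSeries, i int)         {}
func (w *countingWriteTo) UpdateSeriesSegment(s []record.RefSeries, i int) {}
func (w *countingWriteTo) SeriesReset(i int)                               {}

func main() {
	reg := prometheus.NewRegistry()
	wt := &countingWriteTo{}
	// Trailing booleans: sendExemplars, then the new sendHistograms, which
	// gates decoding of record.Histograms in readSegment.
	watcher := wal.NewWatcher(wal.NewWatcherMetrics(reg), wal.NewLiveReaderMetrics(reg),
		log.NewNopLogger(), "example", wt, "/path/to/data", true, true)
	go watcher.Start()
	defer watcher.Stop()
}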
@@ -185,12 +213,14 @@ func TestTailSamples(t *testing.T) {
 			expectedSeries := seriesCount
 			expectedSamples := seriesCount * samplesCount
 			expectedExemplars := seriesCount * exemplarsCount
+			expectedHistograms := seriesCount * histogramsCount
 			retry(t, defaultRetryInterval, defaultRetries, func() bool {
 				return wt.checkNumLabels() >= expectedSeries
 			})
 			require.Equal(t, expectedSeries, wt.checkNumLabels(), "did not receive the expected number of series")
 			require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
 			require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
+			require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
 		})
 	}
 }
@@ -249,7 +279,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
 	require.NoError(t, err)

 	wt := newWriteToMock()
-	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 	go watcher.Start()

 	expected := seriesCount
@@ -338,7 +368,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
 	_, _, err = Segments(w.Dir())
 	require.NoError(t, err)
 	wt := newWriteToMock()
-	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 	go watcher.Start()

 	expected := seriesCount * 2
@@ -404,7 +434,7 @@ func TestReadCheckpoint(t *testing.T) {
 	require.NoError(t, err)

 	wt := newWriteToMock()
-	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 	go watcher.Start()

 	expectedSeries := seriesCount
@@ -473,7 +503,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
 	}

 	wt := newWriteToMock()
-	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 	watcher.MaxSegment = -1

 	// Set the Watcher's metrics so they're not nil pointers.
@@ -545,7 +575,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
 	require.NoError(t, err)

 	wt := newWriteToMock()
-	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 	watcher.MaxSegment = -1

 	go watcher.Start()
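Editor's note: for completeness, the receiving side needs no wiring beyond reading the new Histograms field on each TimeSeries, using the same DecodeWriteRequest and HistogramProtoToHistogram helpers the handler above relies on. A minimal standalone receiver sketch; the /receive path and port are illustrative.

package main

import (
	"fmt"
	"net/http"

	"github.com/prometheus/prometheus/storage/remote"
)

func main() {
	http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
		// DecodeWriteRequest snappy-decompresses and unmarshals the body.
		req, err := remote.DecodeWriteRequest(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		for _, ts := range req.Timeseries {
			// With send_native_histograms enabled on the sender, native
			// histograms arrive alongside Samples and Exemplars.
			for _, hp := range ts.Histograms {
				h := remote.HistogramProtoToHistogram(hp)
				fmt.Printf("histogram: ts=%d %s\n", hp.Timestamp, (&h).String())
			}
		}
		w.WriteHeader(http.StatusNoContent)
	})
	_ = http.ListenAndServe(":1234", nil)
}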