From 72afe55906b8413ff81b830ebbd4e049ab64a91a Mon Sep 17 00:00:00 2001
From: Mark Gritter
Date: Tue, 19 May 2020 17:54:43 -0500
Subject: [PATCH] Utility for collecting new gauge metrics (#9017)

---
 helper/metricsutil/gauge_process.go      | 257 +++++++++++
 helper/metricsutil/gauge_process_test.go | 558 +++++++++++++++++++++++
 helper/metricsutil/wrapped_metrics.go    |  11 +
 3 files changed, 826 insertions(+)
 create mode 100644 helper/metricsutil/gauge_process.go
 create mode 100644 helper/metricsutil/gauge_process_test.go

diff --git a/helper/metricsutil/gauge_process.go b/helper/metricsutil/gauge_process.go
new file mode 100644
index 0000000000..7dced692ec
--- /dev/null
+++ b/helper/metricsutil/gauge_process.go
@@ -0,0 +1,257 @@
+package metricsutil
+
+import (
+	"context"
+	"math/rand"
+	"sort"
+	"time"
+
+	log "github.com/hashicorp/go-hclog"
+)
+
+// This interface allows unit tests to substitute in a simulated clock.
+type clock interface {
+	Now() time.Time
+	NewTicker(time.Duration) *time.Ticker
+}
+
+type defaultClock struct {
+}
+
+func (_ defaultClock) Now() time.Time {
+	return time.Now()
+}
+
+func (_ defaultClock) NewTicker(d time.Duration) *time.Ticker {
+	return time.NewTicker(d)
+}
+
+// GaugeLabelValues is one gauge in a set sharing a single key; the
+// whole set is measured in a batch.
+type GaugeLabelValues struct {
+	Labels []Label
+	Value  float32
+}
+
+// GaugeCollector is a callback function that returns an unfiltered
+// set of label-value pairs. It may be cancelled if it takes too long.
+type GaugeCollector = func(context.Context) ([]GaugeLabelValues, error)
+
+// collectionBound is a hard limit on how long a collection process
+// may take, as a fraction of the current interval.
+const collectionBound = 0.02
+
+// collectionTarget is a soft limit; if exceeded, the collection interval
+// will be doubled.
+const collectionTarget = 0.01
+
+// A GaugeCollectionProcess is responsible for one particular gauge metric.
+// It handles a delay on initial startup; limiting the cardinality; and
+// exponential backoff on the requested interval.
+type GaugeCollectionProcess struct {
+	stop    chan struct{}
+	stopped chan struct{}
+
+	// gauge name
+	key []string
+	// labels to use when reporting
+	labels []Label
+
+	// callback function
+	collector GaugeCollector
+
+	// destination for metrics
+	sink   *ClusterMetricSink
+	logger log.Logger
+
+	// time between collections
+	originalInterval time.Duration
+	currentInterval  time.Duration
+	ticker           *time.Ticker
+
+	// time source
+	clock clock
+}
+
+// NewGaugeCollectionProcess creates a new collection process for the callback
+// function given as an argument.
+// A label should be provided for metrics *about* this collection process.
+//
+// The Run() method must be called to start the process.
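+//
+// A minimal usage sketch (illustrative only, not part of this change;
+// the sink, logger, and metric/label names here are assumed):
+//
+//	collector := func(ctx context.Context) ([]GaugeLabelValues, error) {
+//		return []GaugeLabelValues{
+//			{Labels: []Label{{"mount_point", "secret/"}}, Value: 42},
+//		}, nil
+//	}
+//	proc, err := sink.NewGaugeCollectionProcess(
+//		[]string{"secrets", "kv", "count"},
+//		[]Label{{"gauge", "kv_secret_count"}},
+//		collector,
+//		logger,
+//	)
+//	if err == nil {
+//		go proc.Run()     // delays randomly, then collects every GaugeInterval
+//		defer proc.Stop()
+//	}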
+func (m *ClusterMetricSink) NewGaugeCollectionProcess(
+	key []string,
+	id []Label,
+	collector GaugeCollector,
+	logger log.Logger,
+) (*GaugeCollectionProcess, error) {
+	return m.newGaugeCollectionProcessWithClock(
+		key,
+		id,
+		collector,
+		logger,
+		defaultClock{},
+	)
+}
+
+// test version allows an alternative clock implementation
+func (m *ClusterMetricSink) newGaugeCollectionProcessWithClock(
+	key []string,
+	id []Label,
+	collector GaugeCollector,
+	logger log.Logger,
+	clock clock,
+) (*GaugeCollectionProcess, error) {
+	process := &GaugeCollectionProcess{
+		stop:             make(chan struct{}, 1),
+		stopped:          make(chan struct{}, 1),
+		key:              key,
+		labels:           id,
+		collector:        collector,
+		sink:             m,
+		originalInterval: m.GaugeInterval,
+		currentInterval:  m.GaugeInterval,
+		logger:           logger,
+		clock:            clock,
+	}
+	return process, nil
+}
+
+// delayStart randomly delays by up to one extra interval
+// so that collection processes do not all run at the same time.
+// If we knew all the processes in advance, we could just schedule them
+// evenly, but a new one could be added per secret engine.
+func (p *GaugeCollectionProcess) delayStart() bool {
+	randomDelay := time.Duration(rand.Intn(int(p.currentInterval)))
+	// A Timer might be better, but then we'd have to simulate
+	// one of those too?
+	delayTick := p.clock.NewTicker(randomDelay)
+	defer delayTick.Stop()
+
+	select {
+	case <-p.stop:
+		return true
+	case <-delayTick.C:
+		break
+	}
+	return false
+}
+
+// resetTicker stops the old ticker and starts a new one at the current
+// interval setting.
+func (p *GaugeCollectionProcess) resetTicker() {
+	if p.ticker != nil {
+		p.ticker.Stop()
+	}
+	p.ticker = p.clock.NewTicker(p.currentInterval)
+}
+
+// collectAndFilterGauges executes the callback function,
+// limits the cardinality, and streams the results to the metrics sink.
+func (p *GaugeCollectionProcess) collectAndFilterGauges() {
+	// Run for only an allotted amount of time.
+	timeout := time.Duration(collectionBound * float64(p.currentInterval))
+	ctx, cancel := context.WithTimeout(context.Background(),
+		timeout)
+	defer cancel()
+
+	p.sink.AddDurationWithLabels([]string{"metrics", "collection", "interval"},
+		p.currentInterval,
+		p.labels)
+
+	start := p.clock.Now()
+	values, err := p.collector(ctx)
+	end := p.clock.Now()
+	duration := end.Sub(start)
+
+	// Report how long it took to perform the operation.
+	p.sink.AddDurationWithLabels([]string{"metrics", "collection"},
+		duration,
+		p.labels)
+
+	// If over threshold, back off by doubling the measurement interval.
+	// Currently a restart is the only way to bring it back down.
+	threshold := time.Duration(collectionTarget * float64(p.currentInterval))
+	if duration > threshold {
+		p.logger.Warn("gauge collection time exceeded target", "target", threshold, "actual", duration, "id", p.labels)
+		p.currentInterval *= 2
+		p.resetTicker()
+	}
+
+	if err != nil {
+		p.logger.Error("error collecting gauge", "id", p.labels, "error", err)
+		p.sink.IncrCounterWithLabels([]string{"metrics", "collection", "error"},
+			1,
+			p.labels)
+		return
+	}
+
+	// Filter to top N.
+	// This does not guarantee total cardinality is <= N, but it does slow things down
+	// a little if the cardinality *is* too high and the gauge needs to be disabled.
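+	// For example, with MaxGaugeCardinality = 500 and 600 collected label
+	// sets, only the 500 label sets with the largest values are reported.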
+	if len(values) > p.sink.MaxGaugeCardinality {
+		sort.Slice(values, func(a, b int) bool {
+			return values[a].Value > values[b].Value
+		})
+		values = values[:p.sink.MaxGaugeCardinality]
+	}
+
+	p.streamGaugesToSink(values)
+}
+
+func (p *GaugeCollectionProcess) streamGaugesToSink(values []GaugeLabelValues) {
+	// Dumping 500 metrics in one big chunk is somewhat unfriendly to UDP-based
+	// transport, and to the rest of the metrics trying to get through.
+	// Let's smooth things out over the course of a second.
+	// 1 second / 500 = 2 ms each, so we can send 25 per 50 milliseconds.
+	// That should be one or two packets.
+	sendTick := p.clock.NewTicker(50 * time.Millisecond)
+	batchSize := 25
+	for i, lv := range values {
+		if i > 0 && i%batchSize == 0 {
+			select {
+			case <-p.stop:
+				// because the channel is closed,
+				// the main loop will successfully
+				// read from p.stop too, and exit.
+				return
+			case <-sendTick.C:
+				break
+			}
+
+		}
+		p.sink.SetGaugeWithLabels(p.key, lv.Value, lv.Labels)
+	}
+	sendTick.Stop()
+}
+
+// Run should be called as a goroutine.
+func (p *GaugeCollectionProcess) Run() {
+	defer close(p.stopped)
+
+	// Wait a random amount of time
+	stopReceived := p.delayStart()
+	if stopReceived {
+		return
+	}
+
+	// Create a ticker to start each cycle
+	p.resetTicker()
+
+	// Loop until we get a signal to stop
+	for {
+		select {
+		case <-p.ticker.C:
+			p.collectAndFilterGauges()
+		case <-p.stop:
+			// Can't use defer because this might
+			// not be the original ticker.
+			p.ticker.Stop()
+			return
+		}
+	}
+}
+
+// Stop the collection process
+func (p *GaugeCollectionProcess) Stop() {
+	close(p.stop)
+}
diff --git a/helper/metricsutil/gauge_process_test.go b/helper/metricsutil/gauge_process_test.go
new file mode 100644
index 0000000000..cf739df9ec
--- /dev/null
+++ b/helper/metricsutil/gauge_process_test.go
@@ -0,0 +1,558 @@
+package metricsutil
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"reflect"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/armon/go-metrics"
+	log "github.com/hashicorp/go-hclog"
+)
+
+// SimulatedTime maintains a virtual clock so the test isn't
+// dependent upon real time.
+// Unfortunately there is no way to run these tests in parallel
+// since they rely on the same global timeNow function.
+type SimulatedTime struct {
+	now           time.Time
+	tickerBarrier chan *SimulatedTicker
+}
+
+var _ clock = &SimulatedTime{}
+
+type SimulatedTicker struct {
+	ticker   *time.Ticker
+	duration time.Duration
+	sender   chan time.Time
+}
+
+func (s *SimulatedTime) Now() time.Time {
+	return s.now
+}
+
+func (s *SimulatedTime) NewTicker(d time.Duration) *time.Ticker {
+	// Create a real ticker, but set its duration to an amount that will never fire for real.
+	// We'll inject times into the channel directly.
+	replacementChannel := make(chan time.Time)
+	t := time.NewTicker(1000 * time.Hour)
+	t.C = replacementChannel
+	s.tickerBarrier <- &SimulatedTicker{t, d, replacementChannel}
+	return t
+}
+
+func (s *SimulatedTime) waitForTicker(t *testing.T) *SimulatedTicker {
+	t.Helper()
+	// System under test should create a ticker within 100ms,
+	// wait for it to show up or else fail the test.
+	timeout := time.After(100 * time.Millisecond)
+	select {
+	case <-timeout:
+		t.Fatal("Timeout waiting for ticker creation.")
+		return nil
+	case t := <-s.tickerBarrier:
+		return t
+	}
+}
+
+func (s *SimulatedTime) allowTickers(n int) {
+	s.tickerBarrier = make(chan *SimulatedTicker, n)
+}
+
+func startSimulatedTime() *SimulatedTime {
+	s := &SimulatedTime{
+		now:           time.Now(),
+		tickerBarrier: make(chan *SimulatedTicker, 1),
+	}
+	return s
+}
+
+type SimulatedCollector struct {
+	numCalls    uint32
+	callBarrier chan uint32
+}
+
+func newSimulatedCollector() *SimulatedCollector {
+	return &SimulatedCollector{
+		numCalls:    0,
+		callBarrier: make(chan uint32, 1),
+	}
+}
+
+func (s *SimulatedCollector) waitForCall(t *testing.T) {
+	timeout := time.After(100 * time.Millisecond)
+	select {
+	case <-timeout:
+		t.Fatal("Timeout waiting for call to collection function.")
+		return
+	case <-s.callBarrier:
+		return
+	}
+}
+
+func (s *SimulatedCollector) EmptyCollectionFunction(ctx context.Context) ([]GaugeLabelValues, error) {
+	atomic.AddUint32(&s.numCalls, 1)
+	s.callBarrier <- s.numCalls
+	return []GaugeLabelValues{}, nil
+}
+
+func TestGauge_Creation(t *testing.T) {
+	c := newSimulatedCollector()
+	sink := BlackholeSink()
+	sink.GaugeInterval = 33 * time.Minute
+
+	key := []string{"example", "count"}
+	labels := []Label{{"gauge", "test"}}
+
+	p, err := sink.NewGaugeCollectionProcess(
+		key,
+		labels,
+		c.EmptyCollectionFunction,
+		log.Default(),
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+
+	if _, ok := p.clock.(defaultClock); !ok {
+		t.Error("Default clock not installed.")
+	}
+
+	if !reflect.DeepEqual(p.key, key) {
+		t.Errorf("Key not initialized, got %v but expected %v",
+			p.key, key)
+	}
+
+	if !reflect.DeepEqual(p.labels, labels) {
+		t.Errorf("Labels not initialized, got %v but expected %v",
+			p.labels, labels)
+	}
+
+	if p.originalInterval != sink.GaugeInterval || p.currentInterval != sink.GaugeInterval {
+		t.Errorf("Intervals not initialized, got %v and %v, expected %v",
+			p.originalInterval, p.currentInterval, sink.GaugeInterval)
+	}
+}
+
+func TestGauge_StartDelay(t *testing.T) {
+	// Work through an entire startup sequence, up to collecting
+	// the first batch of gauges.
+	s := startSimulatedTime()
+	c := newSimulatedCollector()
+
+	sink := BlackholeSink()
+	sink.GaugeInterval = 2 * time.Hour
+
+	p, err := sink.newGaugeCollectionProcessWithClock(
+		[]string{"example", "count"},
+		[]Label{{"gauge", "test"}},
+		c.EmptyCollectionFunction,
+		log.Default(),
+		s,
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+	go p.Run()
+
+	delayTicker := s.waitForTicker(t)
+	if delayTicker.duration > sink.GaugeInterval {
+		t.Errorf("Delayed start %v is more than interval %v.",
+			delayTicker.duration, sink.GaugeInterval)
+	}
+	if c.numCalls > 0 {
+		t.Error("Collection function has been called")
+	}
+
+	// Signal the end of delay, then another ticker should start
+	delayTicker.sender <- time.Now()
+
+	intervalTicker := s.waitForTicker(t)
+	if intervalTicker.duration != sink.GaugeInterval {
+		t.Errorf("Ticker duration is %v, expected %v",
+			intervalTicker.duration, sink.GaugeInterval)
+	}
+	if c.numCalls > 0 {
+		t.Error("Collection function has been called")
+	}
+
+	// Time's up, ensure the collection function is executed.
+	intervalTicker.sender <- time.Now()
+	c.waitForCall(t)
+	if c.numCalls != 1 {
+		t.Errorf("Collection function called %v times, expected %v.", c.numCalls, 1)
+	}
+
+	p.Stop()
+}
+
+func waitForStopped(t *testing.T, p *GaugeCollectionProcess) {
+	t.Helper()
+	timeout := time.After(100 * time.Millisecond)
+	select {
+	case <-timeout:
+		t.Fatal("Timeout waiting for process to stop.")
+	case <-p.stopped:
+		return
+	}
+}
+
+func TestGauge_StoppedDuringInitialDelay(t *testing.T) {
+	// Stop the process before it gets into its main loop
+	s := startSimulatedTime()
+	c := newSimulatedCollector()
+
+	sink := BlackholeSink()
+	sink.GaugeInterval = 2 * time.Hour
+
+	p, err := sink.newGaugeCollectionProcessWithClock(
+		[]string{"example", "count"},
+		[]Label{{"gauge", "test"}},
+		c.EmptyCollectionFunction,
+		log.Default(),
+		s,
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+	go p.Run()
+
+	// Stop during the initial delay, check that goroutine exits
+	s.waitForTicker(t)
+	p.Stop()
+	waitForStopped(t, p)
+}
+
+func TestGauge_StoppedAfterInitialDelay(t *testing.T) {
+	// Stop the process during its main loop
+	s := startSimulatedTime()
+	c := newSimulatedCollector()
+
+	sink := BlackholeSink()
+	sink.GaugeInterval = 2 * time.Hour
+
+	p, err := sink.newGaugeCollectionProcessWithClock(
+		[]string{"example", "count"},
+		[]Label{{"gauge", "test"}},
+		c.EmptyCollectionFunction,
+		log.Default(),
+		s,
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+	go p.Run()
+
+	// Get through initial delay, wait for interval ticker
+	delayTicker := s.waitForTicker(t)
+	delayTicker.sender <- time.Now()
+
+	s.waitForTicker(t)
+	p.Stop()
+	waitForStopped(t, p)
+}
+
+func TestGauge_Backoff(t *testing.T) {
+	s := startSimulatedTime()
+	s.allowTickers(100)
+
+	c := newSimulatedCollector()
+
+	sink := BlackholeSink()
+	sink.GaugeInterval = 2 * time.Hour
+
+	threshold := time.Duration(int(sink.GaugeInterval) / 100)
+	f := func(ctx context.Context) ([]GaugeLabelValues, error) {
+		atomic.AddUint32(&c.numCalls, 1)
+		// Move time forward by more than 1% of the gauge interval
+		s.now = s.now.Add(threshold).Add(time.Second)
+		c.callBarrier <- c.numCalls
+		return []GaugeLabelValues{}, nil
+	}
+
+	p, err := sink.newGaugeCollectionProcessWithClock(
+		[]string{"example", "count"},
+		[]Label{{"gauge", "test"}},
+		f,
+		log.Default(),
+		s,
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+	// Do not run; we're just going to call an internal function.
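+	// The fake collector above advances the simulated clock past the 1%
+	// collectionTarget threshold, so this call should double currentInterval.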
+	p.collectAndFilterGauges()
+
+	if p.currentInterval != 2*p.originalInterval {
+		t.Errorf("Current interval is %v, should be 2x%v.",
+			p.currentInterval,
+			p.originalInterval)
+	}
+}
+
+func TestGauge_RestartTimer(t *testing.T) {
+	s := startSimulatedTime()
+	c := newSimulatedCollector()
+	sink := BlackholeSink()
+	sink.GaugeInterval = 2 * time.Hour
+
+	p, err := sink.newGaugeCollectionProcessWithClock(
+		[]string{"example", "count"},
+		[]Label{{"gauge", "test"}},
+		c.EmptyCollectionFunction,
+		log.Default(),
+		s,
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+
+	p.resetTicker()
+	t1 := s.waitForTicker(t)
+	if t1.duration != p.currentInterval {
+		t.Fatalf("Bad ticker interval, got %v expected %v",
+			t1.duration, p.currentInterval)
+	}
+
+	p.currentInterval = 4 * p.originalInterval
+	p.resetTicker()
+	t2 := s.waitForTicker(t)
+	if t2.duration != p.currentInterval {
+		t.Fatalf("Bad ticker interval, got %v expected %v",
+			t2.duration, p.currentInterval)
+	}
+}
+
+func waitForDone(t *testing.T,
+	tick chan<- time.Time,
+	done <-chan struct{},
+) int {
+	t.Helper()
+	timeout := time.After(100 * time.Millisecond)
+
+	numTicks := 0
+	for {
+		select {
+		case <-timeout:
+			t.Fatal("Timeout waiting for metrics to be sent.")
+		case tick <- time.Now():
+			numTicks += 1
+		case <-done:
+			return numTicks
+		}
+	}
+}
+
+func makeLabels(numLabels int) []GaugeLabelValues {
+	values := make([]GaugeLabelValues, numLabels)
+	for i := range values {
+		values[i].Labels = []Label{
+			{"test", "true"},
+			{"which", fmt.Sprintf("%v", i)},
+		}
+		values[i].Value = float32(i + 1)
+	}
+	return values
+}
+
+func TestGauge_InterruptedStreaming(t *testing.T) {
+	s := startSimulatedTime()
+	// Long bucket time == low chance of crossing interval
+	inmemSink := metrics.NewInmemSink(
+		1000000*time.Hour,
+		2000000*time.Hour)
+
+	sink := &ClusterMetricSink{
+		ClusterName:         "test",
+		MaxGaugeCardinality: 500,
+		GaugeInterval:       2 * time.Hour,
+		Sink:                inmemSink,
+	}
+	p, err := sink.newGaugeCollectionProcessWithClock(
+		[]string{"example", "count"},
+		[]Label{{"gauge", "test"}},
+		nil, // shouldn't be called
+		log.Default(),
+		s,
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+
+	// We'll queue up at least two batches; only one will be sent
+	// unless we inject ticks.
+	values := makeLabels(75)
+	done := make(chan struct{})
+	go func() {
+		p.streamGaugesToSink(values)
+		close(done)
+	}()
+
+	p.Stop()
+	// a nil channel is never writeable
+	waitForDone(t, nil, done)
+
+	// If we start close to the end of an interval, metrics will
+	// be split across two buckets.
+	intervals := inmemSink.Data()
+	if len(intervals) > 1 {
+		t.Skip("Detected interval crossing.")
+	}
+
+	if len(intervals[0].Gauges) == len(values) {
+		t.Errorf("Found %v gauges, expected fewer.",
+			len(intervals[0].Gauges))
+	}
+
+}
+
+// helper function to create a closure that's a GaugeCollector.
+func (c *SimulatedCollector) makeFunctionForValues(
+	values []GaugeLabelValues,
+	s *SimulatedTime,
+	advanceTime time.Duration,
+) GaugeCollector {
+	// A function that returns a static list
+	return func(ctx context.Context) ([]GaugeLabelValues, error) {
+		atomic.AddUint32(&c.numCalls, 1)
+		// TODO: this seems like a data race?
+		s.now = s.now.Add(advanceTime)
+		c.callBarrier <- c.numCalls
+		return values, nil
+	}
+}
+
+func TestGauge_MaximumMeasurements(t *testing.T) {
+	s := startSimulatedTime()
+	c := newSimulatedCollector()
+
+	// Long bucket time == low chance of crossing interval
+	inmemSink := metrics.NewInmemSink(
+		1000000*time.Hour,
+		2000000*time.Hour)
+
+	sink := &ClusterMetricSink{
+		ClusterName:         "test",
+		MaxGaugeCardinality: 500,
+		GaugeInterval:       2 * time.Hour,
+		Sink:                inmemSink,
+	}
+
+	// Create a report larger than the default limit
+	excessGauges := 100
+	values := makeLabels(sink.MaxGaugeCardinality + excessGauges)
+	rand.Shuffle(len(values), func(i, j int) {
+		values[i], values[j] = values[j], values[i]
+	})
+
+	// Advance time by 0.5% of duration
+	advance := time.Duration(int(0.005 * float32(sink.GaugeInterval)))
+	p, err := sink.newGaugeCollectionProcessWithClock(
+		[]string{"example", "count"},
+		[]Label{{"gauge", "test"}},
+		c.makeFunctionForValues(values, s, advance),
+		log.Default(),
+		s,
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+
+	// This needs a ticker in order to do its thing,
+	// so run it in the background and we'll send the ticks
+	// from here.
+	done := make(chan struct{}, 1)
+	go func() {
+		p.collectAndFilterGauges()
+		close(done)
+	}()
+
+	sendTicker := s.waitForTicker(t)
+	numTicksSent := waitForDone(t, sendTicker.sender, done)
+
+	// 500 items, one delay after each 25, means that
+	// 19 ticks are consumed, so 19 or 20 must be sent.
+	expectedTicks := sink.MaxGaugeCardinality/25 - 1
+	if numTicksSent < expectedTicks || numTicksSent > expectedTicks+1 {
+		t.Errorf("Number of ticks = %v, expected %v.", numTicksSent, expectedTicks)
+	}
+
+	// If we start close to the end of an interval, metrics will
+	// be split across two buckets.
+	intervals := inmemSink.Data()
+	if len(intervals) > 1 {
+		t.Skip("Detected interval crossing.")
+	}
+
+	if len(intervals[0].Gauges) != sink.MaxGaugeCardinality {
+		t.Errorf("Found %v gauges, expected %v.",
+			len(intervals[0].Gauges),
+			sink.MaxGaugeCardinality)
+	}
+
+	minVal := float32(excessGauges)
+	for _, v := range intervals[0].Gauges {
+		if v.Value < minVal {
+			t.Errorf("Gauge %v with value %v should not have been included.", v.Labels, v.Value)
+			break
+		}
+	}
+}
+
+func TestGauge_MeasurementError(t *testing.T) {
+	s := startSimulatedTime()
+	c := newSimulatedCollector()
+	inmemSink := metrics.NewInmemSink(
+		1000000*time.Hour,
+		2000000*time.Hour)
+	sink := &ClusterMetricSink{
+		ClusterName:         "test",
+		MaxGaugeCardinality: 500,
+		GaugeInterval:       2 * time.Hour,
+		Sink:                inmemSink,
+	}
+	// Create a small report so we don't have to deal with batching.
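+	// (Ten values is below streamGaugesToSink's batch size of 25, so no
+	// send ticks would be needed even if the values were streamed.)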
+	numGauges := 10
+	values := make([]GaugeLabelValues, numGauges)
+	for i := range values {
+		values[i].Labels = []Label{
+			{"test", "true"},
+			{"which", fmt.Sprintf("%v", i)},
+		}
+		values[i].Value = float32(i + 1)
+	}
+
+	f := func(ctx context.Context) ([]GaugeLabelValues, error) {
+		atomic.AddUint32(&c.numCalls, 1)
+		c.callBarrier <- c.numCalls
+		return values, errors.New("test error")
+	}
+
+	p, err := sink.newGaugeCollectionProcessWithClock(
+		[]string{"example", "count"},
+		[]Label{{"gauge", "test"}},
+		f,
+		log.Default(),
+		s,
+	)
+	if err != nil {
+		t.Fatalf("Error creating collection process: %v", err)
+	}
+
+	p.collectAndFilterGauges()
+
+	// We should see no data in the sink
+	intervals := inmemSink.Data()
+	if len(intervals) > 1 {
+		t.Skip("Detected interval crossing.")
+	}
+
+	if len(intervals[0].Gauges) != 0 {
+		t.Errorf("Found %v gauges, expected %v.",
+			len(intervals[0].Gauges), 0)
+	}
+}
diff --git a/helper/metricsutil/wrapped_metrics.go b/helper/metricsutil/wrapped_metrics.go
index 5d4b26e549..dc60560fbd 100644
--- a/helper/metricsutil/wrapped_metrics.go
+++ b/helper/metricsutil/wrapped_metrics.go
@@ -42,6 +42,17 @@ func (m *ClusterMetricSink) AddSampleWithLabels(key []string, val float32, label
 		append(labels, Label{"cluster", m.ClusterName}))
 }
 
+func (m *ClusterMetricSink) AddDurationWithLabels(key []string, d time.Duration, labels []Label) {
+	val := float32(d) / float32(time.Millisecond)
+	m.AddSampleWithLabels(key, val, labels)
+}
+
+func (m *ClusterMetricSink) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
+	elapsed := time.Now().Sub(start)
+	val := float32(elapsed) / float32(time.Millisecond)
+	m.AddSampleWithLabels(key, val, labels)
+}
+
 // BlackholeSink is a default suitable for use in unit tests.
 func BlackholeSink() *ClusterMetricSink {
 	return &ClusterMetricSink{