Utility for collecting new gauge metrics (#9017)

This commit is contained in:
Mark Gritter 2020-05-19 17:54:43 -05:00 committed by GitHub
parent a8c2591d36
commit 72afe55906
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 826 additions and 0 deletions

View File

@ -0,0 +1,257 @@
package metricsutil
import (
"context"
"math/rand"
"sort"
"time"
log "github.com/hashicorp/go-hclog"
)
// clock abstracts the subset of the time package used by the collection
// process. This interface allows unit tests to substitute in a simulated
// clock so tests do not depend on real elapsed time.
type clock interface {
	Now() time.Time
	NewTicker(time.Duration) *time.Ticker
}
// defaultClock is the production implementation of the clock interface,
// delegating directly to the time package.
type defaultClock struct {
}

// Now reports the current wall-clock time.
func (defaultClock) Now() time.Time {
	return time.Now()
}

// NewTicker creates a real ticker firing every d.
func (defaultClock) NewTicker(d time.Duration) *time.Ticker {
	return time.NewTicker(d)
}
// GaugeLabelValues is one gauge in a set sharing a single key, that
// are measured in a batch.
type GaugeLabelValues struct {
	// Labels identifies this particular gauge within the shared key.
	Labels []Label
	// Value is the measured gauge value.
	Value float32
}
// GaugeCollector is a callback function that returns an unfiltered
// set of label-value pairs. It may be cancelled (via its context) if it
// takes too long.
type GaugeCollector = func(context.Context) ([]GaugeLabelValues, error)

// collectionBound is a hard limit on how long a collection process
// may take, as a fraction of the current interval; it becomes the
// context timeout handed to the collector (2%).
const collectionBound = 0.02

// collectionTarget is a soft limit (1% of the interval); if exceeded,
// the collection interval will be doubled.
const collectionTarget = 0.01
// A GaugeCollectionProcess is responsible for one particular gauge metric.
// It handles a delay on initial startup; limiting the cardinality; and
// exponential backoff on the requested interval.
type GaugeCollectionProcess struct {
	// stop is closed by Stop() to signal shutdown.
	stop chan struct{}
	// stopped is closed by Run() when the goroutine has fully exited.
	stopped chan struct{}

	// gauge name
	key []string
	// labels to use when reporting
	labels []Label
	// callback function
	collector GaugeCollector
	// destination for metrics
	sink   *ClusterMetricSink
	logger log.Logger

	// time between collections; currentInterval may grow (doubling)
	// when collection exceeds the soft target, originalInterval keeps
	// the configured value.
	originalInterval time.Duration
	currentInterval  time.Duration
	ticker           *time.Ticker

	// time source (real clock in production, simulated in tests)
	clock clock
}
// NewGaugeCollectionProcess creates a new collection process for the
// callback function given as an argument. A label should be provided
// for metrics *about* this collection process.
//
// The Run() method must be called to start the process.
func (m *ClusterMetricSink) NewGaugeCollectionProcess(
	key []string,
	id []Label,
	collector GaugeCollector,
	logger log.Logger,
) (*GaugeCollectionProcess, error) {
	// Production callers always get the real clock; tests substitute a
	// simulated one via newGaugeCollectionProcessWithClock.
	return m.newGaugeCollectionProcessWithClock(key, id, collector, logger, defaultClock{})
}
// newGaugeCollectionProcessWithClock is the test-friendly constructor:
// it allows an alternative clock implementation to be injected.
func (m *ClusterMetricSink) newGaugeCollectionProcessWithClock(
	key []string,
	id []Label,
	collector GaugeCollector,
	logger log.Logger,
	clock clock,
) (*GaugeCollectionProcess, error) {
	p := &GaugeCollectionProcess{
		key:              key,
		labels:           id,
		collector:        collector,
		sink:             m,
		logger:           logger,
		clock:            clock,
		originalInterval: m.GaugeInterval,
		currentInterval:  m.GaugeInterval,
		stop:             make(chan struct{}, 1),
		stopped:          make(chan struct{}, 1),
	}
	return p, nil
}
// delayStart randomly delays by up to one extra interval
// so that collection processes do not all run at the same time.
// If we knew all the processes in advance, we could just schedule them
// evenly, but a new one could be added per secret engine.
//
// It returns true if a stop signal was received while waiting.
func (p *GaugeCollectionProcess) delayStart() bool {
	randomDelay := time.Duration(rand.Intn(int(p.currentInterval)))
	if randomDelay <= 0 {
		// rand.Intn may return 0, and time.NewTicker panics on a
		// non-positive duration; a zero delay means we can start
		// immediately anyway.
		return false
	}
	// A Timer might be better, but then we'd have to simulate
	// one of those too?
	delayTick := p.clock.NewTicker(randomDelay)
	defer delayTick.Stop()

	select {
	case <-p.stop:
		return true
	case <-delayTick.C:
	}
	return false
}
// resetTicker stops any existing ticker and replaces it with a fresh
// one at the current interval setting.
func (p *GaugeCollectionProcess) resetTicker() {
	if old := p.ticker; old != nil {
		old.Stop()
	}
	p.ticker = p.clock.NewTicker(p.currentInterval)
}
// collectAndFilterGauges executes the callback function,
// limits the cardinality, and streams the results to the metrics sink.
// It also reports meta-metrics about the collection itself, and doubles
// the collection interval when the run exceeds the soft target.
func (p *GaugeCollectionProcess) collectAndFilterGauges() {
	// Run for only an allotted amount of time (collectionBound, i.e. 2%
	// of the current interval); the collector is expected to honor ctx.
	timeout := time.Duration(collectionBound * float64(p.currentInterval))
	ctx, cancel := context.WithTimeout(context.Background(),
		timeout)
	defer cancel()

	// Advertise the interval currently in effect for this gauge.
	p.sink.AddDurationWithLabels([]string{"metrics", "collection", "interval"},
		p.currentInterval,
		p.labels)

	start := p.clock.Now()
	values, err := p.collector(ctx)
	end := p.clock.Now()
	duration := end.Sub(start)

	// Report how long it took to perform the operation.
	p.sink.AddDurationWithLabels([]string{"metrics", "collection"},
		duration,
		p.labels)

	// If over threshold, back off by doubling the measurement interval.
	// Currently a restart is the only way to bring it back down.
	threshold := time.Duration(collectionTarget * float64(p.currentInterval))
	if duration > threshold {
		p.logger.Warn("gauge collection time exceeded target", "target", threshold, "actual", duration, "id", p.labels)
		p.currentInterval *= 2
		p.resetTicker()
	}

	// On error, count it and discard any partial values.
	if err != nil {
		p.logger.Error("error collecting gauge", "id", p.labels, "error", err)
		p.sink.IncrCounterWithLabels([]string{"metrics", "collection", "error"},
			1,
			p.labels)
		return
	}

	// Filter to top N by value (descending sort, keep the head).
	// This does not guarantee total cardinality is <= N, but it does slow things down
	// a little if the cardinality *is* too high and the gauge needs to be disabled.
	if len(values) > p.sink.MaxGaugeCardinality {
		sort.Slice(values, func(a, b int) bool {
			return values[a].Value > values[b].Value
		})
		values = values[:p.sink.MaxGaugeCardinality]
	}

	p.streamGaugesToSink(values)
}
// streamGaugesToSink reports the given values to the sink in batches,
// pausing between batches so the transport is not flooded.
func (p *GaugeCollectionProcess) streamGaugesToSink(values []GaugeLabelValues) {
	// Dumping 500 metrics in one big chunk is somewhat unfriendly to UDP-based
	// transport, and to the rest of the metrics trying to get through.
	// Let's smooth things out over the course of a second.
	// 1 second / 500 = 2 ms each, so we can send 25 per 50 milliseconds.
	// That should be one or two packets.
	sendTick := p.clock.NewTicker(50 * time.Millisecond)
	// Use defer so the ticker is stopped on the early return below too;
	// previously the stop path leaked the ticker.
	defer sendTick.Stop()

	batchSize := 25
	for i, lv := range values {
		if i > 0 && i%batchSize == 0 {
			select {
			case <-p.stop:
				// because the channel is closed,
				// the main loop will successfully
				// read from p.stop too, and exit.
				return
			case <-sendTick.C:
			}
		}
		p.sink.SetGaugeWithLabels(p.key, lv.Value, lv.Labels)
	}
}
// Run should be called as a goroutine. It waits a randomized startup
// delay, then collects gauges on every tick until Stop() is called.
// p.stopped is closed on exit so callers can wait for shutdown.
func (p *GaugeCollectionProcess) Run() {
	defer close(p.stopped)

	// Wait a random amount of time
	stopReceived := p.delayStart()
	if stopReceived {
		return
	}

	// Create a ticker to start each cycle
	p.resetTicker()

	// Loop until we get a signal to stop
	for {
		select {
		case <-p.ticker.C:
			p.collectAndFilterGauges()
		case <-p.stop:
			// Can't use defer because this might
			// not be the original ticker (backoff may have
			// replaced it via resetTicker).
			p.ticker.Stop()
			return
		}
	}
}
// Stop the collection process. Closing p.stop signals the startup
// delay, the main loop, and any in-progress streaming to exit; it must
// be called at most once.
func (p *GaugeCollectionProcess) Stop() {
	close(p.stop)
}

View File

@ -0,0 +1,558 @@
package metricsutil
import (
"context"
"errors"
"fmt"
"math/rand"
"reflect"
"sync/atomic"
"testing"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
)
// SimulatedTime maintains a virtual clock so the test isn't
// dependent upon real time.
// Unfortunately there is no way to run these tests in parallel
// since they rely on the same global timeNow function.
type SimulatedTime struct {
	// now is the current virtual time; tests advance it manually.
	now time.Time
	// tickerBarrier receives every ticker the system under test
	// creates, letting the test rendezvous with ticker construction.
	tickerBarrier chan *SimulatedTicker
}

// Compile-time check that SimulatedTime satisfies the clock interface.
var _ clock = &SimulatedTime{}

// SimulatedTicker pairs an inert real time.Ticker with the channel the
// test uses to inject tick events by hand.
type SimulatedTicker struct {
	ticker   *time.Ticker
	duration time.Duration
	sender   chan time.Time
}
// Now returns the current virtual time.
func (s *SimulatedTime) Now() time.Time {
	return s.now
}
// NewTicker returns a ticker whose channel is driven by the test rather
// than by elapsed time, and publishes it on tickerBarrier so the test
// can retrieve it via waitForTicker.
func (s *SimulatedTime) NewTicker(d time.Duration) *time.Ticker {
	// Create a real ticker, but set its duration to an amount that will never fire for real.
	// We'll inject times into the channel directly.
	replacementChannel := make(chan time.Time)
	t := time.NewTicker(1000 * time.Hour)
	t.C = replacementChannel
	s.tickerBarrier <- &SimulatedTicker{t, d, replacementChannel}
	return t
}
// waitForTicker blocks until the system under test creates a ticker,
// failing the test if none appears within 100ms.
func (s *SimulatedTime) waitForTicker(t *testing.T) *SimulatedTicker {
	t.Helper()
	// System under test should create a ticker within 100ms,
	// wait for it to show up or else fail the test.
	timeout := time.After(100 * time.Millisecond)
	select {
	case <-timeout:
		t.Fatal("Timeout waiting for ticker creation.")
		return nil
	case st := <-s.tickerBarrier:
		// Renamed from t to st so it no longer shadows the *testing.T
		// parameter.
		return st
	}
}
// allowTickers resizes the barrier so up to n tickers can be created
// without the test having to consume each one.
func (s *SimulatedTime) allowTickers(n int) {
	s.tickerBarrier = make(chan *SimulatedTicker, n)
}
// startSimulatedTime constructs a SimulatedTime anchored at the current
// wall-clock time, with room for one pending ticker registration.
func startSimulatedTime() *SimulatedTime {
	return &SimulatedTime{
		now:           time.Now(),
		tickerBarrier: make(chan *SimulatedTicker, 1),
	}
}
// SimulatedCollector counts invocations of a gauge collection callback
// and lets the test synchronize with each call.
type SimulatedCollector struct {
	// numCalls is incremented atomically on every collection call.
	numCalls uint32
	// callBarrier receives the updated call count after each call.
	callBarrier chan uint32
}
// newSimulatedCollector builds a collector with a one-slot call barrier
// and a zeroed call counter.
func newSimulatedCollector() *SimulatedCollector {
	c := &SimulatedCollector{}
	c.callBarrier = make(chan uint32, 1)
	return c
}
// waitForCall blocks until the collection function has been invoked,
// failing the test if that does not happen within 100ms.
func (s *SimulatedCollector) waitForCall(t *testing.T) {
	// t.Helper() so failures are attributed to the caller, matching
	// the sibling helpers waitForTicker and waitForStopped.
	t.Helper()
	timeout := time.After(100 * time.Millisecond)
	select {
	case <-timeout:
		t.Fatal("Timeout waiting for call to collection function.")
		return
	case <-s.callBarrier:
		return
	}
}
// EmptyCollectionFunction is a GaugeCollector that records the call,
// signals callBarrier, and returns no values.
func (s *SimulatedCollector) EmptyCollectionFunction(ctx context.Context) ([]GaugeLabelValues, error) {
	// Use the value returned by AddUint32 rather than re-reading
	// s.numCalls, which would be a data race under the race detector.
	n := atomic.AddUint32(&s.numCalls, 1)
	s.callBarrier <- n
	return []GaugeLabelValues{}, nil
}
// TestGauge_Creation verifies that the public constructor wires up the
// default clock, the key/labels, and both interval fields.
func TestGauge_Creation(t *testing.T) {
	c := newSimulatedCollector()
	sink := BlackholeSink()
	sink.GaugeInterval = 33 * time.Minute

	key := []string{"example", "count"}
	labels := []Label{{"gauge", "test"}}

	p, err := sink.NewGaugeCollectionProcess(
		key,
		labels,
		c.EmptyCollectionFunction,
		log.Default(),
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}

	if _, ok := p.clock.(defaultClock); !ok {
		t.Error("Default clock not installed.")
	}
	if !reflect.DeepEqual(p.key, key) {
		t.Errorf("Key not initialized, got %v but expected %v",
			p.key, key)
	}
	// Fixed: this message previously printed p.key/key instead of the
	// labels actually being compared.
	if !reflect.DeepEqual(p.labels, labels) {
		t.Errorf("Labels not initialized, got %v but expected %v",
			p.labels, labels)
	}
	if p.originalInterval != sink.GaugeInterval || p.currentInterval != sink.GaugeInterval {
		t.Errorf("Intervals not initialized, got %v and %v, expected %v",
			p.originalInterval, p.currentInterval, sink.GaugeInterval)
	}
}
// TestGauge_StartDelay works through an entire startup sequence, up to
// collecting the first batch of gauges.
func TestGauge_StartDelay(t *testing.T) {
	s := startSimulatedTime()
	c := newSimulatedCollector()
	sink := BlackholeSink()
	sink.GaugeInterval = 2 * time.Hour

	p, err := sink.newGaugeCollectionProcessWithClock(
		[]string{"example", "count"},
		[]Label{{"gauge", "test"}},
		c.EmptyCollectionFunction,
		log.Default(),
		s,
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}
	go p.Run()

	delayTicker := s.waitForTicker(t)
	if delayTicker.duration > sink.GaugeInterval {
		t.Errorf("Delayed start %v is more than interval %v.",
			delayTicker.duration, sink.GaugeInterval)
	}
	// Use atomic loads: the Run() goroutine increments numCalls
	// concurrently, so plain reads are a data race under -race.
	if atomic.LoadUint32(&c.numCalls) > 0 {
		t.Error("Collection function has been called")
	}

	// Signal the end of delay, then another ticker should start
	delayTicker.sender <- time.Now()
	intervalTicker := s.waitForTicker(t)
	if intervalTicker.duration != sink.GaugeInterval {
		t.Errorf("Ticker duration is %v, expected %v",
			intervalTicker.duration, sink.GaugeInterval)
	}
	if atomic.LoadUint32(&c.numCalls) > 0 {
		t.Error("Collection function has been called")
	}

	// Time's up, ensure the collection function is executed.
	intervalTicker.sender <- time.Now()
	c.waitForCall(t)
	if n := atomic.LoadUint32(&c.numCalls); n != 1 {
		t.Errorf("Collection function called %v times, expected %v.", n, 1)
	}
	p.Stop()
}
// waitForStopped blocks until the collection process's goroutine has
// fully exited (p.stopped closed), failing the test after 100ms.
func waitForStopped(t *testing.T, p *GaugeCollectionProcess) {
	t.Helper()
	timeout := time.After(100 * time.Millisecond)
	select {
	case <-timeout:
		t.Fatal("Timeout waiting for process to stop.")
	case <-p.stopped:
		return
	}
}
// TestGauge_StoppedDuringInitialDelay stops the process before it gets
// into its main loop, and checks the goroutine exits cleanly.
func TestGauge_StoppedDuringInitialDelay(t *testing.T) {
	s := startSimulatedTime()
	c := newSimulatedCollector()
	sink := BlackholeSink()
	sink.GaugeInterval = 2 * time.Hour

	p, err := sink.newGaugeCollectionProcessWithClock(
		[]string{"example", "count"},
		[]Label{{"gauge", "test"}},
		c.EmptyCollectionFunction,
		log.Default(),
		s,
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}
	go p.Run()

	// Stop during the initial delay (the first ticker created is the
	// delay ticker), check that goroutine exits
	s.waitForTicker(t)
	p.Stop()
	waitForStopped(t, p)
}
// TestGauge_StoppedAfterInitialDelay stops the process once it has
// entered its main loop, and checks the goroutine exits cleanly.
func TestGauge_StoppedAfterInitialDelay(t *testing.T) {
	s := startSimulatedTime()
	c := newSimulatedCollector()
	sink := BlackholeSink()
	sink.GaugeInterval = 2 * time.Hour

	p, err := sink.newGaugeCollectionProcessWithClock(
		[]string{"example", "count"},
		[]Label{{"gauge", "test"}},
		c.EmptyCollectionFunction,
		log.Default(),
		s,
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}
	go p.Run()

	// Get through initial delay, wait for interval ticker
	delayTicker := s.waitForTicker(t)
	delayTicker.sender <- time.Now()
	s.waitForTicker(t)

	p.Stop()
	waitForStopped(t, p)
}
// TestGauge_Backoff checks that when a collection run exceeds the soft
// target (1% of the interval), the current interval is doubled.
func TestGauge_Backoff(t *testing.T) {
	s := startSimulatedTime()
	// resetTicker will create a ticker as a side effect; allow plenty
	// so nothing blocks on the barrier.
	s.allowTickers(100)

	c := newSimulatedCollector()
	sink := BlackholeSink()
	sink.GaugeInterval = 2 * time.Hour

	// threshold mirrors collectionTarget (1%) in the implementation.
	threshold := time.Duration(int(sink.GaugeInterval) / 100)
	f := func(ctx context.Context) ([]GaugeLabelValues, error) {
		atomic.AddUint32(&c.numCalls, 1)
		// Move time forward by more than 1% of the gauge interval
		s.now = s.now.Add(threshold).Add(time.Second)
		c.callBarrier <- c.numCalls
		return []GaugeLabelValues{}, nil
	}

	p, err := sink.newGaugeCollectionProcessWithClock(
		[]string{"example", "count"},
		[]Label{{"gauge", "test"}},
		f,
		log.Default(),
		s,
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}

	// Do not run the goroutine; we're just going to call an internal
	// function synchronously.
	p.collectAndFilterGauges()

	if p.currentInterval != 2*p.originalInterval {
		t.Errorf("Current interval is %v, should be 2x%v.",
			p.currentInterval,
			p.originalInterval)
	}
}
// TestGauge_RestartTimer verifies that resetTicker creates a ticker at
// the current interval, both initially and after the interval changes.
func TestGauge_RestartTimer(t *testing.T) {
	s := startSimulatedTime()
	c := newSimulatedCollector()
	sink := BlackholeSink()
	sink.GaugeInterval = 2 * time.Hour

	p, err := sink.newGaugeCollectionProcessWithClock(
		[]string{"example", "count"},
		[]Label{{"gauge", "test"}},
		c.EmptyCollectionFunction,
		log.Default(),
		s,
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}

	p.resetTicker()
	t1 := s.waitForTicker(t)
	if t1.duration != p.currentInterval {
		t.Fatalf("Bad ticker interval, got %v expected %v",
			t1.duration, p.currentInterval)
	}

	p.currentInterval = 4 * p.originalInterval
	p.resetTicker()
	t2 := s.waitForTicker(t)
	// Fixed: this message previously reported t1.duration.
	if t2.duration != p.currentInterval {
		t.Fatalf("Bad ticker interval, got %v expected %v",
			t2.duration, p.currentInterval)
	}
}
// waitForDone feeds ticks into the given channel until done is closed,
// returning how many ticks were consumed. A nil tick channel is never
// writable, so passing nil waits for done without supplying any ticks.
// Fails the test if done is not closed within 100ms.
func waitForDone(t *testing.T,
	tick chan<- time.Time,
	done <-chan struct{},
) int {
	t.Helper()
	timeout := time.After(100 * time.Millisecond)

	numTicks := 0
	for {
		select {
		case <-timeout:
			t.Fatal("Timeout waiting for metrics to be sent.")
		case tick <- time.Now():
			numTicks += 1
		case <-done:
			return numTicks
		}
	}
}
// makeLabels fabricates numLabels gauge entries; entry i is labeled
// {"test","true"},{"which",i} and carries the value i+1.
func makeLabels(numLabels int) []GaugeLabelValues {
	values := make([]GaugeLabelValues, 0, numLabels)
	for i := 0; i < numLabels; i++ {
		values = append(values, GaugeLabelValues{
			Labels: []Label{
				{"test", "true"},
				{"which", fmt.Sprintf("%v", i)},
			},
			Value: float32(i + 1),
		})
	}
	return values
}
// TestGauge_InterruptedStreaming checks that streamGaugesToSink stops
// early (sending fewer than all values) when Stop() is called mid-batch.
func TestGauge_InterruptedStreaming(t *testing.T) {
	s := startSimulatedTime()
	// Long bucket time == low chance of crossing interval
	inmemSink := metrics.NewInmemSink(
		1000000*time.Hour,
		2000000*time.Hour)
	sink := &ClusterMetricSink{
		ClusterName:         "test",
		MaxGaugeCardinality: 500,
		GaugeInterval:       2 * time.Hour,
		Sink:                inmemSink,
	}

	p, err := sink.newGaugeCollectionProcessWithClock(
		[]string{"example", "count"},
		[]Label{{"gauge", "test"}},
		nil, // shouldn't be called
		log.Default(),
		s,
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}

	// We'll queue up at least two batches (batch size is 25); only one
	// will be sent unless we give a ticker.
	values := makeLabels(75)
	done := make(chan struct{})
	go func() {
		p.streamGaugesToSink(values)
		close(done)
	}()

	p.Stop()
	// a nil channel is never writeable, so waitForDone just waits for
	// the streaming goroutine to notice the stop signal.
	waitForDone(t, nil, done)

	// If we start close to the end of an interval, metrics will
	// be split across two buckets.
	intervals := inmemSink.Data()
	if len(intervals) > 1 {
		t.Skip("Detected interval crossing.")
	}

	if len(intervals[0].Gauges) == len(values) {
		t.Errorf("Found %v gauges, expected fewer.",
			len(intervals[0].Gauges))
	}
}
// makeFunctionForValues is a helper that creates a closure usable as a
// GaugeCollector: it returns the given static list, advances the
// simulated clock by advanceTime, and signals callBarrier on each call.
func (c *SimulatedCollector) makeFunctionForValues(
	values []GaugeLabelValues,
	s *SimulatedTime,
	advanceTime time.Duration,
) GaugeCollector {
	// A function that returns a static list
	return func(ctx context.Context) ([]GaugeLabelValues, error) {
		atomic.AddUint32(&c.numCalls, 1)
		// TODO: this seems like a data race? s.now is written here and
		// read by the test goroutine — confirm the synchronization.
		s.now = s.now.Add(advanceTime)
		c.callBarrier <- c.numCalls
		return values, nil
	}
}
// TestGauge_MaximumMeasurements checks that an oversized report is
// trimmed to MaxGaugeCardinality, keeping the highest-valued gauges,
// and that streaming is paced by the 25-per-tick batching.
func TestGauge_MaximumMeasurements(t *testing.T) {
	s := startSimulatedTime()
	c := newSimulatedCollector()

	// Long bucket time == low chance of crossing interval
	inmemSink := metrics.NewInmemSink(
		1000000*time.Hour,
		2000000*time.Hour)
	sink := &ClusterMetricSink{
		ClusterName:         "test",
		MaxGaugeCardinality: 500,
		GaugeInterval:       2 * time.Hour,
		Sink:                inmemSink,
	}

	// Create a report larger than the default limit; shuffle so the
	// top-N filter has to actually sort.
	excessGauges := 100
	values := makeLabels(sink.MaxGaugeCardinality + excessGauges)
	rand.Shuffle(len(values), func(i, j int) {
		values[i], values[j] = values[j], values[i]
	})

	// Advance time by 0.5% of duration — under the 1% backoff target.
	advance := time.Duration(int(0.005 * float32(sink.GaugeInterval)))
	p, err := sink.newGaugeCollectionProcessWithClock(
		[]string{"example", "count"},
		[]Label{{"gauge", "test"}},
		c.makeFunctionForValues(values, s, advance),
		log.Default(),
		s,
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}

	// This needs a ticker in order to do its thing,
	// so run it in the background and we'll send the ticks
	// from here.
	done := make(chan struct{}, 1)
	go func() {
		p.collectAndFilterGauges()
		close(done)
	}()

	sendTicker := s.waitForTicker(t)
	numTicksSent := waitForDone(t, sendTicker.sender, done)

	// 500 items, one delay after after each 25, means that
	// 19 ticks are consumed, so 19 or 20 must be sent.
	expectedTicks := sink.MaxGaugeCardinality/25 - 1
	if numTicksSent < expectedTicks || numTicksSent > expectedTicks+1 {
		t.Errorf("Number of ticks = %v, expected %v.", numTicksSent, expectedTicks)
	}

	// If we start close to the end of an interval, metrics will
	// be split across two buckets.
	intervals := inmemSink.Data()
	if len(intervals) > 1 {
		t.Skip("Detected interval crossing.")
	}

	if len(intervals[0].Gauges) != sink.MaxGaugeCardinality {
		t.Errorf("Found %v gauges, expected %v.",
			len(intervals[0].Gauges),
			sink.MaxGaugeCardinality)
	}

	// The filter keeps the highest values, so the smallest surviving
	// value must be at least excessGauges (values run 1..600).
	minVal := float32(excessGauges)
	for _, v := range intervals[0].Gauges {
		if v.Value < minVal {
			t.Errorf("Gauge %v with value %v should not have been included.", v.Labels, v.Value)
			break
		}
	}
}
// TestGauge_MeasurementError checks that when the collector returns an
// error, any values it produced are discarded and nothing is reported.
func TestGauge_MeasurementError(t *testing.T) {
	s := startSimulatedTime()
	c := newSimulatedCollector()
	inmemSink := metrics.NewInmemSink(
		1000000*time.Hour,
		2000000*time.Hour)
	sink := &ClusterMetricSink{
		ClusterName:         "test",
		MaxGaugeCardinality: 500,
		GaugeInterval:       2 * time.Hour,
		Sink:                inmemSink,
	}

	// Create a small report so we don't have to deal with batching.
	// Reuse makeLabels rather than duplicating its loop inline.
	numGauges := 10
	values := makeLabels(numGauges)

	f := func(ctx context.Context) ([]GaugeLabelValues, error) {
		// Send the AddUint32 result directly to avoid a racy re-read
		// of c.numCalls.
		c.callBarrier <- atomic.AddUint32(&c.numCalls, 1)
		return values, errors.New("test error")
	}

	p, err := sink.newGaugeCollectionProcessWithClock(
		[]string{"example", "count"},
		[]Label{{"gauge", "test"}},
		f,
		log.Default(),
		s,
	)
	if err != nil {
		t.Fatalf("Error creating collection process: %v", err)
	}

	p.collectAndFilterGauges()

	// We should see no data in the sink
	intervals := inmemSink.Data()
	if len(intervals) > 1 {
		t.Skip("Detected interval crossing.")
	}

	if len(intervals[0].Gauges) != 0 {
		t.Errorf("Found %v gauges, expected %v.",
			len(intervals[0].Gauges), 0)
	}
}

View File

@ -42,6 +42,17 @@ func (m *ClusterMetricSink) AddSampleWithLabels(key []string, val float32, label
append(labels, Label{"cluster", m.ClusterName}))
}
// AddDurationWithLabels records a duration as a sample, expressed in
// (possibly fractional) milliseconds.
func (m *ClusterMetricSink) AddDurationWithLabels(key []string, d time.Duration, labels []Label) {
	ms := float32(d) / float32(time.Millisecond)
	m.AddSampleWithLabels(key, ms, labels)
}
// MeasureSinceWithLabels records the elapsed wall-clock time since
// start, in milliseconds, as a sample under key with the given labels.
func (m *ClusterMetricSink) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
	// time.Since is the idiomatic form of time.Now().Sub(start).
	elapsed := time.Since(start)
	val := float32(elapsed) / float32(time.Millisecond)
	m.AddSampleWithLabels(key, val, labels)
}
// BlackholeSink is a default suitable for use in unit tests.
func BlackholeSink() *ClusterMetricSink {
return &ClusterMetricSink{