diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3f5e59338b..591c5e9a24 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -124,42 +124,6 @@ func init() { prometheus.MustRegister(numShards) } -// QueueManagerConfig is the configuration for the queue used to write to remote -// storage. -type QueueManagerConfig struct { - // Number of samples to buffer per shard before we start dropping them. - QueueCapacity int - // Max number of shards, i.e. amount of concurrency. - MaxShards int - // Maximum number of samples per send. - MaxSamplesPerSend int - // Maximum time sample will wait in buffer. - BatchSendDeadline time.Duration - // Max number of times to retry a batch on recoverable errors. - MaxRetries int - // On recoverable errors, backoff exponentially. - MinBackoff time.Duration - MaxBackoff time.Duration -} - -// defaultQueueManagerConfig is the default remote queue configuration. -var defaultQueueManagerConfig = QueueManagerConfig{ - // With a maximum of 1000 shards, assuming an average of 100ms remote write - // time and 100 samples per batch, we will be able to push 1M samples/s. - MaxShards: 1000, - MaxSamplesPerSend: 100, - - // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At - // 1000 shards, this will buffer 100M samples total. - QueueCapacity: 100 * 1000, - BatchSendDeadline: 5 * time.Second, - - // Max number of times to retry a batch on recoverable errors. - MaxRetries: 10, - MinBackoff: 30 * time.Millisecond, - MaxBackoff: 100 * time.Millisecond, -} - // StorageClient defines an interface for sending a batch of samples to an // external timeseries database. type StorageClient interface { @@ -174,7 +138,7 @@ type StorageClient interface { type QueueManager struct { logger log.Logger - cfg QueueManagerConfig + cfg config.QueueConfig externalLabels model.LabelSet relabelConfigs []*config.RelabelConfig client StorageClient @@ -193,7 +157,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { +func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { if logger == nil { logger = log.NewNopLogger() } @@ -216,7 +180,7 @@ func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels m } t.shards = t.newShards(t.numShards) numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) - queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) + queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) return t } @@ -408,7 +372,7 @@ type shards struct { func (t *QueueManager) newShards(numShards int) *shards { queues := make([]chan *model.Sample, numShards) for i := 0; i < numShards; i++ { - queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity) + queues[i] = make(chan *model.Sample, t.cfg.Capacity) } s := &shards{ qm: t, diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e621496cb8..c34dd94e95 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" ) type TestStorageClient struct { @@ -81,7 +82,7 @@ func (c *TestStorageClient) Name() string { func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. - n := defaultQueueManagerConfig.QueueCapacity * 2 + n := config.DefaultQueueConfig.Capacity * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -97,7 +98,7 @@ func TestSampleDelivery(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2]) - cfg := defaultQueueManagerConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 m := NewQueueManager(nil, cfg, nil, nil, c) @@ -117,7 +118,7 @@ func TestSampleDelivery(t *testing.T) { func TestSampleDeliveryOrder(t *testing.T) { ts := 10 - n := defaultQueueManagerConfig.MaxSamplesPerSend * ts + n := config.DefaultQueueConfig.MaxSamplesPerSend * ts samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -133,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewQueueManager(nil, defaultQueueManagerConfig, nil, nil, c) + m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c) // These should be received by the client. for _, s := range samples { @@ -194,7 +195,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // `MaxSamplesPerSend*Shards` samples should be consumed by the // per-shard goroutines, and then another `MaxSamplesPerSend` // should be left on the queue. - n := defaultQueueManagerConfig.MaxSamplesPerSend * 2 + n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -208,9 +209,9 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } c := NewTestBlockedStorageClient() - cfg := defaultQueueManagerConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 - cfg.QueueCapacity = n + cfg.Capacity = n m := NewQueueManager(nil, cfg, nil, nil, c) m.Start() @@ -240,7 +241,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { time.Sleep(10 * time.Millisecond) } - if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend { + if m.queueLen() != config.DefaultQueueConfig.MaxSamplesPerSend { t.Fatalf("Failed to drain QueueManager queue, %d elements left", m.queueLen(), ) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index e5bc3aaa5d..bb8c173720 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -68,7 +68,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { } newQueues = append(newQueues, NewQueueManager( s.logger, - defaultQueueManagerConfig, + config.DefaultQueueConfig, conf.GlobalConfig.ExternalLabels, rwConf.WriteRelabelConfigs, c,