diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b7196f156c..0345498016 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -113,8 +113,8 @@ type StorageClient interface { Name() string } -// StorageQueueManagerConfig configures a storage queue. -type StorageQueueManagerConfig struct { +// QueueManagerConfig configures a storage queue. +type QueueManagerConfig struct { QueueCapacity int // Number of samples to buffer per shard before we start dropping them. Shards int // Number of shards, i.e. amount of concurrency. MaxSamplesPerSend int // Maximum number of samples per send. @@ -124,18 +124,18 @@ type StorageQueueManagerConfig struct { Client StorageClient } -// StorageQueueManager manages a queue of samples to be sent to the Storage +// QueueManager manages a queue of samples to be sent to the Storage // indicated by the provided StorageClient. -type StorageQueueManager struct { - cfg StorageQueueManagerConfig +type QueueManager struct { + cfg QueueManagerConfig shards []chan *model.Sample wg sync.WaitGroup done chan struct{} queueName string } -// NewStorageQueueManager builds a new StorageQueueManager. -func NewStorageQueueManager(cfg StorageQueueManagerConfig) *StorageQueueManager { +// NewQueueManager builds a new QueueManager. +func NewQueueManager(cfg QueueManagerConfig) *QueueManager { if cfg.QueueCapacity == 0 { cfg.QueueCapacity = defaultQueueCapacity } @@ -154,7 +154,7 @@ func NewStorageQueueManager(cfg StorageQueueManagerConfig) *StorageQueueManager shards[i] = make(chan *model.Sample, cfg.QueueCapacity) } - t := &StorageQueueManager{ + t := &QueueManager{ cfg: cfg, shards: shards, done: make(chan struct{}), @@ -169,7 +169,7 @@ func NewStorageQueueManager(cfg StorageQueueManagerConfig) *StorageQueueManager // Append queues a sample to be sent to the remote storage. It drops the // sample on the floor if the queue is full. // Always returns nil. -func (t *StorageQueueManager) Append(s *model.Sample) error { +func (t *QueueManager) Append(s *model.Sample) error { var snew model.Sample snew = *s snew.Metric = s.Metric.Clone() @@ -203,13 +203,13 @@ func (t *StorageQueueManager) Append(s *model.Sample) error { // NeedsThrottling implements storage.SampleAppender. It will always return // false as a remote storage drops samples on the floor if backlogging instead // of asking for throttling. -func (*StorageQueueManager) NeedsThrottling() bool { +func (*QueueManager) NeedsThrottling() bool { return false } // Start the queue manager sending samples to the remote storage. // Does not block. -func (t *StorageQueueManager) Start() { +func (t *QueueManager) Start() { for i := 0; i < t.cfg.Shards; i++ { go t.runShard(i) } @@ -217,7 +217,7 @@ func (t *StorageQueueManager) Start() { // Stop stops sending samples to the remote storage and waits for pending // sends to complete. -func (t *StorageQueueManager) Stop() { +func (t *QueueManager) Stop() { log.Infof("Stopping remote storage...") for _, shard := range t.shards { close(shard) @@ -226,7 +226,7 @@ func (t *StorageQueueManager) Stop() { log.Info("Remote storage stopped.") } -func (t *StorageQueueManager) runShard(i int) { +func (t *QueueManager) runShard(i int) { defer t.wg.Done() shard := t.shards[i] @@ -263,7 +263,7 @@ func (t *StorageQueueManager) runShard(i int) { } } -func (t *StorageQueueManager) sendSamples(s model.Samples) { +func (t *QueueManager) sendSamples(s model.Samples) { // Samples are sent to the remote storage on a best-effort basis. If a // sample isn't sent correctly the first time, it's simply dropped on the // floor. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 3908a5a02f..c843707ea2 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -97,7 +97,7 @@ func TestSampleDelivery(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2]) - m := NewStorageQueueManager(StorageQueueManagerConfig{ + m := NewQueueManager(QueueManagerConfig{ Client: c, Shards: 1, }) @@ -134,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewStorageQueueManager(StorageQueueManagerConfig{ + m := NewQueueManager(QueueManagerConfig{ Client: c, // Ensure we don't drop samples in this test. QueueCapacity: n, @@ -184,7 +184,7 @@ func (c *TestBlockingStorageClient) Name() string { return "testblockingstorageclient" } -func (t *StorageQueueManager) queueLen() int { +func (t *QueueManager) queueLen() int { queueLength := 0 for _, shard := range t.shards { queueLength += len(shard) @@ -211,7 +211,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } c := NewTestBlockedStorageClient() - m := NewStorageQueueManager(StorageQueueManagerConfig{ + m := NewQueueManager(QueueManagerConfig{ Client: c, QueueCapacity: n, }) @@ -244,7 +244,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } if m.queueLen() != defaultMaxSamplesPerSend { - t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left", + t.Fatalf("Failed to drain QueueManager queue, %d elements left", m.queueLen(), ) } diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 2f9f58efaf..a53f866b3d 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -24,7 +24,7 @@ import ( // Storage allows queueing samples for remote writes. type Storage struct { mtx sync.RWMutex - queues []*StorageQueueManager + queues []*QueueManager } // ApplyConfig updates the state as the new config requires. @@ -32,7 +32,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { s.mtx.Lock() defer s.mtx.Unlock() - newQueues := []*StorageQueueManager{} + newQueues := []*QueueManager{} // TODO: we should only stop & recreate queues which have changes, // as this can be quite disruptive. for i, rwConf := range conf.RemoteWriteConfigs { @@ -40,7 +40,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { if err != nil { return err } - newQueues = append(newQueues, NewStorageQueueManager(StorageQueueManagerConfig{ + newQueues = append(newQueues, NewQueueManager(QueueManagerConfig{ Client: c, ExternalLabels: conf.GlobalConfig.ExternalLabels, RelabelConfigs: rwConf.WriteRelabelConfigs,