diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 7cc8869e1..ed02e6672 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -1509,6 +1509,10 @@ var (
 
 // ReplicationPool describes replication pool
 type ReplicationPool struct {
+	// atomics:
+	activeWorkers    int32
+	activeMRFWorkers int32
+
 	objLayer                ObjectLayer
 	ctx                     context.Context
 	mrfWorkerKillCh         chan struct{}
@@ -1522,8 +1526,6 @@ type ReplicationPool struct {
 	saveStateCh             chan struct{}
 	workerSize              int
 	mrfWorkerSize           int
-	activeWorkers           int32
-	activeMRFWorkers        int32
 	priority                string
 	resyncer                *replicationResyncer
 	workerWg                sync.WaitGroup
@@ -1761,22 +1763,27 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 	default:
 		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
 		p.mu.RLock()
-		switch p.priority {
+		prio := p.priority
+		p.mu.RUnlock()
+		switch prio {
 		case "fast":
 			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic"), string(replicationSubsystem))
 		case "slow":
 			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
 		default:
 			if p.ActiveWorkers() < WorkerMaxLimit {
+				p.mu.RLock()
 				workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
+				p.mu.RUnlock()
 				p.ResizeWorkers(workers)
 			}
 			if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
+				p.mu.RLock()
 				workers := int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerMaxLimit))
+				p.mu.RUnlock()
 				p.ResizeFailedWorkers(workers)
 			}
 		}
-		p.mu.RUnlock()
 	}
 }
 
@@ -1815,18 +1822,21 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 	default:
 		globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
 		p.mu.RLock()
-		switch p.priority {
+		prio := p.priority
+		p.mu.RUnlock()
+		switch prio {
 		case "fast":
 			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes"), string(replicationSubsystem))
 		case "slow":
 			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
 		default:
 			if p.ActiveWorkers() < WorkerMaxLimit {
+				p.mu.RLock()
 				workers := int(math.Min(float64(p.workerSize+1), WorkerMaxLimit))
+				p.mu.RUnlock()
 				p.ResizeWorkers(workers)
 			}
 		}
-		p.mu.RUnlock()
 	}
 }
 
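For review context, the core of the change is narrowing the scope of the p.mu read lock: the priority and size fields are copied into locals while the RLock is held, and the lock is released before ResizeWorkers / ResizeFailedWorkers are called. Below is a minimal standalone sketch of that pattern, assuming (their bodies are not shown in this diff) that the resize methods take the write lock; the pool type and method names here are illustrative, not the actual MinIO API.

package pool

import "sync"

// pool is a hypothetical stand-in for ReplicationPool, used only to
// illustrate the locking pattern applied in the diff above.
type pool struct {
	mu         sync.RWMutex
	priority   string
	workerSize int
}

// resizeWorkers takes the write lock (the assumption stated above), so the
// caller must not be holding mu when it is called; RLock followed by Lock
// in the same goroutine would deadlock.
func (p *pool) resizeWorkers(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.workerSize = n
}

// onQueueFull mirrors the queueReplicaTask fallback path: copy the shared
// fields under the read lock, release it, and only then decide and resize.
func (p *pool) onQueueFull() {
	p.mu.RLock()
	prio := p.priority
	next := p.workerSize + 1
	p.mu.RUnlock()

	if prio != "fast" && prio != "slow" {
		p.resizeWorkers(next) // safe: no lock held at this point
	}
}

The same copy-then-unlock shape is applied in queueReplicaDeleteTask in the last hunk.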