From 96fbf18201c2a1a2cd482e44b593a2c97506bc77 Mon Sep 17 00:00:00 2001 From: Poorna Date: Tue, 12 Sep 2023 21:59:15 -0700 Subject: [PATCH] replication: queue existing objects to same workers as incoming (#18020) Previously existing objects were queued to single worker and MRF re-queues are also handled by same worker - this does not fully use the available bandwidth in case there is no incoming workload. --- cmd/bucket-replication.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 73d8596ef..274a18085 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1661,9 +1661,8 @@ type ReplicationPool struct { resyncer *replicationResyncer // workers: - workers []chan ReplicationWorkerOperation - lrgworkers []chan ReplicationWorkerOperation - existingWorkers chan ReplicationWorkerOperation + workers []chan ReplicationWorkerOperation + lrgworkers []chan ReplicationWorkerOperation // mrf: mrfWorkerKillCh chan struct{} @@ -1723,8 +1722,6 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool pool := &ReplicationPool{ workers: make([]chan ReplicationWorkerOperation, 0, workers), lrgworkers: make([]chan ReplicationWorkerOperation, 0, LargeWorkerCount), - existingWorkers: make(chan ReplicationWorkerOperation, 100000), - mrfReplicaCh: make(chan ReplicationWorkerOperation, 100000), mrfWorkerKillCh: make(chan struct{}, failedWorkers), resyncer: newresyncer(), @@ -1738,7 +1735,6 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool pool.AddLargeWorkers() pool.ResizeWorkers(workers, 0) pool.ResizeFailedWorkers(failedWorkers) - go pool.AddWorker(pool.existingWorkers, nil) go pool.resyncer.PersistToDisk(ctx, o) go pool.processMRF() go pool.persistMRF() @@ -1965,9 +1961,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { var ch, healCh chan<- ReplicationWorkerOperation switch ri.OpType { - case replication.ExistingObjectReplicationType: - ch = p.existingWorkers - case replication.HealReplicationType: + case replication.HealReplicationType, replication.ExistingObjectReplicationType: ch = p.mrfReplicaCh healCh = p.getWorkerCh(ri.Name, ri.Bucket, ri.Size) default: @@ -2026,9 +2020,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf } var ch chan<- ReplicationWorkerOperation switch doi.OpType { - case replication.ExistingObjectReplicationType: - ch = p.existingWorkers - case replication.HealReplicationType: + case replication.HealReplicationType, replication.ExistingObjectReplicationType: fallthrough default: ch = p.getWorkerCh(doi.Bucket, doi.ObjectName, 0)