From f986b0c49314b48d964cb21fbbce22e43fe7ecd8 Mon Sep 17 00:00:00 2001
From: Poorna Krishnamoorthy
Date: Fri, 24 Feb 2023 12:07:34 -0800
Subject: [PATCH] replication: perform bucket resync in parallel (#16707)

Default number of parallel resync operations for a bucket to 10 to speed
up resync.
---
 cmd/bucket-replication.go | 173 ++++++++++++++++++++++----------------
 1 file changed, 99 insertions(+), 74 deletions(-)

diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index a19a883f0..b0847f95b 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -49,6 +49,7 @@ import (
 	"github.com/minio/minio/internal/hash"
 	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/logger"
+	"github.com/minio/pkg/workers"
 	"github.com/zeebo/xxh3"
 )
 
@@ -2307,7 +2308,10 @@ func (s *replicationResyncer) PersistToDisk(ctx context.Context, objectAPI Objec
 	}
 }
 
-const resyncWorkerCnt = 50 // limit of number of bucket resyncs is progress at any given time
+const (
+	resyncWorkerCnt        = 10 // limit of number of bucket resyncs is progress at any given time
+	resyncParallelRoutines = 10 // number of parallel resync ops per bucket
+)
 
 func newresyncer() *replicationResyncer {
 	rs := replicationResyncer{
@@ -2322,6 +2326,36 @@ func newresyncer() *replicationResyncer {
 	return &rs
 }
 
+// mark status of replication resync on remote target for the bucket
+func (s *replicationResyncer) markStatus(status ResyncStatusType, opts resyncOpts) {
+	s.Lock()
+	defer s.Unlock()
+
+	m := s.statusMap[opts.bucket]
+	st := m.TargetsMap[opts.arn]
+	st.LastUpdate = UTCNow()
+	st.ResyncStatus = status
+	m.TargetsMap[opts.arn] = st
+	m.LastUpdate = UTCNow()
+	s.statusMap[opts.bucket] = m
+}
+
+// update replication resync stats for bucket's remote target
+func (s *replicationResyncer) incStats(ts TargetReplicationResyncStatus, opts resyncOpts) {
+	s.Lock()
+	defer s.Unlock()
+	m := s.statusMap[opts.bucket]
+	st := m.TargetsMap[opts.arn]
+	st.Object = ts.Object
+	st.ReplicatedCount += ts.ReplicatedCount
+	st.FailedCount += ts.FailedCount
+	st.ReplicatedSize += ts.ReplicatedSize
+	st.FailedSize += ts.FailedSize
+	m.TargetsMap[opts.arn] = st
+	m.LastUpdate = UTCNow()
+	s.statusMap[opts.bucket] = m
+}
+
 // resyncBucket resyncs all qualifying objects as per replication rules for the target
 // ARN
 func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
@@ -2333,15 +2367,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
 
 	resyncStatus := ResyncFailed
 	defer func() {
-		s.Lock()
-		m := s.statusMap[opts.bucket]
-		st := m.TargetsMap[opts.arn]
-		st.LastUpdate = UTCNow()
-		st.ResyncStatus = resyncStatus
-		m.TargetsMap[opts.arn] = st
-		m.LastUpdate = UTCNow()
-		s.statusMap[opts.bucket] = m
-		s.Unlock()
+		s.markStatus(resyncStatus, opts)
 		globalSiteResyncMetrics.incBucket(opts, resyncStatus)
 		s.workerCh <- struct{}{}
 	}()
@@ -2377,15 +2403,9 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
 	}
 	// mark resync status as resync started
 	if !heal {
-		s.Lock()
-		m := s.statusMap[opts.bucket]
-		st := m.TargetsMap[opts.arn]
-		st.ResyncStatus = ResyncStarted
-		m.TargetsMap[opts.arn] = st
-		m.LastUpdate = UTCNow()
-		s.statusMap[opts.bucket] = m
-		s.Unlock()
+		s.markStatus(ResyncStarted, opts)
 	}
+
 	// Walk through all object versions - Walk() is always in ascending order needed to ensure
 	// delete marker replicated to target after object version is first created.
 	if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil {
@@ -2401,80 +2421,85 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
 	if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
 		lastCheckpoint = st.Object
 	}
+	workers, err := workers.New(resyncParallelRoutines)
 	for obj := range objInfoCh {
 		select {
 		case <-s.resyncCancelCh:
 			resyncStatus = ResyncCanceled
 			return
+		case <-ctx.Done():
+			return
 		default:
 		}
 		if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name {
 			continue
 		}
 		lastCheckpoint = ""
-
-		roi := getHealReplicateObjectInfo(obj, rcfg)
-		if !roi.ExistingObjResync.mustResync() {
-			continue
-		}
-		traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
-		if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
-			versionID := ""
-			dmVersionID := ""
-			if roi.VersionPurgeStatus.Empty() {
-				dmVersionID = roi.VersionID
-			} else {
-				versionID = roi.VersionID
-			}
-			doi := DeletedObjectReplicationInfo{
-				DeletedObject: DeletedObject{
-					ObjectName:            roi.Name,
-					DeleteMarkerVersionID: dmVersionID,
-					VersionID:             versionID,
-					ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
-					DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
-					DeleteMarker:          roi.DeleteMarker,
-				},
-				Bucket:    roi.Bucket,
-				OpType:    replication.ExistingObjectReplicationType,
-				EventType: ReplicateExistingDelete,
-			}
-			replicateDelete(ctx, doi, objectAPI)
-		} else {
-			roi.OpType = replication.ExistingObjectReplicationType
-			roi.EventType = ReplicateExisting
-			replicateObject(ctx, roi, objectAPI)
-		}
-		_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
-			VersionID: roi.VersionID,
-			Internal: minio.AdvancedGetOptions{
-				ReplicationProxyRequest: "false",
-			},
-		})
-		s.Lock()
-		m = s.statusMap[opts.bucket]
-		st = m.TargetsMap[opts.arn]
-		st.Object = roi.Name
-		success := true
-		if err != nil {
-			if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
-				st.ReplicatedCount++
-			} else {
-				st.FailedCount++
-				success = false
-			}
-		} else {
-			st.ReplicatedCount++
-			st.ReplicatedSize += roi.Size
-		}
-		m.TargetsMap[opts.arn] = st
-		m.LastUpdate = UTCNow()
-		s.statusMap[opts.bucket] = m
-		s.Unlock()
-		traceFn(err)
-		globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
+		obj := obj
+		workers.Take()
+		go func() {
+			defer workers.Give()
+			roi := getHealReplicateObjectInfo(obj, rcfg)
+			if !roi.ExistingObjResync.mustResync() {
+				return
+			}
+			traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
+			if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
+				versionID := ""
+				dmVersionID := ""
+				if roi.VersionPurgeStatus.Empty() {
+					dmVersionID = roi.VersionID
+				} else {
+					versionID = roi.VersionID
+				}
+				doi := DeletedObjectReplicationInfo{
+					DeletedObject: DeletedObject{
+						ObjectName:            roi.Name,
+						DeleteMarkerVersionID: dmVersionID,
+						VersionID:             versionID,
+						ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
+						DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
+						DeleteMarker:          roi.DeleteMarker,
+					},
+					Bucket:    roi.Bucket,
+					OpType:    replication.ExistingObjectReplicationType,
+					EventType: ReplicateExistingDelete,
+				}
+				replicateDelete(ctx, doi, objectAPI)
+			} else {
+				roi.OpType = replication.ExistingObjectReplicationType
+				roi.EventType = ReplicateExisting
+				replicateObject(ctx, roi, objectAPI)
+			}
+			_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
+				VersionID: roi.VersionID,
+				Internal: minio.AdvancedGetOptions{
+					ReplicationProxyRequest: "false",
+				},
+			})
+			st := TargetReplicationResyncStatus{
+				Object: roi.Name,
+				Bucket: roi.Bucket,
+			}
+			success := true
+			if err != nil {
+				if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
+					st.ReplicatedCount++
+				} else {
+					st.FailedCount++
+					success = false
+				}
+			} else {
+				st.ReplicatedCount++
+				st.ReplicatedSize += roi.Size
+			}
+			s.incStats(st, opts)
+			traceFn(err)
+			globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
+		}()
 	}
+	workers.Wait()
 	resyncStatus = ResyncCompleted
 }
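
Note: the sketch below is not part of the patch. It is a minimal, standalone
illustration of the bounded-parallelism pattern the patch applies to bucket
resync, assuming the Take/Give/Wait API of github.com/minio/pkg/workers as
used in the hunks above; the processItem helper and the item names are
hypothetical placeholders.

// Minimal sketch: cap concurrent work with github.com/minio/pkg/workers,
// mirroring how the patch fans out per-object resync operations.
package main

import (
	"fmt"

	"github.com/minio/pkg/workers"
)

// processItem stands in for the per-object work (e.g. replicating one version).
func processItem(item string) {
	fmt.Println("processed", item)
}

func main() {
	items := []string{"obj-1", "obj-2", "obj-3", "obj-4"}

	// Allow at most 10 goroutines at a time, mirroring resyncParallelRoutines.
	w, err := workers.New(10)
	if err != nil {
		// workers.New only fails for a non-positive worker count.
		panic(err)
	}
	for _, item := range items {
		item := item // capture the loop variable for the goroutine (pre-Go 1.22)
		w.Take()     // blocks until a worker slot is free
		go func() {
			defer w.Give() // release the slot when this item is done
			processItem(item)
		}()
	}
	w.Wait() // block until every in-flight goroutine has finished
}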