From 311380f8cbb99006f02112545eba22e563f03175 Mon Sep 17 00:00:00 2001
From: Poorna
Date: Tue, 1 Aug 2023 11:51:15 -0700
Subject: [PATCH] replication resync: fix queueing (#17775)

Assign resync of all versions of an object to the same worker
to avoid locking contention.

Fixes parallel resync implementation in #16707
---
 cmd/bucket-replication-utils.go               |   1 +
 cmd/bucket-replication.go                     | 174 +++++++++++-------
 cmd/site-replication-utils.go                 |  12 +-
 .../setup_2site_existing_replication.sh       |   2 +-
 4 files changed, 112 insertions(+), 77 deletions(-)

diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go
index 04f0a6ef1..392844ebb 100644
--- a/cmd/bucket-replication-utils.go
+++ b/cmd/bucket-replication-utils.go
@@ -720,6 +720,7 @@ type TargetReplicationResyncStatus struct {
     // Last bucket/object replicated.
     Bucket string `json:"-" msg:"bkt"`
     Object string `json:"-" msg:"obj"`
+    Error  error  `json:"-" msg:"-"`
 }
 
 // BucketReplicationResyncStatus captures current replication resync status
diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index e7b2fb37e..8de37220d 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -51,7 +51,6 @@ import (
     "github.com/minio/minio/internal/hash"
     xhttp "github.com/minio/minio/internal/http"
     "github.com/minio/minio/internal/logger"
-    "github.com/minio/pkg/workers"
     "github.com/zeebo/xxh3"
 )
 
@@ -2440,12 +2439,12 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
     objInfoCh := make(chan ObjectInfo)
     cfg, err := getReplicationConfig(ctx, opts.bucket)
     if err != nil {
-        logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
+        logger.LogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
         return
     }
     tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, opts.bucket)
     if err != nil {
-        logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", opts.bucket, opts.arn, err))
+        logger.LogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed %w", opts.bucket, opts.arn, err))
         return
     }
     rcfg := replicationConfig{
@@ -2458,12 +2457,12 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
         TargetArn: opts.arn,
     })
     if len(tgtArns) != 1 {
-        logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
+        logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
         return
     }
     tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, opts.arn)
     if tgt == nil {
-        logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
+        logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
         return
     }
     // mark resync status as resync started
@@ -2486,7 +2485,83 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
     if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
         lastCheckpoint = st.Object
     }
-    workers, err := workers.New(resyncParallelRoutines)
+    workers := make([]chan ReplicateObjectInfo, resyncParallelRoutines)
+    resultCh := make(chan TargetReplicationResyncStatus, 1)
+    defer close(resultCh)
+    var wg sync.WaitGroup
+    for i := 0; i < resyncParallelRoutines; i++ {
+        wg.Add(1)
+        workers[i] = make(chan ReplicateObjectInfo, 100)
+        i := i
+        go func(ctx context.Context, idx int) {
+            defer wg.Done()
+            for roi := range workers[idx] {
+                select {
+                case <-ctx.Done():
+                    return
+                case <-s.resyncCancelCh:
+                default:
+                }
+                traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
+                if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
+                    versionID := ""
+                    dmVersionID := ""
+                    if roi.VersionPurgeStatus.Empty() {
+                        dmVersionID = roi.VersionID
+                    } else {
+                        versionID = roi.VersionID
+                    }
+
+                    doi := DeletedObjectReplicationInfo{
+                        DeletedObject: DeletedObject{
+                            ObjectName:            roi.Name,
+                            DeleteMarkerVersionID: dmVersionID,
+                            VersionID:             versionID,
+                            ReplicationState:      roi.getReplicationState(),
+                            DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
+                            DeleteMarker:          roi.DeleteMarker,
+                        },
+                        Bucket:    roi.Bucket,
+                        OpType:    replication.ExistingObjectReplicationType,
+                        EventType: ReplicateExistingDelete,
+                    }
+                    replicateDelete(ctx, doi, objectAPI)
+                } else {
+                    roi.OpType = replication.ExistingObjectReplicationType
+                    roi.EventType = ReplicateExisting
+                    replicateObject(ctx, roi, objectAPI)
+                }
+                _, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
+                    VersionID: roi.VersionID,
+                    Internal: minio.AdvancedGetOptions{
+                        ReplicationProxyRequest: "false",
+                    },
+                })
+                st := TargetReplicationResyncStatus{
+                    Object: roi.Name,
+                    Bucket: roi.Bucket,
+                }
+                if err != nil {
+                    if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
+                        st.ReplicatedCount++
+                    } else {
+                        st.FailedCount++
+                    }
+                } else {
+                    st.ReplicatedCount++
+                    st.ReplicatedSize += roi.Size
+                }
+                traceFn(err)
+                select {
+                case <-ctx.Done():
+                    return
+                case <-s.resyncCancelCh:
+                    return
+                case resultCh <- st:
+                }
+            }
+        }(ctx, i)
+    }
     for obj := range objInfoCh {
         select {
         case <-s.resyncCancelCh:
@@ -2500,71 +2575,30 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
         if obj.Name <= lastCheckpoint {
             continue
         }
         lastCheckpoint = ""
-        obj := obj
-        workers.Take()
-        go func() {
-            defer workers.Give()
-            roi := getHealReplicateObjectInfo(obj, rcfg)
-            if !roi.ExistingObjResync.mustResync() {
-                return
-            }
-            traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
-            if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
-                versionID := ""
-                dmVersionID := ""
-                if roi.VersionPurgeStatus.Empty() {
-                    dmVersionID = roi.VersionID
-                } else {
-                    versionID = roi.VersionID
-                }
-
-                doi := DeletedObjectReplicationInfo{
-                    DeletedObject: DeletedObject{
-                        ObjectName:            roi.Name,
-                        DeleteMarkerVersionID: dmVersionID,
-                        VersionID:             versionID,
-                        ReplicationState:      roi.getReplicationState(),
-                        DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
-                        DeleteMarker:          roi.DeleteMarker,
-                    },
-                    Bucket:    roi.Bucket,
-                    OpType:    replication.ExistingObjectReplicationType,
-                    EventType: ReplicateExistingDelete,
-                }
-                replicateDelete(ctx, doi, objectAPI)
-            } else {
-                roi.OpType = replication.ExistingObjectReplicationType
-                roi.EventType = ReplicateExisting
-                replicateObject(ctx, roi, objectAPI)
-            }
-            _, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
-                VersionID: roi.VersionID,
-                Internal: minio.AdvancedGetOptions{
-                    ReplicationProxyRequest: "false",
-                },
-            })
-            st := TargetReplicationResyncStatus{
-                Object: roi.Name,
-                Bucket: roi.Bucket,
-            }
-            success := true
-            if err != nil {
-                if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
-                    st.ReplicatedCount++
-                } else {
-                    st.FailedCount++
-                    success = false
-                }
-            } else {
-                st.ReplicatedCount++
-                st.ReplicatedSize += roi.Size
-            }
-            s.incStats(st, opts)
-            traceFn(err)
-            globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
-        }()
+        roi := getHealReplicateObjectInfo(obj, rcfg)
+        if !roi.ExistingObjResync.mustResync() {
+            continue
+        }
+        select {
+        case <-s.resyncCancelCh:
+            return
+        case <-ctx.Done():
+            return
+        default:
+            h := xxh3.HashString(roi.Bucket + roi.Name)
+            workers[h%uint64(resyncParallelRoutines)] <- roi
+        }
     }
-    workers.Wait()
+    for i := 0; i < resyncParallelRoutines; i++ {
+        close(workers[i])
+    }
+    go func() {
+        for r := range resultCh {
+            s.incStats(r, opts)
+            globalSiteResyncMetrics.updateMetric(r, opts.resyncID)
+        }
+    }()
+    wg.Wait()
     resyncStatus = ResyncCompleted
 }
diff --git a/cmd/site-replication-utils.go b/cmd/site-replication-utils.go
index 7a9c13896..a577bef02 100644
--- a/cmd/site-replication-utils.go
+++ b/cmd/site-replication-utils.go
@@ -292,22 +292,22 @@ func siteResyncStatus(currSt ResyncStatusType, m map[string]ResyncStatusType) Re
 }
 
 // update resync metrics per object
-func (sm *siteResyncMetrics) updateMetric(roi ReplicateObjectInfo, success bool, resyncID string) {
+func (sm *siteResyncMetrics) updateMetric(r TargetReplicationResyncStatus, resyncID string) {
     if !globalSiteReplicationSys.isEnabled() {
         return
     }
     sm.Lock()
     defer sm.Unlock()
     s := sm.resyncStatus[resyncID]
-    if success {
+    if r.ReplicatedCount > 0 {
         s.ReplicatedCount++
-        s.ReplicatedSize += roi.Size
+        s.ReplicatedSize += r.ReplicatedSize
     } else {
         s.FailedCount++
-        s.FailedSize += roi.Size
+        s.FailedSize += r.FailedSize
     }
-    s.Bucket = roi.Bucket
-    s.Object = roi.Name
+    s.Bucket = r.Bucket
+    s.Object = r.Object
     s.LastUpdate = UTCNow()
     sm.resyncStatus[resyncID] = s
 }
diff --git a/docs/bucket/replication/setup_2site_existing_replication.sh b/docs/bucket/replication/setup_2site_existing_replication.sh
index d720439d6..e4689b6fc 100755
--- a/docs/bucket/replication/setup_2site_existing_replication.sh
+++ b/docs/bucket/replication/setup_2site_existing_replication.sh
@@ -84,7 +84,7 @@ remote_arn=$(./mc replicate ls sitea/bucket --json | jq -r .rule.Destination.Buc
 sleep 1
 
 ./mc replicate resync start sitea/bucket/ --remote-bucket "${remote_arn}"
-sleep 30s ## sleep for 30s idea is that we give 300ms per object.
+sleep 10s ## sleep for 10s idea is that we give 100ms per object.
 
 count=$(./mc replicate resync status sitea/bucket --remote-bucket "${remote_arn}" --json | jq .resyncInfo.target[].replicationCount)
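
The queueing change is easiest to see in isolation. Before this patch, resyncBucket handed each listed version to a goroutine drawn from a shared worker pool, so two goroutines could replicate versions of the same object concurrently and contend for that object's lock. The patch instead routes every entry to a fixed worker channel chosen by hashing bucket plus object name; because the version ID is left out of the hash, all versions of one object serialize on a single worker while distinct objects still spread across resyncParallelRoutines workers. What follows is a minimal, runnable sketch of that routing pattern, not the MinIO code itself: the package layout, `item`, `parallelRoutines`, and the Printf stand-in for replication work are all illustrative; only the xxh3 hash-and-modulo step mirrors the patch.

package main

import (
	"fmt"
	"sync"

	"github.com/zeebo/xxh3" // same hash function the patch uses
)

const parallelRoutines = 4 // stand-in for resyncParallelRoutines

// item is a stand-in for ReplicateObjectInfo, reduced to the fields
// that the routing decision touches.
type item struct {
	Bucket, Name, VersionID string
}

func main() {
	workers := make([]chan item, parallelRoutines)
	var wg sync.WaitGroup
	for i := 0; i < parallelRoutines; i++ {
		workers[i] = make(chan item, 100) // buffered per worker, as in the patch
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			// Every version of a given bucket/object pair arrives on this one
			// channel, so its versions are processed serially and never race
			// another worker for the same object's lock.
			for it := range workers[idx] {
				fmt.Printf("worker %d: %s/%s (%s)\n", idx, it.Bucket, it.Name, it.VersionID)
			}
		}(i)
	}

	versions := []item{
		{"bucket", "a.txt", "v1"},
		{"bucket", "a.txt", "v2"}, // same object: lands on the same worker as v1
		{"bucket", "b.txt", "v1"},
		{"bucket", "c.txt", "v1"},
	}
	for _, v := range versions {
		// Hash bucket+name only; excluding the version ID is what pins
		// all versions of an object to one worker.
		h := xxh3.HashString(v.Bucket + v.Name)
		workers[h%uint64(parallelRoutines)] <- v
	}
	for i := range workers {
		close(workers[i]) // producer done; let workers drain and exit
	}
	wg.Wait()
}

The shutdown ordering in the sketch follows the patch as well: the producer closes every worker channel once the listing loop ends, and wg.Wait then guarantees all in-flight work has finished (and, in the patch, been reported to resultCh) before the resync is marked completed.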