diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 0e87cc795..f96c5b7be 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -1071,86 +1071,84 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) - var ( - walkCh = make(chan itemOrErr[ObjectInfo], 100) - slowCh = make(chan itemOrErr[ObjectInfo], 100) - ) - - if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { - go func() { - // Snowball currently needs the high level minio-go Client, not the Core one - cl, err := miniogo.New(u.Host, &miniogo.Options{ - Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), - Secure: u.Scheme == "https", - Transport: getRemoteInstanceTransport(), - BucketLookup: lookupStyle(r.Target.Path), - }) - if err != nil { - batchLogIf(ctx, err) - return - } - - // Already validated before arriving here - smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan) - - batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch) - writeFn := func(batch []ObjectInfo) { - if len(batch) > 0 { - if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { - batchLogIf(ctx, err) - for _, b := range batch { - slowCh <- itemOrErr[ObjectInfo]{Item: b} - } - } else { - ri.trackCurrentBucketBatch(r.Source.Bucket, batch) - globalBatchJobsMetrics.save(job.ID, ri) - // persist in-memory state to disk after every 10secs. - batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) - } - } - } - for obj := range walkCh { - if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) { - slowCh <- obj - continue - } - - batch = append(batch, obj.Item) - - if len(batch) < *r.Source.Snowball.Batch { - continue - } - writeFn(batch) - batch = batch[:0] - } - writeFn(batch) - xioutil.SafeClose(slowCh) - }() - } else { - slowCh = walkCh - } - - workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2))) - if err != nil { - return err - } - - wk, err := workers.New(workerSize) - if err != nil { - // invalid worker size. - return err - } - - walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict") - if walkQuorum == "" { - walkQuorum = "strict" - } - retryAttempts := ri.RetryAttempts retry := false for attempts := 1; attempts <= retryAttempts; attempts++ { attempts := attempts + var ( + walkCh = make(chan itemOrErr[ObjectInfo], 100) + slowCh = make(chan itemOrErr[ObjectInfo], 100) + ) + if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { + go func() { + // Snowball currently needs the high level minio-go Client, not the Core one + cl, err := miniogo.New(u.Host, &miniogo.Options{ + Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), + Secure: u.Scheme == "https", + Transport: getRemoteInstanceTransport(), + BucketLookup: lookupStyle(r.Target.Path), + }) + if err != nil { + batchLogIf(ctx, err) + return + } + + // Already validated before arriving here + smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan) + + batch := make([]ObjectInfo, 0, *r.Source.Snowball.Batch) + writeFn := func(batch []ObjectInfo) { + if len(batch) > 0 { + if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { + batchLogIf(ctx, err) + for _, b := range batch { + slowCh <- itemOrErr[ObjectInfo]{Item: b} + } + } else { + ri.trackCurrentBucketBatch(r.Source.Bucket, batch) + globalBatchJobsMetrics.save(job.ID, ri) + // persist in-memory state to disk after every 10secs. + batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) + } + } + } + for obj := range walkCh { + if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) { + slowCh <- obj + continue + } + + batch = append(batch, obj.Item) + + if len(batch) < *r.Source.Snowball.Batch { + continue + } + writeFn(batch) + batch = batch[:0] + } + writeFn(batch) + xioutil.SafeClose(slowCh) + }() + } else { + slowCh = walkCh + } + + workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2))) + if err != nil { + return err + } + + wk, err := workers.New(workerSize) + if err != nil { + // invalid worker size. + return err + } + + walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict") + if walkQuorum == "" { + walkQuorum = "strict" + } ctx, cancel := context.WithCancel(ctx) // one of source/target is s3, skip delete marker and all versions under the same object name. s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3