From 407c9ddcbf1763cd92b136c6e8dd2a08d6f10ec8 Mon Sep 17 00:00:00 2001
From: Poorna
Date: Fri, 31 Mar 2023 10:48:36 -0700
Subject: [PATCH] feat: add batch replicate from remote MinIO to local cluster
 (#16922)

---
 cmd/batch-handlers.go | 439 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 422 insertions(+), 17 deletions(-)

diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go
index ae92ec9cb..ca5b4147f 100644
--- a/cmd/batch-handlers.go
+++ b/cmd/batch-handlers.go
@@ -28,6 +28,7 @@ import (
     "math/rand"
     "net/http"
     "net/url"
+    "path"
     "runtime"
     "strconv"
     "strings"
@@ -37,10 +38,14 @@ import (
     "github.com/dustin/go-humanize"
     "github.com/lithammer/shortuuid/v4"
     "github.com/minio/madmin-go/v2"
+    "github.com/minio/minio-go/v7"
     miniogo "github.com/minio/minio-go/v7"
     "github.com/minio/minio-go/v7/pkg/credentials"
+    "github.com/minio/minio-go/v7/pkg/encrypt"
     "github.com/minio/minio-go/v7/pkg/tags"
     "github.com/minio/minio/internal/auth"
+    "github.com/minio/minio/internal/crypto"
+    "github.com/minio/minio/internal/hash"
     xhttp "github.com/minio/minio/internal/http"
     "github.com/minio/minio/internal/logger"
     "github.com/minio/minio/internal/workers"
@@ -189,6 +194,11 @@ type BatchJobReplicateCredentials struct {
     SessionToken string `xml:"SessionToken" json:"sessionToken,omitempty" yaml:"sessionToken"`
 }
 
+// Empty indicates if credentials are not set
+func (c BatchJobReplicateCredentials) Empty() bool {
+    return c.AccessKey == "" && c.SecretKey == "" && c.SessionToken == ""
+}
+
 // Validate validates if credentials are valid
 func (c BatchJobReplicateCredentials) Validate() error {
     if !auth.IsAccessKeyValid(c.AccessKey) || !auth.IsSecretKeyValid(c.SecretKey) {
@@ -227,6 +237,11 @@ type BatchJobReplicateV1 struct {
     clnt *miniogo.Core `msg:"-"`
 }
 
+// RemoteToLocal returns true if source is remote and target is local
+func (r BatchJobReplicateV1) RemoteToLocal() bool {
+    return !r.Source.Creds.Empty()
+}
+
 // BatchJobRequest this is an internal data structure not for external consumption.
 type BatchJobRequest struct {
     ID string `yaml:"-" json:"name"`
@@ -270,10 +285,360 @@ func (r BatchJobReplicateV1) Notify(ctx context.Context, body io.Reader) error {
 }
 
-// ReplicateFromSource - this is not implemented yet where source is 'remote' and target is local.
-func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObject string) error {
+// ReplicateFromSource - replicates an object from a remote MinIO source to the local cluster.
+func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, core *minio.Core, srcObjInfo ObjectInfo, retry bool) error {
+    srcBucket := r.Source.Bucket
+    tgtBucket := r.Target.Bucket
+    srcObject := srcObjInfo.Name
+    tgtObject := srcObjInfo.Name
+    if r.Target.Prefix != "" {
+        tgtObject = path.Join(r.Target.Prefix, srcObjInfo.Name)
+    }
+
+    versioned := globalBucketVersioningSys.PrefixEnabled(tgtBucket, tgtObject)
+    versionSuspended := globalBucketVersioningSys.PrefixSuspended(tgtBucket, tgtObject)
+
+    if srcObjInfo.DeleteMarker {
+        _, err := api.DeleteObject(ctx, tgtBucket, tgtObject, ObjectOptions{
+            VersionID:          srcObjInfo.VersionID,
+            VersionSuspended:   versionSuspended,
+            Versioned:          versioned,
+            MTime:              srcObjInfo.ModTime,
+            DeleteMarker:       srcObjInfo.DeleteMarker,
+            ReplicationRequest: true,
+        })
+        return err
+    }
+
+    opts := ObjectOptions{
+        VersionID:        srcObjInfo.VersionID,
+        Versioned:        versioned,
+        VersionSuspended: versionSuspended,
+        MTime:            srcObjInfo.ModTime,
+        PreserveETag:     srcObjInfo.ETag,
+        UserDefined:      srcObjInfo.UserDefined,
+    }
+    if crypto.S3.IsEncrypted(srcObjInfo.UserDefined) {
+        opts.ServerSideEncryption = encrypt.NewSSE()
+    }
+    slc := strings.Split(srcObjInfo.ETag, "-")
+    if len(slc) == 2 {
+        partsCount, err := strconv.Atoi(slc[1])
+        if err != nil {
+            return err
+        }
+        return r.copyWithMultipartFromSource(ctx, api, core, srcObjInfo, opts, partsCount)
+    }
+    gopts := minio.GetObjectOptions{
+        VersionID: srcObjInfo.VersionID,
+    }
+    if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
+        return err
+    }
+    rd, objInfo, _, err := core.GetObject(ctx, srcBucket, srcObject, gopts)
+    if err != nil {
+        return err
+    }
+    defer rd.Close()
+
+    hr, err := hash.NewReader(rd, objInfo.Size, "", "", objInfo.Size)
+    if err != nil {
+        return err
+    }
+    pReader := NewPutObjReader(hr)
+    _, err = api.PutObject(ctx, tgtBucket, tgtObject, pReader, opts)
+    return err
+}
+
+func (r *BatchJobReplicateV1) copyWithMultipartFromSource(ctx context.Context, api ObjectLayer, c *minio.Core, srcObjInfo ObjectInfo, opts ObjectOptions, partsCount int) (err error) {
+    srcBucket := r.Source.Bucket
+    tgtBucket := r.Target.Bucket
+    srcObject := srcObjInfo.Name
+    tgtObject := srcObjInfo.Name
+    if r.Target.Prefix != "" {
+        tgtObject = path.Join(r.Target.Prefix, srcObjInfo.Name)
+    }
+
+    var uploadedParts []CompletePart
+    res, err := api.NewMultipartUpload(context.Background(), tgtBucket, tgtObject, opts)
+    if err != nil {
+        return err
+    }
+
+    defer func() {
+        if err != nil {
+            // block and abort the multipart upload on the target upon failure.
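+            // Retry the abort a few times before giving up: a dangling
+            // multipart upload continues to consume storage until cleaned up.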
+            attempts := 1
+            for attempts <= 3 {
+                aerr := api.AbortMultipartUpload(ctx, tgtBucket, tgtObject, res.UploadID, ObjectOptions{})
+                if aerr == nil {
+                    return
+                }
+                logger.LogIf(ctx,
+                    fmt.Errorf("trying %s: unable to clean up failed multipart replication %s on %s/%s: %w - this may consume space on the target cluster",
+                        humanize.Ordinal(attempts), res.UploadID, tgtBucket, tgtObject, aerr))
+                attempts++
+                time.Sleep(time.Second)
+            }
+        }
+    }()
+
+    var (
+        hr    *hash.Reader
+        pInfo PartInfo
+    )
+
+    for i := 0; i < partsCount; i++ {
+        gopts := minio.GetObjectOptions{
+            VersionID:  srcObjInfo.VersionID,
+            PartNumber: i + 1,
+        }
+        if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
+            return err
+        }
+        rd, objInfo, _, err := c.GetObject(ctx, srcBucket, srcObject, gopts)
+        if err != nil {
+            return err
+        }
+        defer rd.Close()
+
+        hr, err = hash.NewReader(rd, objInfo.Size, "", "", objInfo.Size)
+        if err != nil {
+            return err
+        }
+        pReader := NewPutObjReader(hr)
+        opts.PreserveETag = ""
+        pInfo, err = api.PutObjectPart(ctx, tgtBucket, tgtObject, res.UploadID, i+1, pReader, opts)
+        if err != nil {
+            return err
+        }
+        if pInfo.Size != objInfo.Size {
+            return fmt.Errorf("part size mismatch: got %d, want %d", pInfo.Size, objInfo.Size)
+        }
+        uploadedParts = append(uploadedParts, CompletePart{
+            PartNumber: pInfo.PartNumber,
+            ETag:       pInfo.ETag,
+        })
+    }
+    _, err = api.CompleteMultipartUpload(ctx, tgtBucket, tgtObject, res.UploadID, uploadedParts, opts)
+    return err
+}
+
+// StartFromSource starts the batch replication job from a remote source, resuming any pending job via "job.ID"
+func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
+    ri := &batchJobInfo{
+        JobID:     job.ID,
+        JobType:   string(job.Type()),
+        StartTime: job.Started,
+    }
+    if err := ri.load(ctx, api, job); err != nil {
+        return err
+    }
+    globalBatchJobsMetrics.save(job.ID, ri)
+
+    delay := job.Replicate.Flags.Retry.Delay
+    if delay == 0 {
+        delay = batchReplJobDefaultRetryDelay
+    }
+    rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+    skip := func(oi ObjectInfo) (ok bool) {
+        if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan {
+            // skip all objects that are newer than specified older duration
+            return true
+        }
+
+        if r.Flags.Filter.NewerThan > 0 && time.Since(oi.ModTime) >= r.Flags.Filter.NewerThan {
+            // skip all objects that are older than specified newer duration
+            return true
+        }
+
+        if !r.Flags.Filter.CreatedAfter.IsZero() && r.Flags.Filter.CreatedAfter.After(oi.ModTime) {
+            // skip all objects that are created before the specified time.
+            return true
+        }
+
+        if !r.Flags.Filter.CreatedBefore.IsZero() && r.Flags.Filter.CreatedBefore.Before(oi.ModTime) {
+            // skip all objects that are created after the specified time.
+            return true
+        }
+        if len(r.Flags.Filter.Tags) > 0 {
+            // Only parse object tags if tags filter is specified.
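+            // Replicate the object only if at least one of its tags matches
+            // a tag filter entry; otherwise skip it.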
+            tagMap := map[string]string{}
+            tagStr := oi.UserTags
+            if len(tagStr) != 0 {
+                t, err := tags.ParseObjectTags(tagStr)
+                if err != nil {
+                    // skip objects whose tags cannot be parsed.
+                    return true
+                }
+                tagMap = t.ToMap()
+            }
+            for _, kv := range r.Flags.Filter.Tags {
+                for t, v := range tagMap {
+                    if kv.Match(BatchJobReplicateKV{Key: t, Value: v}) {
+                        return false
+                    }
+                }
+            }
+
+            // None of the provided tag filters match; skip the object.
+            return true
+        }
+
+        if len(r.Flags.Filter.Metadata) > 0 {
+            for _, kv := range r.Flags.Filter.Metadata {
+                for k, v := range oi.UserDefined {
+                    if !strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") && !isStandardHeader(k) {
+                        continue
+                    }
+                    // We only need to match x-amz-meta or standardHeaders
+                    if kv.Match(BatchJobReplicateKV{Key: k, Value: v}) {
+                        return false
+                    }
+                }
+            }
+
+            // None of the provided metadata filters match; skip the object.
+            return true
+        }
+
+        return false
+    }
+
+    u, err := url.Parse(r.Source.Endpoint)
+    if err != nil {
+        return err
+    }
+
+    cred := r.Source.Creds
+
+    c, err := miniogo.New(u.Host, &miniogo.Options{
+        Creds:     credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
+        Secure:    u.Scheme == "https",
+        Transport: getRemoteInstanceTransport,
+    })
+    if err != nil {
+        return err
+    }
+
+    c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
+    core := &minio.Core{Client: c}
+
+    workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
+    if err != nil {
+        return err
+    }
+
+    wk, err := workers.New(workerSize)
+    if err != nil {
+        // invalid worker size.
+        return err
+    }
+
+    retryAttempts := ri.RetryAttempts
+    retry := false
+    for attempts := 1; attempts <= retryAttempts; attempts++ {
+        attempts := attempts
+
+        ctx, cancel := context.WithCancel(ctx)
+        objInfoCh := c.ListObjects(ctx, r.Source.Bucket, minio.ListObjectsOptions{
+            Prefix:       r.Source.Prefix,
+            WithVersions: true,
+            Recursive:    true,
+            WithMetadata: true,
+        })
+        for obj := range objInfoCh {
+            oi := toObjectInfo(r.Source.Bucket, obj.Key, obj)
+            if skip(oi) {
+                continue
+            }
+            wk.Take()
+            go func() {
+                defer wk.Give()
+                stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, oi)
+                success := true
+                if err := r.ReplicateFromSource(ctx, api, core, oi, retry); err != nil {
+                    // object must have been deleted concurrently; allow these failures but do not count them
+                    if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
+                        return
+                    }
+                    stopFn(err)
+                    logger.LogIf(ctx, err)
+                    success = false
+                } else {
+                    stopFn(nil)
+                }
+                ri.trackCurrentBucketObject(r.Target.Bucket, oi, success)
+                globalBatchJobsMetrics.save(job.ID, ri)
+                // persist in-memory state to disk every 10secs.
+                logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job.Location))
+            }()
+        }
+        wk.Wait()
+
+        ri.RetryAttempts = attempts
+        ri.Complete = ri.ObjectsFailed == 0
+        ri.Failed = ri.ObjectsFailed > 0
+
+        globalBatchJobsMetrics.save(job.ID, ri)
+        // persist in-memory state to disk.
+        logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job.Location))
+
+        buf, _ := json.Marshal(ri)
+        if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
+            logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
+        }
+
+        cancel()
+        if ri.Failed {
+            ri.ObjectsFailed = 0
+            ri.Bucket = ""
+            ri.Object = ""
+            ri.Objects = 0
+            ri.BytesFailed = 0
+            ri.BytesTransferred = 0
+            retry = true // indicate we are retrying
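+            // back off for the configured delay plus up to one extra delay
+            // of random jitter before the next attempt.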
+            time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay)))
+            continue
+        }
+
+        break
+    }
 	return nil
 }
 
+// toObjectInfo converts minio.ObjectInfo to ObjectInfo
+func toObjectInfo(bucket, object string, objInfo minio.ObjectInfo) ObjectInfo {
+    tags, _ := tags.MapToObjectTags(objInfo.UserTags)
+    oi := ObjectInfo{
+        Bucket:                    bucket,
+        Name:                      object,
+        ModTime:                   objInfo.LastModified,
+        Size:                      objInfo.Size,
+        ETag:                      objInfo.ETag,
+        VersionID:                 objInfo.VersionID,
+        IsLatest:                  objInfo.IsLatest,
+        DeleteMarker:              objInfo.IsDeleteMarker,
+        ContentType:               objInfo.ContentType,
+        Expires:                   objInfo.Expires,
+        StorageClass:              objInfo.StorageClass,
+        ReplicationStatusInternal: objInfo.ReplicationStatus,
+        UserTags:                  tags.String(),
+    }
+    oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
+    for k, v := range objInfo.Metadata {
+        oi.UserDefined[k] = v[0]
+    }
+    ce, ok := oi.UserDefined[xhttp.ContentEncoding]
+    if !ok {
+        ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
+    }
+    if ok {
+        oi.ContentEncoding = ce
+    }
+    return oi
+}
+
 // ReplicateToTarget read from source and replicate to configured target
 func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error {
     srcBucket := r.Source.Bucket
@@ -705,7 +1070,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
 
     buf, _ := json.Marshal(ri)
     if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
-        logger.LogIf(ctx, fmt.Errorf("Unable to notify %v", err))
+        logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
     }
 
     cancel()
@@ -751,8 +1116,11 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
     if r.Source.Bucket == "" {
         return errInvalidArgument
     }
-
-    info, err := o.GetBucketInfo(ctx, r.Source.Bucket, BucketOptions{})
+    localBkt := r.Source.Bucket
+    if r.Source.Endpoint != "" {
+        localBkt = r.Target.Bucket
+    }
+    info, err := o.GetBucketInfo(ctx, localBkt, BucketOptions{})
     if err != nil {
         if isErrBucketNotFound(err) {
             return batchReplicationJobError{
@@ -767,8 +1135,20 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
     if err := r.Source.Type.Validate(); err != nil {
         return err
     }
+    if r.Source.Creds.Empty() && r.Target.Creds.Empty() {
+        return errInvalidArgument
+    }
 
-    if r.Target.Endpoint == "" {
+    if !r.Source.Creds.Empty() {
+        if err := r.Source.Creds.Validate(); err != nil {
+            return err
+        }
+    }
+    if r.Target.Endpoint == "" && !r.Target.Creds.Empty() {
+        return errInvalidArgument
+    }
+
+    if r.Source.Endpoint == "" && !r.Source.Creds.Empty() {
         return errInvalidArgument
     }
 
@@ -776,8 +1156,14 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
         return errInvalidArgument
     }
 
-    if err := r.Target.Creds.Validate(); err != nil {
-        return err
+    if !r.Target.Creds.Empty() {
+        if err := r.Target.Creds.Validate(); err != nil {
+            return err
+        }
     }
 
     if err := r.Target.Type.Validate(); err != nil {
         return err
     }
 
@@ -800,13 +1186,21 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
         return err
     }
 
-    u, err := url.Parse(r.Target.Endpoint)
+    remoteEp := r.Target.Endpoint
+    remoteBkt := r.Target.Bucket
+    cred := r.Target.Creds
+
+    if r.Source.Endpoint != "" {
+        remoteEp = r.Source.Endpoint
+        cred = r.Source.Creds
+        remoteBkt = r.Source.Bucket
+    }
+
+    u, err := url.Parse(remoteEp)
     if err != nil {
         return err
     }
 
-    cred := r.Target.Creds
-
     c, err := miniogo.NewCore(u.Host, &miniogo.Options{
         Creds:     credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
         Secure:    u.Scheme == "https",
         Transport: getRemoteInstanceTransport,
@@ -817,7 +1211,7 @@
     }
     c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
 
-    vcfg, err := c.GetBucketVersioning(ctx, r.Target.Bucket)
+    vcfg, err := c.GetBucketVersioning(ctx, remoteBkt)
     if err != nil {
         if miniogo.ToErrorResponse(err).Code == "NoSuchBucket" {
             return batchReplicationJobError{
@@ -1154,13 +1548,24 @@ func (j *BatchJobPool) AddWorker() {
             return
         }
         if job.Replicate != nil {
-            if err := job.Replicate.Start(job.ctx, j.objLayer, *job); err != nil {
-                if !isErrBucketNotFound(err) {
-                    logger.LogIf(j.ctx, err)
-                    j.canceler(job.ID, false)
-                    continue
+            if job.Replicate.RemoteToLocal() {
+                if err := job.Replicate.StartFromSource(job.ctx, j.objLayer, *job); err != nil {
+                    if !isErrBucketNotFound(err) {
+                        logger.LogIf(j.ctx, err)
+                        j.canceler(job.ID, false)
+                        continue
+                    }
+                    // Bucket not found; proceed to delete such a job.
+                }
+            } else {
+                if err := job.Replicate.Start(job.ctx, j.objLayer, *job); err != nil {
+                    if !isErrBucketNotFound(err) {
+                        logger.LogIf(j.ctx, err)
+                        j.canceler(job.ID, false)
+                        continue
+                    }
+                    // Bucket not found; proceed to delete such a job.
                 }
-            // Bucket not found proceed to delete such a job.
             }
         }
         job.delete(j.ctx, j.objLayer)
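
For reference, a minimal remote-to-local job definition exercising this code path might look like the sketch below. The layout follows the existing batch-replicate job YAML that this patch extends; credential field names match the yaml tags on BatchJobReplicateCredentials above (accessKey, secretKey, sessionToken). All endpoint, bucket, prefix, credential, and filter values are hypothetical placeholders. Note that only the source carries an endpoint and credentials here, which is exactly what RemoteToLocal() and Validate() key off:

    replicate:
      apiVersion: v1
      source:
        type: minio
        bucket: remote-bucket
        prefix: data/
        endpoint: https://remote-minio.example.com:9000
        credentials:
          accessKey: REMOTE-ACCESS-KEY
          secretKey: REMOTE-SECRET-KEY
      target:
        type: minio
        bucket: local-bucket
        prefix: imported/
      flags:
        filter:
          newerThan: 168h
        retry:
          attempts: 3
          delay: 250ms

Per the worker setup in StartFromSource, the number of objects copied concurrently is controlled by the _MINIO_BATCH_REPLICATION_WORKERS environment variable, defaulting to half of GOMAXPROCS.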