From e8c98c32464361dd7643b55e028d5735a48ebefc Mon Sep 17 00:00:00 2001 From: Poorna Date: Mon, 10 Jul 2023 10:57:56 -0400 Subject: [PATCH] Avoid extra GetObjectInfo call in DeleteObject API (#17599) Optimize DeleteObject API to avoid extra GetObjectInfo call on the replicating side. For receiving side, it is just a regular DeleteObject call. Bonus: Fix a corner case where version purged is absent on target (either due to replication not yet complete or target version already deleted in a one-way replication or when replication was disabled). In such cases, mark version purge complete. --- cmd/bucket-handlers.go | 4 +- cmd/bucket-object-lock.go | 26 ++++----- cmd/bucket-replication-utils.go | 10 +++- cmd/bucket-replication.go | 24 +++++---- cmd/erasure-object.go | 22 +++++++- cmd/object-api-datatypes.go | 1 + cmd/object-api-interface.go | 20 +++++-- cmd/object-handlers.go | 94 ++++++++++++++++----------------- cmd/s3-zip-handlers.go | 4 +- 9 files changed, 124 insertions(+), 81 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 98263a184..051baa68d 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -580,8 +580,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } } if object.VersionID != "" && hasLockEnabled { - if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); apiErrCode != ErrNone { - apiErr := errorCodes.ToAPIErr(apiErrCode) + if err := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); err != nil { + apiErr := toAPIError(ctx, err) deleteResults[index].errInfo = DeleteError{ Code: apiErr.Code, Message: apiErr.Description, diff --git a/cmd/bucket-object-lock.go b/cmd/bucket-object-lock.go index f94004002..adef0336a 100644 --- a/cmd/bucket-object-lock.go +++ b/cmd/bucket-object-lock.go @@ -82,24 +82,24 @@ func enforceRetentionForDeletion(ctx context.Context, objInfo ObjectInfo) (locke // For objects in "Governance" mode, overwrite is allowed if a) object retention date is past OR // governance bypass headers are set and user has governance bypass permissions. // Objects in "Compliance" mode can be overwritten only if retention date is past. -func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucket string, object ObjectToDelete, oi ObjectInfo, gerr error) APIErrorCode { +func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucket string, object ObjectToDelete, oi ObjectInfo, gerr error) error { if gerr != nil { // error from GetObjectInfo if _, ok := gerr.(MethodNotAllowed); ok { // This happens usually for a delete marker if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() { // Delete marker should be present and valid. - return ErrNone + return nil } } if isErrObjectNotFound(gerr) || isErrVersionNotFound(gerr) { - return ErrNone + return nil } - return toAPIErrorCode(ctx, gerr) + return gerr } lhold := objectlock.GetObjectLegalHoldMeta(oi.UserDefined) if lhold.Status.Valid() && lhold.Status == objectlock.LegalHoldOn { - return ErrObjectLocked + return ObjectLocked{} } ret := objectlock.GetObjectRetentionMeta(oi.UserDefined) @@ -115,13 +115,13 @@ func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucke t, err := objectlock.UTCNowNTP() if err != nil { logger.LogIf(ctx, err) - return ErrObjectLocked + return ObjectLocked{} } if !ret.RetainUntilDate.Before(t) { - return ErrObjectLocked + return ObjectLocked{} } - return ErrNone + return nil case objectlock.RetGovernance: // In governance mode, users can't overwrite or delete an object // version or alter its lock settings unless they have special @@ -141,22 +141,22 @@ func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucke t, err := objectlock.UTCNowNTP() if err != nil { logger.LogIf(ctx, err) - return ErrObjectLocked + return ObjectLocked{} } if !ret.RetainUntilDate.Before(t) { - return ErrObjectLocked + return ObjectLocked{} } - return ErrNone + return nil } // https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock-overview.html#object-lock-retention-modes // If you try to delete objects protected by governance mode and have s3:BypassGovernanceRetention, the operation will succeed. if checkRequestAuthType(ctx, r, policy.BypassGovernanceRetentionAction, bucket, object.ObjectName) != ErrNone { - return ErrAccessDenied + return errAuthentication } } } - return ErrNone + return nil } // enforceRetentionBypassForPut enforces whether an existing object under governance can be overwritten diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 2a8f09ad3..d2e8ec9d0 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -536,12 +536,18 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl } } +func (ri *ReplicateObjectInfo) getReplicationState() ReplicationState { + rs := ri.ObjectInfo.getReplicationState() + rs.ReplicateDecisionStr = ri.Dsc.String() + return rs +} + // vID here represents the versionID client specified in request - need to distinguish between delete marker and delete marker deletion -func (o *ObjectInfo) getReplicationState(dsc string, vID string, heal bool) ReplicationState { +func (o *ObjectInfo) getReplicationState() ReplicationState { rs := ReplicationState{ ReplicationStatusInternal: o.ReplicationStatusInternal, VersionPurgeStatusInternal: o.VersionPurgeStatusInternal, - ReplicateDecisionStr: dsc, + ReplicateDecisionStr: o.replicationDecision, Targets: make(map[string]replication.StatusType), PurgeTargets: make(map[string]VersionPurgeStatusType), ResetStatusesMap: make(map[string]string), diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 0ebc25aed..c1f505897 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -634,13 +634,20 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI IsReplicationReadyForDeleteMarker: true, }, }) - if isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) { - if dobj.VersionID == "" { + switch { + case isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)): + // delete marker already replicated + if dobj.VersionID == "" && rinfo.VersionPurgeStatus.Empty() { rinfo.ReplicationStatus = replication.Completed return } - } - if !isErrObjectNotFound(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) { + case isErrObjectNotFound(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)): + // version being purged is already not found on target. + if !rinfo.VersionPurgeStatus.Empty() { + rinfo.VersionPurgeStatus = Complete + return + } + default: // mark delete marker replication as failed if target cluster not ready to receive // this request yet (object version not replicated yet) if err != nil && !toi.ReplicationReady { @@ -649,7 +656,6 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI } } } - rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, minio.RemoveObjectOptions{ VersionID: versionID, Internal: minio.AdvancedRemoveOptions{ @@ -1048,7 +1054,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje popts := ObjectOptions{ MTime: objInfo.ModTime, VersionID: objInfo.VersionID, - EvalMetadataFn: func(oi *ObjectInfo) error { + EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) { oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = newReplStatusInternal oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) oi.UserDefined[xhttp.AmzBucketReplicationStatus] = string(rinfos.ReplicationStatus()) @@ -1060,7 +1066,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje if objInfo.UserTags != "" { oi.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags } - return nil + return dsc, nil }, } @@ -2478,7 +2484,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object ObjectName: roi.Name, DeleteMarkerVersionID: dmVersionID, VersionID: versionID, - ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true), + ReplicationState: roi.getReplicationState(), DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, DeleteMarker: roi.DeleteMarker, }, @@ -2895,7 +2901,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf ObjectName: roi.Name, DeleteMarkerVersionID: dmVersionID, VersionID: versionID, - ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true), + ReplicationState: roi.getReplicationState(), DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, DeleteMarker: roi.DeleteMarker, }, diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index fe6da6328..ee55a62db 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1657,6 +1657,22 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string return objInfo, gerr } } + if opts.EvalMetadataFn != nil { + dsc, err := opts.EvalMetadataFn(&goi, err) + if err != nil { + return ObjectInfo{}, err + } + if dsc.ReplicateAny() { + opts.SetDeleteReplicationState(dsc, opts.VersionID) + goi.replicationDecision = opts.DeleteReplication.ReplicateDecisionStr + } + } + + if opts.EvalRetentionBypassFn != nil { + if err := opts.EvalRetentionBypassFn(goi, gerr); err != nil { + return ObjectInfo{}, err + } + } if opts.Expiration.Expire { if gerr == nil { @@ -1765,7 +1781,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string if err = er.deleteObjectVersion(ctx, bucket, object, fi, opts.DeleteMarker); err != nil { return objInfo, toObjectErr(err, bucket, object) } - return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil + oi := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) + oi.replicationDecision = goi.replicationDecision + return oi, nil } // Delete the object version on all disks. @@ -1855,7 +1873,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if opts.EvalMetadataFn != nil { - if err := opts.EvalMetadataFn(&objInfo); err != nil { + if _, err := opts.EvalMetadataFn(&objInfo, err); err != nil { return ObjectInfo{}, err } } diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index e9bf2ab9e..78175355e 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -185,6 +185,7 @@ type ObjectInfo struct { VersionPurgeStatusInternal string VersionPurgeStatus VersionPurgeStatusType + replicationDecision string // internal representation of replication decision for use by DeleteObject handler // The total count of all versions of this object NumVersions int // The modtime of the successor object version if any diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index f73b56864..56c2b6471 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -35,8 +35,11 @@ import ( // CheckPreconditionFn returns true if precondition check failed. type CheckPreconditionFn func(o ObjectInfo) bool -// EvalMetadataFn validates input objInfo and returns an updated metadata -type EvalMetadataFn func(o *ObjectInfo) error +// EvalMetadataFn validates input objInfo and GetObjectInfo error and returns an updated metadata and replication decision if any +type EvalMetadataFn func(o *ObjectInfo, gerr error) (ReplicateDecision, error) + +// EvalRetentionBypassFn validates input objInfo and GetObjectInfo error and returns an error if retention bypass is not allowed. +type EvalRetentionBypassFn func(o ObjectInfo, gerr error) error // GetObjectInfoFn is the signature of GetObjectInfo function. type GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) @@ -100,7 +103,8 @@ type ObjectOptions struct { InclFreeVersions bool - MetadataChg bool // is true if it is a metadata update operation. + MetadataChg bool // is true if it is a metadata update operation. + EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject. } // ExpirationOptions represents object options for object expiration at objectLayer. @@ -182,6 +186,16 @@ func (o *ObjectOptions) PutReplicationState() (r ReplicationState) { return } +// SetEvalMetadataFn sets the metadata evaluation function +func (o *ObjectOptions) SetEvalMetadataFn(f EvalMetadataFn) { + o.EvalMetadataFn = f +} + +// SetEvalRetentionBypassFn sets the retention bypass function +func (o *ObjectOptions) SetEvalRetentionBypassFn(f EvalRetentionBypassFn) { + o.EvalRetentionBypassFn = f +} + // ObjectLayer implements primitives for object API layer. type ObjectLayer interface { // Locking operations on object. diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 8976171ea..d6236e6be 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -2326,33 +2326,29 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. return } - getObjectInfo := objectAPI.GetObjectInfo - if api.CacheAPI() != nil { - getObjectInfo = api.CacheAPI().GetObjectInfo - } - os := newObjSweeper(bucket, object).WithVersion(opts.VersionID).WithVersioning(opts.Versioned, opts.VersionSuspended) - // Mutations of objects on versioning suspended buckets - // affect its null version. Through opts below we select - // the null version's remote object to delete if - // transitioned. - goiOpts := os.GetOpts() - goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts) - if gerr == nil { - os.SetTransitionState(goi.TransitionedObject) - } - dsc := checkReplicateDelete(ctx, bucket, ObjectToDelete{ - ObjectV: ObjectV{ - ObjectName: object, - VersionID: opts.VersionID, - }, - }, goi, opts, gerr) + opts.SetEvalMetadataFn(func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) { + if replica { // no need to check replication on receiver + return dsc, nil + } + dsc = checkReplicateDelete(ctx, bucket, ObjectToDelete{ + ObjectV: ObjectV{ + ObjectName: object, + VersionID: opts.VersionID, + }, + }, *oi, opts, gerr) + // Mutations of objects on versioning suspended buckets + // affect its null version. Through opts below we select + // the null version's remote object to delete if + // transitioned. + if gerr == nil { + os.SetTransitionState(oi.TransitionedObject) + } + return dsc, nil + }) vID := opts.VersionID - if dsc.ReplicateAny() { - opts.SetDeleteReplicationState(dsc, opts.VersionID) - } if replica { opts.SetReplicaStatus(replication.Replica) if opts.VersionPurgeStatus().Empty() { @@ -2360,25 +2356,21 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. vID = "" } } - - apiErr := ErrNone - if vID != "" { - apiErr = enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{ - ObjectV: ObjectV{ - ObjectName: object, - VersionID: vID, - }, - }, goi, gerr) - if apiErr != ErrNone && apiErr != ErrNoSuchKey { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(apiErr), r.URL) - return + opts.SetEvalRetentionBypassFn(func(goi ObjectInfo, gerr error) (err error) { + err = nil + if vID != "" { + err := enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{ + ObjectV: ObjectV{ + ObjectName: object, + VersionID: vID, + }, + }, goi, gerr) + if err != nil && !isErrObjectNotFound(err) { + return err + } } - } - - if apiErr == ErrNoSuchKey { - writeSuccessNoContent(w) return - } + }) deleteObject := objectAPI.DeleteObject if api.CacheAPI() != nil { @@ -2393,6 +2385,12 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return } + if isErrObjectNotFound(err) { + writeSuccessNoContent(w) + return + } + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return } if objInfo.Name == "" { @@ -2419,7 +2417,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. Host: handlers.GetSourceIP(r), }) - if dsc.ReplicateAny() { + if objInfo.ReplicationStatus == replication.Pending || objInfo.VersionPurgeStatus == Pending { dmVersionID := "" versionID := "" if objInfo.DeleteMarker { @@ -2434,7 +2432,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. DeleteMarkerVersionID: dmVersionID, DeleteMarkerMTime: DeleteMarkerMTime{objInfo.ModTime}, DeleteMarker: objInfo.DeleteMarker, - ReplicationState: objInfo.getReplicationState(dsc.String(), opts.VersionID, false), + ReplicationState: objInfo.getReplicationState(), }, Bucket: bucket, EventType: ReplicateIncomingDelete, @@ -2505,7 +2503,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r popts := ObjectOptions{ MTime: opts.MTime, VersionID: opts.VersionID, - EvalMetadataFn: func(oi *ObjectInfo) error { + EvalMetadataFn: func(oi *ObjectInfo, gerr error) (ReplicateDecision, error) { oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status)) oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = UTCNow().Format(time.RFC3339Nano) @@ -2514,7 +2512,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() } - return nil + return dsc, nil }, } @@ -2666,9 +2664,9 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r popts := ObjectOptions{ MTime: opts.MTime, VersionID: opts.VersionID, - EvalMetadataFn: func(oi *ObjectInfo) error { + EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) { if err := enforceRetentionBypassForPut(ctx, r, *oi, objRetention, cred, owner); err != nil { - return err + return dsc, err } if objRetention.Mode.Valid() { oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(objRetention.Mode) @@ -2678,12 +2676,12 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = "" } oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano) - dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(*oi, replication.MetadataReplicationType, opts)) + dsc = mustReplicate(ctx, bucket, object, getMustReplicateOptions(*oi, replication.MetadataReplicationType, opts)) if dsc.ReplicateAny() { oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() } - return nil + return dsc, nil }, } diff --git a/cmd/s3-zip-handlers.go b/cmd/s3-zip-handlers.go index ebb8501be..3544cad18 100644 --- a/cmd/s3-zip-handlers.go +++ b/cmd/s3-zip-handlers.go @@ -519,10 +519,10 @@ func updateObjectMetadataWithZipInfo(ctx context.Context, objectAPI ObjectLayer, popts := ObjectOptions{ MTime: srcInfo.ModTime, VersionID: srcInfo.VersionID, - EvalMetadataFn: func(oi *ObjectInfo) error { + EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) { oi.UserDefined[archiveTypeMetadataKey] = at oi.UserDefined[archiveInfoMetadataKey] = zipInfoStr - return nil + return dsc, nil }, }