mirror of
				https://github.com/minio/minio.git
				synced 2025-11-04 10:11:09 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			1474 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1474 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright (c) 2015-2021 MinIO, Inc.
 | 
						|
//
 | 
						|
// This file is part of MinIO Object Storage stack
 | 
						|
//
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package cmd
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net/http"
 | 
						|
	"reflect"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/minio/madmin-go"
 | 
						|
	"github.com/minio/minio-go/v7"
 | 
						|
	miniogo "github.com/minio/minio-go/v7"
 | 
						|
	"github.com/minio/minio-go/v7/pkg/encrypt"
 | 
						|
	"github.com/minio/minio-go/v7/pkg/tags"
 | 
						|
	"github.com/minio/minio/internal/bucket/bandwidth"
 | 
						|
	"github.com/minio/minio/internal/bucket/replication"
 | 
						|
	"github.com/minio/minio/internal/config/storageclass"
 | 
						|
	"github.com/minio/minio/internal/crypto"
 | 
						|
	"github.com/minio/minio/internal/event"
 | 
						|
	"github.com/minio/minio/internal/hash"
 | 
						|
	xhttp "github.com/minio/minio/internal/http"
 | 
						|
	"github.com/minio/minio/internal/logger"
 | 
						|
	iampolicy "github.com/minio/pkg/iam/policy"
 | 
						|
)
 | 
						|
 | 
						|
// throttleDeadline is a fixed duration of one hour.
// NOTE(review): no use of this constant is visible in this part of the file;
// presumably it bounds how long a bandwidth-throttled replication transfer may
// run - confirm at the use site.
const throttleDeadline = 1 * time.Hour
 | 
						|
 | 
						|
// gets replication config associated to a given bucket name.
 | 
						|
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
 | 
						|
	if globalIsGateway {
 | 
						|
		objAPI := newObjectLayerFn()
 | 
						|
		if objAPI == nil {
 | 
						|
			return nil, errServerNotInitialized
 | 
						|
		}
 | 
						|
 | 
						|
		return nil, BucketReplicationConfigNotFound{Bucket: bucketName}
 | 
						|
	}
 | 
						|
 | 
						|
	return globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
 | 
						|
}
 | 
						|
 | 
						|
// validateReplicationDestination returns error if replication destination bucket missing or not configured
 | 
						|
// It also returns true if replication destination is same as this server.
 | 
						|
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) {
 | 
						|
	arn, err := madmin.ParseARN(rCfg.RoleArn)
 | 
						|
	if err != nil {
 | 
						|
		return false, BucketRemoteArnInvalid{}
 | 
						|
	}
 | 
						|
	if arn.Type != madmin.ReplicationService {
 | 
						|
		return false, BucketRemoteArnTypeInvalid{}
 | 
						|
	}
 | 
						|
	clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rCfg.RoleArn)
 | 
						|
	if clnt == nil {
 | 
						|
		return false, BucketRemoteTargetNotFound{Bucket: bucket}
 | 
						|
	}
 | 
						|
	if found, _ := clnt.BucketExists(ctx, rCfg.GetDestination().Bucket); !found {
 | 
						|
		return false, BucketRemoteDestinationNotFound{Bucket: rCfg.GetDestination().Bucket}
 | 
						|
	}
 | 
						|
	if ret, err := globalBucketObjectLockSys.Get(bucket); err == nil {
 | 
						|
		if ret.LockEnabled {
 | 
						|
			lock, _, _, _, err := clnt.GetObjectLockConfig(ctx, rCfg.GetDestination().Bucket)
 | 
						|
			if err != nil || lock != "Enabled" {
 | 
						|
				return false, BucketReplicationDestinationMissingLock{Bucket: rCfg.GetDestination().Bucket}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// validate replication ARN against target endpoint
 | 
						|
	c, ok := globalBucketTargetSys.arnRemotesMap[rCfg.RoleArn]
 | 
						|
	if ok {
 | 
						|
		if c.EndpointURL().String() == clnt.EndpointURL().String() {
 | 
						|
			sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
 | 
						|
			return sameTarget, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return false, BucketRemoteTargetNotFound{Bucket: bucket}
 | 
						|
}
 | 
						|
 | 
						|
type mustReplicateOptions struct {
 | 
						|
	meta   map[string]string
 | 
						|
	status replication.StatusType
 | 
						|
	opType replication.Type
 | 
						|
}
 | 
						|
 | 
						|
func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
 | 
						|
	if rs, ok := o.meta[xhttp.AmzBucketReplicationStatus]; ok {
 | 
						|
		return replication.StatusType(rs)
 | 
						|
	}
 | 
						|
	return s
 | 
						|
}
 | 
						|
func (o mustReplicateOptions) isExistingObjectReplication() bool {
 | 
						|
	return o.opType == replication.ExistingObjectReplicationType
 | 
						|
}
 | 
						|
 | 
						|
func (o mustReplicateOptions) isMetadataReplication() bool {
 | 
						|
	return o.opType == replication.MetadataReplicationType
 | 
						|
}
 | 
						|
func getMustReplicateOptions(o ObjectInfo, op replication.Type) mustReplicateOptions {
 | 
						|
	if !op.Valid() {
 | 
						|
		op = replication.ObjectReplicationType
 | 
						|
		if o.metadataOnly {
 | 
						|
			op = replication.MetadataReplicationType
 | 
						|
		}
 | 
						|
	}
 | 
						|
	meta := cloneMSS(o.UserDefined)
 | 
						|
	if o.UserTags != "" {
 | 
						|
		meta[xhttp.AmzObjectTagging] = o.UserTags
 | 
						|
	}
 | 
						|
	return mustReplicateOptions{
 | 
						|
		meta:   meta,
 | 
						|
		status: o.ReplicationStatus,
 | 
						|
		opType: op,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
 | 
						|
// a synchronous manner.
 | 
						|
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, opts mustReplicateOptions) (replicate bool, sync bool) {
 | 
						|
	if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	return mustReplicater(ctx, bucket, object, opts)
 | 
						|
}
 | 
						|
 | 
						|
// mustReplicater returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
 | 
						|
// a synchronous manner.
 | 
						|
func mustReplicater(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (replicate bool, sync bool) {
 | 
						|
	if globalIsGateway {
 | 
						|
		return replicate, sync
 | 
						|
	}
 | 
						|
	replStatus := mopts.ReplicationStatus()
 | 
						|
	if replStatus == replication.Replica && !mopts.isMetadataReplication() {
 | 
						|
		return replicate, sync
 | 
						|
	}
 | 
						|
	cfg, err := getReplicationConfig(ctx, bucket)
 | 
						|
	if err != nil {
 | 
						|
		return replicate, sync
 | 
						|
	}
 | 
						|
	opts := replication.ObjectOpts{
 | 
						|
		Name:           object,
 | 
						|
		SSEC:           crypto.SSEC.IsEncrypted(mopts.meta),
 | 
						|
		Replica:        replStatus == replication.Replica,
 | 
						|
		ExistingObject: mopts.isExistingObjectReplication(),
 | 
						|
	}
 | 
						|
	tagStr, ok := mopts.meta[xhttp.AmzObjectTagging]
 | 
						|
	if ok {
 | 
						|
		opts.UserTags = tagStr
 | 
						|
	}
 | 
						|
	tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
 | 
						|
	// the target online status should not be used here while deciding
 | 
						|
	// whether to replicate as the target could be temporarily down
 | 
						|
	if tgt != nil {
 | 
						|
		return cfg.Replicate(opts), tgt.replicateSync
 | 
						|
	}
 | 
						|
	return cfg.Replicate(opts), false
 | 
						|
}
 | 
						|
 | 
						|
// standardHeaders lists the standard headers that need to be extracted from
// user metadata. isStandardHeader matches keys against this list, and
// putReplicationOpts skips matching keys when copying user-defined metadata.
 | 
						|
var standardHeaders = []string{
 | 
						|
	xhttp.ContentType,
 | 
						|
	xhttp.CacheControl,
 | 
						|
	xhttp.ContentEncoding,
 | 
						|
	xhttp.ContentLanguage,
 | 
						|
	xhttp.ContentDisposition,
 | 
						|
	xhttp.AmzStorageClass,
 | 
						|
	xhttp.AmzObjectTagging,
 | 
						|
	xhttp.AmzBucketReplicationStatus,
 | 
						|
	xhttp.AmzObjectLockMode,
 | 
						|
	xhttp.AmzObjectLockRetainUntilDate,
 | 
						|
	xhttp.AmzObjectLockLegalHold,
 | 
						|
	xhttp.AmzTagCount,
 | 
						|
	xhttp.AmzServerSideEncryption,
 | 
						|
}
 | 
						|
 | 
						|
// returns true if any of the objects being deleted qualifies for replication.
 | 
						|
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
 | 
						|
	c, err := getReplicationConfig(ctx, bucket)
 | 
						|
	if err != nil || c == nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	for _, obj := range objects {
 | 
						|
		if c.HasActiveRules(obj.ObjectName, true) {
 | 
						|
			return true
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// isStandardHeader returns true if header is a supported header and not a custom header
 | 
						|
func isStandardHeader(matchHeaderKey string) bool {
 | 
						|
	return equals(matchHeaderKey, standardHeaders...)
 | 
						|
}
 | 
						|
 | 
						|
// returns whether object version is a deletemarker and if object qualifies for replication
 | 
						|
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (replicate, sync bool) {
 | 
						|
	rcfg, err := getReplicationConfig(ctx, bucket)
 | 
						|
	if err != nil || rcfg == nil {
 | 
						|
		return false, sync
 | 
						|
	}
 | 
						|
	opts := replication.ObjectOpts{
 | 
						|
		Name:         dobj.ObjectName,
 | 
						|
		SSEC:         crypto.SSEC.IsEncrypted(oi.UserDefined),
 | 
						|
		UserTags:     oi.UserTags,
 | 
						|
		DeleteMarker: oi.DeleteMarker,
 | 
						|
		VersionID:    dobj.VersionID,
 | 
						|
		OpType:       replication.DeleteReplicationType,
 | 
						|
	}
 | 
						|
	replicate = rcfg.Replicate(opts)
 | 
						|
	// when incoming delete is removal of a delete marker( a.k.a versioned delete),
 | 
						|
	// GetObjectInfo returns extra information even though it returns errFileNotFound
 | 
						|
	if gerr != nil {
 | 
						|
		validReplStatus := false
 | 
						|
		switch oi.ReplicationStatus {
 | 
						|
		case replication.Pending, replication.Completed, replication.Failed:
 | 
						|
			validReplStatus = true
 | 
						|
		}
 | 
						|
		if oi.DeleteMarker && (validReplStatus || replicate) {
 | 
						|
			return true, sync
 | 
						|
		}
 | 
						|
		// can be the case that other cluster is down and duplicate `mc rm --vid`
 | 
						|
		// is issued - this still needs to be replicated back to the other target
 | 
						|
		return oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed, sync
 | 
						|
	}
 | 
						|
	tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
 | 
						|
	// the target online status should not be used here while deciding
 | 
						|
	// whether to replicate deletes as the target could be temporarily down
 | 
						|
	if tgt == nil {
 | 
						|
		return false, false
 | 
						|
	}
 | 
						|
	return replicate, tgt.replicateSync
 | 
						|
}
 | 
						|
 | 
						|
// replicate deletes to the designated replication target if replication configuration
 | 
						|
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
 | 
						|
// is specified) enabled.
 | 
						|
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
 | 
						|
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
 | 
						|
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
 | 
						|
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
 | 
						|
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
 | 
						|
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
 | 
						|
// on target.
 | 
						|
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, trigger string) {
 | 
						|
	var versionPurgeStatus VersionPurgeStatusType
 | 
						|
	var replicationStatus string
 | 
						|
 | 
						|
	bucket := dobj.Bucket
 | 
						|
	versionID := dobj.DeleteMarkerVersionID
 | 
						|
	if versionID == "" {
 | 
						|
		versionID = dobj.VersionID
 | 
						|
	}
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		replStatus := replicationStatus
 | 
						|
		if !versionPurgeStatus.Empty() {
 | 
						|
			replStatus = string(versionPurgeStatus)
 | 
						|
		}
 | 
						|
		auditLogInternal(context.Background(), bucket, dobj.ObjectName, AuditLogOptions{
 | 
						|
			Trigger:   trigger,
 | 
						|
			APIName:   ReplicateDeleteAPI,
 | 
						|
			VersionID: versionID,
 | 
						|
			Status:    replStatus,
 | 
						|
		})
 | 
						|
	}()
 | 
						|
 | 
						|
	rcfg, err := getReplicationConfig(ctx, bucket)
 | 
						|
	if err != nil || rcfg == nil {
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			BucketName: bucket,
 | 
						|
			Object: ObjectInfo{
 | 
						|
				Bucket:       bucket,
 | 
						|
				Name:         dobj.ObjectName,
 | 
						|
				VersionID:    versionID,
 | 
						|
				DeleteMarker: dobj.DeleteMarker,
 | 
						|
			},
 | 
						|
			Host:      "Internal: [Replication]",
 | 
						|
			EventName: event.ObjectReplicationNotTracked,
 | 
						|
		})
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
 | 
						|
	if tgt == nil {
 | 
						|
		logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, rcfg.RoleArn))
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			BucketName: bucket,
 | 
						|
			Object: ObjectInfo{
 | 
						|
				Bucket:       bucket,
 | 
						|
				Name:         dobj.ObjectName,
 | 
						|
				VersionID:    versionID,
 | 
						|
				DeleteMarker: dobj.DeleteMarker,
 | 
						|
			},
 | 
						|
			Host:      "Internal: [Replication]",
 | 
						|
			EventName: event.ObjectReplicationNotTracked,
 | 
						|
		})
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// Lock the object name before starting replication operation.
 | 
						|
	// Use separate lock that doesn't collide with regular objects.
 | 
						|
	lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
 | 
						|
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
 | 
						|
	if err != nil {
 | 
						|
		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn))
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			BucketName: bucket,
 | 
						|
			Object: ObjectInfo{
 | 
						|
				Bucket:       bucket,
 | 
						|
				Name:         dobj.ObjectName,
 | 
						|
				VersionID:    versionID,
 | 
						|
				DeleteMarker: dobj.DeleteMarker,
 | 
						|
			},
 | 
						|
			Host:      "Internal: [Replication]",
 | 
						|
			EventName: event.ObjectReplicationNotTracked,
 | 
						|
		})
 | 
						|
		return
 | 
						|
	}
 | 
						|
	ctx = lkctx.Context()
 | 
						|
	defer lk.Unlock(lkctx.Cancel)
 | 
						|
	// early return if already replicated delete marker for existing object replication
 | 
						|
	if dobj.DeleteMarkerVersionID != "" && dobj.OpType == replication.ExistingObjectReplicationType {
 | 
						|
		_, err := tgt.StatObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.StatObjectOptions{
 | 
						|
			VersionID: versionID,
 | 
						|
			Internal: miniogo.AdvancedGetOptions{
 | 
						|
				ReplicationProxyRequest: "false",
 | 
						|
			}})
 | 
						|
		if isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
 | 
						|
		VersionID: versionID,
 | 
						|
		Internal: miniogo.AdvancedRemoveOptions{
 | 
						|
			ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
 | 
						|
			ReplicationMTime:        dobj.DeleteMarkerMTime.Time,
 | 
						|
			ReplicationStatus:       miniogo.ReplicationStatusReplica,
 | 
						|
			ReplicationRequest:      true, // always set this to distinguish between `mc mirror` replication and serverside
 | 
						|
		},
 | 
						|
	})
 | 
						|
 | 
						|
	replicationStatus = dobj.DeleteMarkerReplicationStatus
 | 
						|
	versionPurgeStatus = dobj.VersionPurgeStatus
 | 
						|
 | 
						|
	if rmErr != nil {
 | 
						|
		if dobj.VersionID == "" {
 | 
						|
			replicationStatus = string(replication.Failed)
 | 
						|
		} else {
 | 
						|
			versionPurgeStatus = Failed
 | 
						|
		}
 | 
						|
		logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", rcfg.GetDestination().Bucket, dobj.ObjectName, versionID, rmErr))
 | 
						|
	} else {
 | 
						|
		if dobj.VersionID == "" {
 | 
						|
			replicationStatus = string(replication.Completed)
 | 
						|
		} else {
 | 
						|
			versionPurgeStatus = Complete
 | 
						|
		}
 | 
						|
	}
 | 
						|
	prevStatus := dobj.DeleteMarkerReplicationStatus
 | 
						|
	currStatus := replicationStatus
 | 
						|
	if dobj.VersionID != "" {
 | 
						|
		prevStatus = string(dobj.VersionPurgeStatus)
 | 
						|
		currStatus = string(versionPurgeStatus)
 | 
						|
	}
 | 
						|
	// to decrement pending count later.
 | 
						|
	globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus),
 | 
						|
		replication.StatusType(prevStatus), replication.DeleteReplicationType)
 | 
						|
 | 
						|
	var eventName = event.ObjectReplicationComplete
 | 
						|
	if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed {
 | 
						|
		eventName = event.ObjectReplicationFailed
 | 
						|
	}
 | 
						|
 | 
						|
	// Update metadata on the delete marker or purge permanent delete if replication success.
 | 
						|
	dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
 | 
						|
		VersionID:                     versionID,
 | 
						|
		DeleteMarkerReplicationStatus: replicationStatus,
 | 
						|
		VersionPurgeStatus:            versionPurgeStatus,
 | 
						|
		Versioned:                     globalBucketVersioningSys.Enabled(bucket),
 | 
						|
		VersionSuspended:              globalBucketVersioningSys.Suspended(bucket),
 | 
						|
	})
 | 
						|
	if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on.
 | 
						|
		logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err))
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			BucketName: bucket,
 | 
						|
			Object: ObjectInfo{
 | 
						|
				Bucket:       bucket,
 | 
						|
				Name:         dobj.ObjectName,
 | 
						|
				VersionID:    versionID,
 | 
						|
				DeleteMarker: dobj.DeleteMarker,
 | 
						|
			},
 | 
						|
			Host:      "Internal: [Replication]",
 | 
						|
			EventName: eventName,
 | 
						|
		})
 | 
						|
	} else {
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			BucketName: bucket,
 | 
						|
			Object:     dobjInfo,
 | 
						|
			Host:       "Internal: [Replication]",
 | 
						|
			EventName:  eventName,
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]string {
 | 
						|
	meta := make(map[string]string, len(oi.UserDefined))
 | 
						|
	for k, v := range oi.UserDefined {
 | 
						|
		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		if equals(k, xhttp.AmzBucketReplicationStatus) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
 | 
						|
		if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		meta[k] = v
 | 
						|
	}
 | 
						|
 | 
						|
	if oi.ContentEncoding != "" {
 | 
						|
		meta[xhttp.ContentEncoding] = oi.ContentEncoding
 | 
						|
	}
 | 
						|
 | 
						|
	if oi.ContentType != "" {
 | 
						|
		meta[xhttp.ContentType] = oi.ContentType
 | 
						|
	}
 | 
						|
 | 
						|
	if oi.UserTags != "" {
 | 
						|
		meta[xhttp.AmzObjectTagging] = oi.UserTags
 | 
						|
		meta[xhttp.AmzTagDirective] = "REPLACE"
 | 
						|
	}
 | 
						|
 | 
						|
	sc := dest.StorageClass
 | 
						|
	if sc == "" {
 | 
						|
		sc = oi.StorageClass
 | 
						|
	}
 | 
						|
	// drop non standard storage classes for tiering from replication
 | 
						|
	if sc != "" && (sc == storageclass.RRS || sc == storageclass.STANDARD) {
 | 
						|
		meta[xhttp.AmzStorageClass] = sc
 | 
						|
	}
 | 
						|
 | 
						|
	meta[xhttp.MinIOSourceETag] = oi.ETag
 | 
						|
	meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339Nano)
 | 
						|
	meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
 | 
						|
	return meta
 | 
						|
}
 | 
						|
 | 
						|
// caseInsensitiveMap is a string map whose Lookup tolerates the common
// variations in how a header key may be cased.
type caseInsensitiveMap map[string]string

// Lookup returns the value stored for key and whether it was found.
//
// Three spellings are tried in order: the key exactly as given, its
// lower-cased form, and its canonical HTTP header form. Entries stored under
// any other casing are not found.
func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
	if len(m) == 0 {
		return "", false
	}
	if val, found := m[key]; found {
		return val, true
	}
	if val, found := m[strings.ToLower(key)]; found {
		return val, true
	}
	if val, found := m[http.CanonicalHeaderKey(key)]; found {
		return val, true
	}
	return "", false
}
 | 
						|
 | 
						|
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
 | 
						|
	meta := make(map[string]string)
 | 
						|
	for k, v := range objInfo.UserDefined {
 | 
						|
		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if isStandardHeader(k) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		meta[k] = v
 | 
						|
	}
 | 
						|
 | 
						|
	sc := dest.StorageClass
 | 
						|
	if sc == "" && (objInfo.StorageClass == storageclass.STANDARD || objInfo.StorageClass == storageclass.RRS) {
 | 
						|
		sc = objInfo.StorageClass
 | 
						|
	}
 | 
						|
	putOpts = miniogo.PutObjectOptions{
 | 
						|
		UserMetadata:    meta,
 | 
						|
		ContentType:     objInfo.ContentType,
 | 
						|
		ContentEncoding: objInfo.ContentEncoding,
 | 
						|
		StorageClass:    sc,
 | 
						|
		Internal: miniogo.AdvancedPutOptions{
 | 
						|
			SourceVersionID:    objInfo.VersionID,
 | 
						|
			ReplicationStatus:  miniogo.ReplicationStatusReplica,
 | 
						|
			SourceMTime:        objInfo.ModTime,
 | 
						|
			SourceETag:         objInfo.ETag,
 | 
						|
			ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
 | 
						|
		},
 | 
						|
	}
 | 
						|
	if objInfo.UserTags != "" {
 | 
						|
		tag, _ := tags.ParseObjectTags(objInfo.UserTags)
 | 
						|
		if tag != nil {
 | 
						|
			putOpts.UserTags = tag.ToMap()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	lkMap := caseInsensitiveMap(objInfo.UserDefined)
 | 
						|
	if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok {
 | 
						|
		putOpts.ContentLanguage = lang
 | 
						|
	}
 | 
						|
	if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok {
 | 
						|
		putOpts.ContentDisposition = disp
 | 
						|
	}
 | 
						|
	if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok {
 | 
						|
		putOpts.CacheControl = cc
 | 
						|
	}
 | 
						|
	if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok {
 | 
						|
		rmode := miniogo.RetentionMode(mode)
 | 
						|
		putOpts.Mode = rmode
 | 
						|
	}
 | 
						|
	if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
 | 
						|
		rdate, err := time.Parse(time.RFC3339, retainDateStr)
 | 
						|
		if err != nil {
 | 
						|
			return putOpts, err
 | 
						|
		}
 | 
						|
		putOpts.RetainUntilDate = rdate
 | 
						|
	}
 | 
						|
	if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok {
 | 
						|
		putOpts.LegalHold = miniogo.LegalHoldStatus(lhold)
 | 
						|
	}
 | 
						|
	if crypto.S3.IsEncrypted(objInfo.UserDefined) {
 | 
						|
		putOpts.ServerSideEncryption = encrypt.NewSSE()
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// replicationAction names what has to be replicated for an object, as decided
// by getReplicationAction.
type replicationAction string

const (
	// replicateNone: source and target already match; nothing to replicate.
	replicateNone replicationAction = "none"
	// replicateMetadata: only metadata differs between source and target.
	replicateMetadata replicationAction = "metadata"
	// replicateAll: the object needs full replication.
	replicateAll replicationAction = "all"
)
 | 
						|
 | 
						|
// equals reports whether k1 matches at least one of keys, ignoring case.
// It returns false when no keys are given.
//
// Comparison uses strings.EqualFold (Unicode simple case folding), which
// avoids allocating lower-cased copies of both strings on every comparison.
func equals(k1 string, keys ...string) bool {
	for _, k2 := range keys {
		if strings.EqualFold(k1, k2) {
			return true
		}
	}
	return false
}
 | 
						|
 | 
						|
// returns replicationAction by comparing metadata between source and target
 | 
						|
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
 | 
						|
	// needs full replication
 | 
						|
	if oi1.ETag != oi2.ETag ||
 | 
						|
		oi1.VersionID != oi2.VersionID ||
 | 
						|
		oi1.Size != oi2.Size ||
 | 
						|
		oi1.DeleteMarker != oi2.IsDeleteMarker ||
 | 
						|
		oi1.ModTime.Unix() != oi2.LastModified.Unix() {
 | 
						|
		return replicateAll
 | 
						|
	}
 | 
						|
 | 
						|
	if oi1.ContentType != oi2.ContentType {
 | 
						|
		return replicateMetadata
 | 
						|
	}
 | 
						|
 | 
						|
	if oi1.ContentEncoding != "" {
 | 
						|
		enc, ok := oi2.Metadata[xhttp.ContentEncoding]
 | 
						|
		if !ok {
 | 
						|
			enc, ok = oi2.Metadata[strings.ToLower(xhttp.ContentEncoding)]
 | 
						|
			if !ok {
 | 
						|
				return replicateMetadata
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if strings.Join(enc, ",") != oi1.ContentEncoding {
 | 
						|
			return replicateMetadata
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	t, _ := tags.ParseObjectTags(oi1.UserTags)
 | 
						|
	if !reflect.DeepEqual(oi2.UserTags, t.ToMap()) {
 | 
						|
		return replicateMetadata
 | 
						|
	}
 | 
						|
 | 
						|
	// Compare only necessary headers
 | 
						|
	compareKeys := []string{
 | 
						|
		"Expires",
 | 
						|
		"Cache-Control",
 | 
						|
		"Content-Language",
 | 
						|
		"Content-Disposition",
 | 
						|
		"X-Amz-Object-Lock-Mode",
 | 
						|
		"X-Amz-Object-Lock-Retain-Until-Date",
 | 
						|
		"X-Amz-Object-Lock-Legal-Hold",
 | 
						|
		"X-Amz-Website-Redirect-Location",
 | 
						|
		"X-Amz-Meta-",
 | 
						|
	}
 | 
						|
 | 
						|
	// compare metadata on both maps to see if meta is identical
 | 
						|
	compareMeta1 := make(map[string]string)
 | 
						|
	for k, v := range oi1.UserDefined {
 | 
						|
		var found bool
 | 
						|
		for _, prefix := range compareKeys {
 | 
						|
			if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			found = true
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if found {
 | 
						|
			compareMeta1[strings.ToLower(k)] = v
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	compareMeta2 := make(map[string]string)
 | 
						|
	for k, v := range oi2.Metadata {
 | 
						|
		var found bool
 | 
						|
		for _, prefix := range compareKeys {
 | 
						|
			if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			found = true
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if found {
 | 
						|
			compareMeta2[strings.ToLower(k)] = strings.Join(v, ",")
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if !reflect.DeepEqual(compareMeta1, compareMeta2) {
 | 
						|
		return replicateMetadata
 | 
						|
	}
 | 
						|
 | 
						|
	return replicateNone
 | 
						|
}
 | 
						|
 | 
						|
// replicateObject replicates the specified version of the object to destination bucket
 | 
						|
// The source object is then updated to reflect the replication status.
 | 
						|
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, trigger string) {
 | 
						|
	var replicationStatus replication.StatusType
 | 
						|
	defer func() {
 | 
						|
		if replicationStatus.Empty() {
 | 
						|
			// replication status is empty means
 | 
						|
			// replication was not attempted for some
 | 
						|
			// reason, notify the state of the object
 | 
						|
			// on disk.
 | 
						|
			replicationStatus = ri.ReplicationStatus
 | 
						|
		}
 | 
						|
		auditLogInternal(ctx, ri.Bucket, ri.Name, AuditLogOptions{
 | 
						|
			Trigger:   trigger,
 | 
						|
			APIName:   ReplicateObjectAPI,
 | 
						|
			VersionID: ri.VersionID,
 | 
						|
			Status:    replicationStatus.String(),
 | 
						|
		})
 | 
						|
	}()
 | 
						|
 | 
						|
	objInfo := ri.ObjectInfo
 | 
						|
	bucket := objInfo.Bucket
 | 
						|
	object := objInfo.Name
 | 
						|
 | 
						|
	cfg, err := getReplicationConfig(ctx, bucket)
 | 
						|
	if err != nil {
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			EventName:  event.ObjectReplicationNotTracked,
 | 
						|
			BucketName: bucket,
 | 
						|
			Object:     objInfo,
 | 
						|
			Host:       "Internal: [Replication]",
 | 
						|
		})
 | 
						|
		return
 | 
						|
	}
 | 
						|
	tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
 | 
						|
	if tgt == nil {
 | 
						|
		logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, cfg.RoleArn))
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			EventName:  event.ObjectReplicationNotTracked,
 | 
						|
			BucketName: bucket,
 | 
						|
			Object:     objInfo,
 | 
						|
			Host:       "Internal: [Replication]",
 | 
						|
		})
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// Lock the object name before starting replication.
 | 
						|
	// Use separate lock that doesn't collide with regular objects.
 | 
						|
	lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object)
 | 
						|
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
 | 
						|
	if err != nil {
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			EventName:  event.ObjectReplicationNotTracked,
 | 
						|
			BucketName: bucket,
 | 
						|
			Object:     objInfo,
 | 
						|
			Host:       "Internal: [Replication]",
 | 
						|
		})
 | 
						|
		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	ctx = lkctx.Context()
 | 
						|
	defer lk.Unlock(lkctx.Cancel)
 | 
						|
 | 
						|
	var closeOnDefer bool
 | 
						|
	gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
 | 
						|
		VersionID: objInfo.VersionID,
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			EventName:  event.ObjectReplicationNotTracked,
 | 
						|
			BucketName: bucket,
 | 
						|
			Object:     objInfo,
 | 
						|
			Host:       "Internal: [Replication]",
 | 
						|
		})
 | 
						|
		logger.LogIf(ctx, fmt.Errorf("Unable to update replicate for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer func() {
 | 
						|
		if closeOnDefer {
 | 
						|
			gr.Close()
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	closeOnDefer = true
 | 
						|
 | 
						|
	objInfo = gr.ObjInfo
 | 
						|
	size, err := objInfo.GetActualSize()
 | 
						|
	if err != nil {
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			EventName:  event.ObjectReplicationNotTracked,
 | 
						|
			BucketName: bucket,
 | 
						|
			Object:     objInfo,
 | 
						|
			Host:       "Internal: [Replication]",
 | 
						|
		})
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	dest := cfg.GetDestination()
 | 
						|
	if dest.Bucket == "" {
 | 
						|
		logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID))
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			EventName:  event.ObjectReplicationNotTracked,
 | 
						|
			BucketName: bucket,
 | 
						|
			Object:     objInfo,
 | 
						|
			Host:       "Internal: [Replication]",
 | 
						|
		})
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	rtype := replicateAll
 | 
						|
	oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{
 | 
						|
		VersionID: objInfo.VersionID,
 | 
						|
		Internal: miniogo.AdvancedGetOptions{
 | 
						|
			ReplicationProxyRequest: "false",
 | 
						|
		}})
 | 
						|
	if err == nil {
 | 
						|
		rtype = getReplicationAction(objInfo, oi)
 | 
						|
		if rtype == replicateNone {
 | 
						|
			// object with same VersionID already exists, replication kicked off by
 | 
						|
			// PutObject might have completed
 | 
						|
			if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType {
 | 
						|
				// if metadata is not updated for some reason after replication, such as
 | 
						|
				// 503 encountered while updating metadata - make sure to set ReplicationStatus
 | 
						|
				// as Completed.
 | 
						|
				//
 | 
						|
				// Note: Replication Stats would have been updated despite metadata update failure.
 | 
						|
				gr.Close()
 | 
						|
				closeOnDefer = false
 | 
						|
				popts := ObjectOptions{
 | 
						|
					MTime:       objInfo.ModTime,
 | 
						|
					VersionID:   objInfo.VersionID,
 | 
						|
					UserDefined: make(map[string]string, len(objInfo.UserDefined)),
 | 
						|
				}
 | 
						|
				for k, v := range objInfo.UserDefined {
 | 
						|
					popts.UserDefined[k] = v
 | 
						|
				}
 | 
						|
				if ri.OpType == replication.ExistingObjectReplicationType {
 | 
						|
					popts.UserDefined[xhttp.MinIOReplicationResetStatus] = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), ri.ResetID)
 | 
						|
				}
 | 
						|
				popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Completed.String()
 | 
						|
				if objInfo.UserTags != "" {
 | 
						|
					popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
 | 
						|
				}
 | 
						|
				if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
 | 
						|
					logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
 | 
						|
				}
 | 
						|
			}
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	replicationStatus = replication.Completed
 | 
						|
	// use core client to avoid doing multipart on PUT
 | 
						|
	c := &miniogo.Core{Client: tgt.Client}
 | 
						|
	if rtype != replicateAll {
 | 
						|
		// replicate metadata for object tagging/copy with metadata replacement
 | 
						|
		srcOpts := miniogo.CopySrcOptions{
 | 
						|
			Bucket:    dest.Bucket,
 | 
						|
			Object:    object,
 | 
						|
			VersionID: objInfo.VersionID,
 | 
						|
		}
 | 
						|
		dstOpts := miniogo.PutObjectOptions{
 | 
						|
			Internal: miniogo.AdvancedPutOptions{
 | 
						|
				SourceVersionID:    objInfo.VersionID,
 | 
						|
				ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
 | 
						|
			}}
 | 
						|
		if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), srcOpts, dstOpts); err != nil {
 | 
						|
			replicationStatus = replication.Failed
 | 
						|
			logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		putOpts, err := putReplicationOpts(ctx, dest, objInfo)
 | 
						|
		if err != nil {
 | 
						|
			logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err))
 | 
						|
			sendEvent(eventArgs{
 | 
						|
				EventName:  event.ObjectReplicationNotTracked,
 | 
						|
				BucketName: bucket,
 | 
						|
				Object:     objInfo,
 | 
						|
				Host:       "Internal: [Replication]",
 | 
						|
			})
 | 
						|
			return
 | 
						|
		}
 | 
						|
		var headerSize int
 | 
						|
		for k, v := range putOpts.Header() {
 | 
						|
			headerSize += len(k) + len(v)
 | 
						|
		}
 | 
						|
 | 
						|
		opts := &bandwidth.MonitorReaderOptions{
 | 
						|
			Bucket:     objInfo.Bucket,
 | 
						|
			HeaderSize: headerSize,
 | 
						|
		}
 | 
						|
		newCtx := ctx
 | 
						|
		if globalBucketMonitor.IsThrottled(bucket) {
 | 
						|
			var cancel context.CancelFunc
 | 
						|
			newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
 | 
						|
			defer cancel()
 | 
						|
		}
 | 
						|
		r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
 | 
						|
		if objInfo.Multipart {
 | 
						|
			if err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object,
 | 
						|
				r, objInfo, putOpts); err != nil {
 | 
						|
				replicationStatus = replication.Failed
 | 
						|
				logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
 | 
						|
				replicationStatus = replication.Failed
 | 
						|
				logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	gr.Close()
 | 
						|
	closeOnDefer = false
 | 
						|
 | 
						|
	prevReplStatus := objInfo.ReplicationStatus
 | 
						|
	objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
 | 
						|
	if objInfo.UserTags != "" {
 | 
						|
		objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
 | 
						|
	}
 | 
						|
	if ri.OpType == replication.ExistingObjectReplicationType {
 | 
						|
		objInfo.UserDefined[xhttp.MinIOReplicationResetStatus] = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), ri.ResetID)
 | 
						|
	}
 | 
						|
	// FIXME: add support for missing replication events
 | 
						|
	// - event.ObjectReplicationMissedThreshold
 | 
						|
	// - event.ObjectReplicationReplicatedAfterThreshold
 | 
						|
	var eventName = event.ObjectReplicationComplete
 | 
						|
	if replicationStatus == replication.Failed {
 | 
						|
		eventName = event.ObjectReplicationFailed
 | 
						|
	}
 | 
						|
 | 
						|
	// Leave metadata in `PENDING` state if inline replication fails to save iops
 | 
						|
	if ri.OpType == replication.HealReplicationType ||
 | 
						|
		replicationStatus == replication.Completed {
 | 
						|
		popts := ObjectOptions{
 | 
						|
			MTime:       objInfo.ModTime,
 | 
						|
			VersionID:   objInfo.VersionID,
 | 
						|
			UserDefined: make(map[string]string, len(objInfo.UserDefined)),
 | 
						|
		}
 | 
						|
		for k, v := range objInfo.UserDefined {
 | 
						|
			popts.UserDefined[k] = v
 | 
						|
		}
 | 
						|
		popts.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
 | 
						|
		if objInfo.UserTags != "" {
 | 
						|
			popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
 | 
						|
		}
 | 
						|
		if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
 | 
						|
			logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w",
 | 
						|
				bucket, objInfo.Name, objInfo.VersionID, err))
 | 
						|
		}
 | 
						|
		opType := replication.MetadataReplicationType
 | 
						|
		if rtype == replicateAll {
 | 
						|
			opType = replication.ObjectReplicationType
 | 
						|
		}
 | 
						|
		globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
 | 
						|
		sendEvent(eventArgs{
 | 
						|
			EventName:  eventName,
 | 
						|
			BucketName: bucket,
 | 
						|
			Object:     objInfo,
 | 
						|
			Host:       "Internal: [Replication]",
 | 
						|
		})
 | 
						|
	}
 | 
						|
	// re-queue failures once more - keep a retry count to avoid flooding the queue if
 | 
						|
	// the target site is down. Leave it to scanner to catch up instead.
 | 
						|
	if replicationStatus != replication.Completed && ri.RetryCount < 1 {
 | 
						|
		ri.OpType = replication.HealReplicationType
 | 
						|
		ri.RetryCount++
 | 
						|
		globalReplicationPool.queueReplicaFailedTask(ri)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (err error) {
 | 
						|
	var uploadedParts []miniogo.CompletePart
 | 
						|
	uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		if err != nil {
 | 
						|
			// block and abort remote upload upon failure.
 | 
						|
			if aerr := c.AbortMultipartUpload(ctx, bucket, object, uploadID); aerr != nil {
 | 
						|
				aerr = fmt.Errorf("Unable to cleanup failed multipart replication %s on remote %s/%s: %w", uploadID, bucket, object, aerr)
 | 
						|
				logger.LogIf(ctx, aerr)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	var (
 | 
						|
		hr    *hash.Reader
 | 
						|
		pInfo miniogo.ObjectPart
 | 
						|
	)
 | 
						|
 | 
						|
	for _, partInfo := range objInfo.Parts {
 | 
						|
		hr, err = hash.NewReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.ActualSize, "", "", opts.ServerSideEncryption)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if pInfo.Size != partInfo.ActualSize {
 | 
						|
			return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
 | 
						|
		}
 | 
						|
		uploadedParts = append(uploadedParts, miniogo.CompletePart{
 | 
						|
			PartNumber: pInfo.PartNumber,
 | 
						|
			ETag:       pInfo.ETag,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{
 | 
						|
		Internal: miniogo.AdvancedPutOptions{
 | 
						|
			SourceMTime: objInfo.ModTime,
 | 
						|
			// always set this to distinguish between `mc mirror` replication and serverside
 | 
						|
			ReplicationRequest: true,
 | 
						|
		}})
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// filterReplicationStatusMetadata filters replication status metadata for COPY
 | 
						|
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
 | 
						|
	// Copy on write
 | 
						|
	dst := metadata
 | 
						|
	var copied bool
 | 
						|
	delKey := func(key string) {
 | 
						|
		if _, ok := metadata[key]; !ok {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		if !copied {
 | 
						|
			dst = make(map[string]string, len(metadata))
 | 
						|
			for k, v := range metadata {
 | 
						|
				dst[k] = v
 | 
						|
			}
 | 
						|
			copied = true
 | 
						|
		}
 | 
						|
		delete(dst, key)
 | 
						|
	}
 | 
						|
 | 
						|
	delKey(xhttp.AmzBucketReplicationStatus)
 | 
						|
	return dst
 | 
						|
}
 | 
						|
 | 
						|
// DeletedObjectReplicationInfo describes a deleted object (or delete marker)
// that is queued for delete replication.
type DeletedObjectReplicationInfo struct {
	// DeletedObject is the deleted entry to replicate.
	DeletedObject
	// Bucket is the source bucket; it is the bucket reported to the
	// replication stats when the delete is scheduled.
	Bucket string
	// OpType selects the queue the task is placed on: existing-object
	// deletes use their own channel, everything else the regular one.
	OpType replication.Type
	// ResetID is presumably the ID of the replication reset that triggered
	// this task - TODO confirm against callers.
	ResetID string
}
 | 
						|
 | 
						|
// Replication specific API names recorded in audit log entries.
const (
	// ReplicateObjectAPI is the API name used when auditing object replication.
	ReplicateObjectAPI = "ReplicateObject"
	// ReplicateDeleteAPI is the API name for delete replication.
	ReplicateDeleteAPI = "ReplicateDelete"
)
 | 
						|
 | 
						|
// Audit trail trigger names for replication activity.
const (
	// ReplicateQueued - audit trail for replication being queued
	ReplicateQueued = "replicate:queue"

	// ReplicateExisting - audit trail for existing objects replication
	ReplicateExisting = "replicate:existing"
	// ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers
	ReplicateExistingDelete = "replicate:existing:delete"

	// ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
	ReplicateMRF = "replicate:mrf"
	// ReplicateIncoming - audit trail indicating replication started [could be from incoming/existing/heal activity]
	ReplicateIncoming = "replicate:incoming"
	// ReplicateHeal - audit trail for healing of failed/pending replications
	ReplicateHeal = "replicate:heal"
	// ReplicateDelete - audit trail for delete replication
	ReplicateDelete = "replicate:delete"
)
 | 
						|
 | 
						|
// Process-wide replication state, assigned by initBackgroundReplication.
var (
	// globalReplicationPool queues and runs replication tasks.
	globalReplicationPool *ReplicationPool
	// globalReplicationStats accumulates replication statistics.
	globalReplicationStats *ReplicationStats
)
 | 
						|
 | 
						|
// ReplicationPool describes replication pool: the task queues and the worker
// goroutines that drain them.
type ReplicationPool struct {
	// objLayer is handed to replicateObject/replicateDelete by the workers.
	objLayer ObjectLayer
	// ctx stops every worker once it is done.
	ctx context.Context
	// mrfWorkerKillCh asks a single MRF worker to exit per value sent.
	mrfWorkerKillCh chan struct{}
	// workerKillCh asks a single regular worker to exit per value sent.
	workerKillCh chan struct{}
	// replicaCh carries object replication tasks for the regular workers.
	replicaCh chan ReplicateObjectInfo
	// replicaDeleteCh carries delete replication tasks for the regular workers.
	replicaDeleteCh chan DeletedObjectReplicationInfo
	// mrfReplicaCh carries failed tasks that are retried by the MRF workers.
	mrfReplicaCh chan ReplicateObjectInfo
	// existingReplicaCh carries replication tasks for existing objects.
	existingReplicaCh chan ReplicateObjectInfo
	// existingReplicaDeleteCh carries delete tasks for existing delete markers.
	existingReplicaDeleteCh chan DeletedObjectReplicationInfo
	// workerSize is the current number of regular workers; guarded by mu.
	workerSize int
	// mrfWorkerSize is the current number of MRF workers; guarded by mu.
	mrfWorkerSize int
	// workerWg is incremented for every regular worker started.
	workerWg sync.WaitGroup
	// mrfWorkerWg is incremented for every MRF worker started.
	mrfWorkerWg sync.WaitGroup
	// once guards closing the task channels when the global context is done.
	once sync.Once
	// mu serializes worker pool resizing.
	mu sync.Mutex
}
 | 
						|
 | 
						|
// NewReplicationPool creates a pool of replication workers of specified size
 | 
						|
func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
 | 
						|
	pool := &ReplicationPool{
 | 
						|
		replicaCh:               make(chan ReplicateObjectInfo, 100000),
 | 
						|
		replicaDeleteCh:         make(chan DeletedObjectReplicationInfo, 100000),
 | 
						|
		mrfReplicaCh:            make(chan ReplicateObjectInfo, 100000),
 | 
						|
		workerKillCh:            make(chan struct{}, opts.Workers),
 | 
						|
		mrfWorkerKillCh:         make(chan struct{}, opts.FailedWorkers),
 | 
						|
		existingReplicaCh:       make(chan ReplicateObjectInfo, 100000),
 | 
						|
		existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
 | 
						|
		ctx:                     ctx,
 | 
						|
		objLayer:                o,
 | 
						|
	}
 | 
						|
 | 
						|
	pool.ResizeWorkers(opts.Workers)
 | 
						|
	pool.ResizeFailedWorkers(opts.FailedWorkers)
 | 
						|
	go pool.AddExistingObjectReplicateWorker()
 | 
						|
	return pool
 | 
						|
}
 | 
						|
 | 
						|
// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
 | 
						|
// to the other workers
 | 
						|
func (p *ReplicationPool) AddMRFWorker() {
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-p.ctx.Done():
 | 
						|
			return
 | 
						|
		case oi, ok := <-p.mrfReplicaCh:
 | 
						|
			if !ok {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			replicateObject(p.ctx, oi, p.objLayer, ReplicateMRF)
 | 
						|
		case <-p.mrfWorkerKillCh:
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// AddWorker adds a replication worker to the pool
 | 
						|
func (p *ReplicationPool) AddWorker() {
 | 
						|
	defer p.workerWg.Done()
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-p.ctx.Done():
 | 
						|
			return
 | 
						|
		case oi, ok := <-p.replicaCh:
 | 
						|
			if !ok {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			replicateObject(p.ctx, oi, p.objLayer, ReplicateIncoming)
 | 
						|
		case doi, ok := <-p.replicaDeleteCh:
 | 
						|
			if !ok {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			replicateDelete(p.ctx, doi, p.objLayer, ReplicateDelete)
 | 
						|
		case <-p.workerKillCh:
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd
 | 
						|
func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-p.ctx.Done():
 | 
						|
			return
 | 
						|
		case oi, ok := <-p.existingReplicaCh:
 | 
						|
			if !ok {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			replicateObject(p.ctx, oi, p.objLayer, ReplicateExisting)
 | 
						|
		case doi, ok := <-p.existingReplicaDeleteCh:
 | 
						|
			if !ok {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			replicateDelete(p.ctx, doi, p.objLayer, ReplicateExistingDelete)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ResizeWorkers sets replication workers pool to new size
 | 
						|
func (p *ReplicationPool) ResizeWorkers(n int) {
 | 
						|
	p.mu.Lock()
 | 
						|
	defer p.mu.Unlock()
 | 
						|
 | 
						|
	for p.workerSize < n {
 | 
						|
		p.workerSize++
 | 
						|
		p.workerWg.Add(1)
 | 
						|
		go p.AddWorker()
 | 
						|
	}
 | 
						|
	for p.workerSize > n {
 | 
						|
		p.workerSize--
 | 
						|
		go func() { p.workerKillCh <- struct{}{} }()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ResizeFailedWorkers sets replication failed workers pool size
 | 
						|
func (p *ReplicationPool) ResizeFailedWorkers(n int) {
 | 
						|
	p.mu.Lock()
 | 
						|
	defer p.mu.Unlock()
 | 
						|
 | 
						|
	for p.mrfWorkerSize < n {
 | 
						|
		p.mrfWorkerSize++
 | 
						|
		p.mrfWorkerWg.Add(1)
 | 
						|
		go p.AddMRFWorker()
 | 
						|
	}
 | 
						|
	for p.mrfWorkerSize > n {
 | 
						|
		p.mrfWorkerSize--
 | 
						|
		go func() { p.mrfWorkerKillCh <- struct{}{} }()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
 | 
						|
	if p == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	select {
 | 
						|
	case <-GlobalContext.Done():
 | 
						|
		p.once.Do(func() {
 | 
						|
			close(p.replicaCh)
 | 
						|
			close(p.mrfReplicaCh)
 | 
						|
			close(p.existingReplicaCh)
 | 
						|
		})
 | 
						|
	case p.mrfReplicaCh <- ri:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 | 
						|
	if p == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	var ch chan ReplicateObjectInfo
 | 
						|
	switch ri.OpType {
 | 
						|
	case replication.ExistingObjectReplicationType:
 | 
						|
		ch = p.existingReplicaCh
 | 
						|
	case replication.HealReplicationType:
 | 
						|
		fallthrough
 | 
						|
	default:
 | 
						|
		ch = p.replicaCh
 | 
						|
	}
 | 
						|
	select {
 | 
						|
	case <-GlobalContext.Done():
 | 
						|
		p.once.Do(func() {
 | 
						|
			close(p.replicaCh)
 | 
						|
			close(p.mrfReplicaCh)
 | 
						|
			close(p.existingReplicaCh)
 | 
						|
		})
 | 
						|
	case ch <- ri:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInfo) {
 | 
						|
	if p == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	var ch chan DeletedObjectReplicationInfo
 | 
						|
	switch doi.OpType {
 | 
						|
	case replication.ExistingObjectReplicationType:
 | 
						|
		ch = p.existingReplicaDeleteCh
 | 
						|
	case replication.HealReplicationType:
 | 
						|
		fallthrough
 | 
						|
	default:
 | 
						|
		ch = p.replicaDeleteCh
 | 
						|
	}
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-GlobalContext.Done():
 | 
						|
		p.once.Do(func() {
 | 
						|
			close(p.replicaDeleteCh)
 | 
						|
			close(p.existingReplicaDeleteCh)
 | 
						|
		})
 | 
						|
	case ch <- doi:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// replicationPoolOpts holds the sizing options for NewReplicationPool.
type replicationPoolOpts struct {
	// Workers is the number of regular replication workers to start.
	Workers int
	// FailedWorkers is the number of failed-replication (MRF) workers to start.
	FailedWorkers int
}
 | 
						|
 | 
						|
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
 | 
						|
	globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
 | 
						|
		Workers:       globalAPIConfig.getReplicationWorkers(),
 | 
						|
		FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(),
 | 
						|
	})
 | 
						|
	globalReplicationStats = NewReplicationStats(ctx, objectAPI)
 | 
						|
}
 | 
						|
 | 
						|
// get Reader from replication target if active-active replication is in place and
 | 
						|
// this node returns a 404
 | 
						|
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, proxy bool) {
 | 
						|
	tgt, oi, proxy, err := proxyHeadToRepTarget(ctx, bucket, object, opts)
 | 
						|
	if !proxy || err != nil {
 | 
						|
		return nil, false
 | 
						|
	}
 | 
						|
	fn, off, length, err := NewGetObjectReader(rs, oi, opts)
 | 
						|
	if err != nil {
 | 
						|
		return nil, false
 | 
						|
	}
 | 
						|
	gopts := miniogo.GetObjectOptions{
 | 
						|
		VersionID:            opts.VersionID,
 | 
						|
		ServerSideEncryption: opts.ServerSideEncryption,
 | 
						|
		Internal: miniogo.AdvancedGetOptions{
 | 
						|
			ReplicationProxyRequest: "true",
 | 
						|
		},
 | 
						|
	}
 | 
						|
	// get correct offsets for encrypted object
 | 
						|
	if off >= 0 && length >= 0 {
 | 
						|
		if err := gopts.SetRange(off, off+length-1); err != nil {
 | 
						|
			return nil, false
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// Make sure to match ETag when proxying.
 | 
						|
	if err = gopts.SetMatchETag(oi.ETag); err != nil {
 | 
						|
		return nil, false
 | 
						|
	}
 | 
						|
	c := miniogo.Core{Client: tgt.Client}
 | 
						|
	obj, _, _, err := c.GetObject(ctx, bucket, object, gopts)
 | 
						|
	if err != nil {
 | 
						|
		return nil, false
 | 
						|
	}
 | 
						|
	closeReader := func() { obj.Close() }
 | 
						|
 | 
						|
	reader, err := fn(obj, h, closeReader)
 | 
						|
	if err != nil {
 | 
						|
		return nil, false
 | 
						|
	}
 | 
						|
	reader.ObjInfo = oi.Clone()
 | 
						|
	return reader, true
 | 
						|
}
 | 
						|
 | 
						|
// isProxyable returns true if replication config found for this bucket
 | 
						|
func isProxyable(ctx context.Context, bucket string) bool {
 | 
						|
	cfg, err := getReplicationConfig(ctx, bucket)
 | 
						|
	if err != nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	dest := cfg.GetDestination()
 | 
						|
	return dest.Bucket == bucket
 | 
						|
}
 | 
						|
 | 
						|
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (tgt *TargetClient, oi ObjectInfo, proxy bool, err error) {
 | 
						|
	// this option is set when active-active replication is in place between site A -> B,
 | 
						|
	// and site B does not have the object yet.
 | 
						|
	if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
 | 
						|
		return nil, oi, false, nil
 | 
						|
	}
 | 
						|
	cfg, err := getReplicationConfig(ctx, bucket)
 | 
						|
	if err != nil {
 | 
						|
		return nil, oi, false, err
 | 
						|
	}
 | 
						|
	dest := cfg.GetDestination()
 | 
						|
	if dest.Bucket != bucket { // not active-active
 | 
						|
		return nil, oi, false, err
 | 
						|
	}
 | 
						|
	ssec := false
 | 
						|
	if opts.ServerSideEncryption != nil {
 | 
						|
		ssec = opts.ServerSideEncryption.Type() == encrypt.SSEC
 | 
						|
	}
 | 
						|
	ropts := replication.ObjectOpts{
 | 
						|
		Name: object,
 | 
						|
		SSEC: ssec,
 | 
						|
	}
 | 
						|
	if !cfg.Replicate(ropts) { // no matching rule for object prefix
 | 
						|
		return nil, oi, false, nil
 | 
						|
	}
 | 
						|
	tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
 | 
						|
	if tgt == nil {
 | 
						|
		return nil, oi, false, fmt.Errorf("target is offline or not configured")
 | 
						|
	}
 | 
						|
	// if proxying explicitly disabled on remote target
 | 
						|
	if tgt.disableProxy {
 | 
						|
		return nil, oi, false, nil
 | 
						|
	}
 | 
						|
	gopts := miniogo.GetObjectOptions{
 | 
						|
		VersionID:            opts.VersionID,
 | 
						|
		ServerSideEncryption: opts.ServerSideEncryption,
 | 
						|
		Internal: miniogo.AdvancedGetOptions{
 | 
						|
			ReplicationProxyRequest: "true",
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	objInfo, err := tgt.StatObject(ctx, dest.Bucket, object, gopts)
 | 
						|
	if err != nil {
 | 
						|
		return nil, oi, false, err
 | 
						|
	}
 | 
						|
 | 
						|
	tags, _ := tags.MapToObjectTags(objInfo.UserTags)
 | 
						|
	oi = ObjectInfo{
 | 
						|
		Bucket:            bucket,
 | 
						|
		Name:              object,
 | 
						|
		ModTime:           objInfo.LastModified,
 | 
						|
		Size:              objInfo.Size,
 | 
						|
		ETag:              objInfo.ETag,
 | 
						|
		VersionID:         objInfo.VersionID,
 | 
						|
		IsLatest:          objInfo.IsLatest,
 | 
						|
		DeleteMarker:      objInfo.IsDeleteMarker,
 | 
						|
		ContentType:       objInfo.ContentType,
 | 
						|
		Expires:           objInfo.Expires,
 | 
						|
		StorageClass:      objInfo.StorageClass,
 | 
						|
		ReplicationStatus: replication.StatusType(objInfo.ReplicationStatus),
 | 
						|
		UserTags:          tags.String(),
 | 
						|
	}
 | 
						|
	oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
 | 
						|
	for k, v := range objInfo.Metadata {
 | 
						|
		oi.UserDefined[k] = v[0]
 | 
						|
	}
 | 
						|
	ce, ok := oi.UserDefined[xhttp.ContentEncoding]
 | 
						|
	if !ok {
 | 
						|
		ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
 | 
						|
	}
 | 
						|
	if ok {
 | 
						|
		oi.ContentEncoding = ce
 | 
						|
	}
 | 
						|
	return tgt, oi, true, nil
 | 
						|
}
 | 
						|
 | 
						|
// get object info from replication target if active-active replication is in place and
 | 
						|
// this node returns a 404
 | 
						|
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, proxy bool, err error) {
 | 
						|
	_, oi, proxy, err = proxyHeadToRepTarget(ctx, bucket, object, opts)
 | 
						|
	return oi, proxy, err
 | 
						|
}
 | 
						|
 | 
						|
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) {
 | 
						|
	if sync {
 | 
						|
		replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o, ReplicateIncoming)
 | 
						|
	} else {
 | 
						|
		globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType})
 | 
						|
	}
 | 
						|
	if sz, err := objInfo.GetActualSize(); err == nil {
 | 
						|
		globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer, sync bool) {
 | 
						|
	globalReplicationPool.queueReplicaDeleteTask(dv)
 | 
						|
	globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
 | 
						|
}
 | 
						|
 | 
						|
type replicationConfig struct {
 | 
						|
	Config          *replication.Config
 | 
						|
	ResetID         string
 | 
						|
	ResetBeforeDate time.Time
 | 
						|
}
 | 
						|
 | 
						|
func (c replicationConfig) Empty() bool {
 | 
						|
	return c.Config == nil
 | 
						|
}
 | 
						|
func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
 | 
						|
	return c.Config.Replicate(opts)
 | 
						|
}
 | 
						|
 | 
						|
// Resync returns true if replication reset is requested
 | 
						|
func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo) bool {
 | 
						|
	if c.Empty() {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	// existing object replication does not apply to un-versioned objects
 | 
						|
	if oi.VersionID == "" || oi.VersionID == nullVersionID {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	var replicate bool
 | 
						|
	if oi.DeleteMarker {
 | 
						|
		if c.Replicate(replication.ObjectOpts{
 | 
						|
			Name:           oi.Name,
 | 
						|
			SSEC:           crypto.SSEC.IsEncrypted(oi.UserDefined),
 | 
						|
			UserTags:       oi.UserTags,
 | 
						|
			DeleteMarker:   oi.DeleteMarker,
 | 
						|
			VersionID:      oi.VersionID,
 | 
						|
			OpType:         replication.DeleteReplicationType,
 | 
						|
			ExistingObject: true}) {
 | 
						|
			replicate = true
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		// Ignore previous replication status when deciding if object can be re-replicated
 | 
						|
		objInfo := oi.Clone()
 | 
						|
		objInfo.ReplicationStatus = replication.StatusType("")
 | 
						|
		replicate, _ = mustReplicater(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(objInfo, replication.ExistingObjectReplicationType))
 | 
						|
	}
 | 
						|
	return c.resync(oi, replicate)
 | 
						|
}
 | 
						|
 | 
						|
// wrapper function for testability. Returns true if a new reset is requested on
 | 
						|
// already replicated objects OR object qualifies for existing object replication
 | 
						|
// and no reset requested.
 | 
						|
func (c replicationConfig) resync(oi ObjectInfo, replicate bool) bool {
 | 
						|
	if !replicate {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	rs, ok := oi.UserDefined[xhttp.MinIOReplicationResetStatus]
 | 
						|
	if !ok { // existing object replication is enabled and object version is unreplicated so far.
 | 
						|
		if c.ResetID != "" && oi.ModTime.Before(c.ResetBeforeDate) { // trigger replication if `mc replicate reset` requested
 | 
						|
			return true
 | 
						|
		}
 | 
						|
		return oi.ReplicationStatus != replication.Completed
 | 
						|
	}
 | 
						|
	if c.ResetID == "" || c.ResetBeforeDate.Equal(timeSentinel) { // no reset in progress
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	// if already replicated, return true if a new reset was requested.
 | 
						|
	splits := strings.SplitN(rs, ";", 2)
 | 
						|
	newReset := splits[1] != c.ResetID
 | 
						|
	if !newReset && oi.ReplicationStatus == replication.Completed {
 | 
						|
		// already replicated and no reset requested
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	return newReset && oi.ModTime.Before(c.ResetBeforeDate)
 | 
						|
}
 |