diff --git a/Makefile b/Makefile index def7285f1..88564d90c 100644 --- a/Makefile +++ b/Makefile @@ -49,18 +49,18 @@ test: verifiers build ## builds minio, runs linters, tests @echo "Running unit tests" @MINIO_API_REQUESTS_MAX=10000 CGO_ENABLED=0 go test -tags kqueue ./... -test-root-disable: install +test-root-disable: install-race @echo "Running minio root lockdown tests" @env bash $(PWD)/buildscripts/disable-root.sh -test-decom: install +test-decom: install-race @echo "Running minio decom tests" @env bash $(PWD)/docs/distributed/decom.sh @env bash $(PWD)/docs/distributed/decom-encrypted.sh @env bash $(PWD)/docs/distributed/decom-encrypted-sse-s3.sh @env bash $(PWD)/docs/distributed/decom-compressed-sse-s3.sh -test-upgrade: build +test-upgrade: install-race @echo "Running minio upgrade tests" @(env bash $(PWD)/buildscripts/minio-upgrade.sh) @@ -86,18 +86,18 @@ test-replication-3site: test-delete-replication: @(env bash $(PWD)/docs/bucket/replication/delete-replication.sh) -test-replication: install test-replication-2site test-replication-3site test-delete-replication test-sio-error ## verify multi site replication +test-replication: install-race test-replication-2site test-replication-3site test-delete-replication test-sio-error ## verify multi site replication @echo "Running tests for replicating three sites" -test-site-replication-ldap: install ## verify automatic site replication +test-site-replication-ldap: install-race ## verify automatic site replication @echo "Running tests for automatic site replication of IAM (with LDAP)" @(env bash $(PWD)/docs/site-replication/run-multi-site-ldap.sh) -test-site-replication-oidc: install ## verify automatic site replication +test-site-replication-oidc: install-race ## verify automatic site replication @echo "Running tests for automatic site replication of IAM (with OIDC)" @(env bash $(PWD)/docs/site-replication/run-multi-site-oidc.sh) -test-site-replication-minio: install ## verify automatic site replication +test-site-replication-minio: install-race ## verify automatic site replication @echo "Running tests for automatic site replication of IAM (with MinIO IDP)" @(env bash $(PWD)/docs/site-replication/run-multi-site-minio-idp.sh) @@ -159,6 +159,12 @@ docker: build ## builds minio docker container @echo "Building minio docker image '$(TAG)'" @docker build -q --no-cache -t $(TAG) . -f Dockerfile +install-race: checks ## builds minio to $(PWD) + @echo "Building minio binary to './minio'" + @GORACE=history_size=7 CGO_ENABLED=1 go build -tags kqueue -race -trimpath --ldflags "$(LDFLAGS)" -o $(PWD)/minio 1>/dev/null + @echo "Installing minio binary to '$(GOPATH)/bin/minio'" + @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/minio $(GOPATH)/bin/minio + install: build ## builds minio and installs it to $GOPATH/bin. @echo "Installing minio binary to '$(GOPATH)/bin/minio'" @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/minio $(GOPATH)/bin/minio diff --git a/buildscripts/disable-root.sh b/buildscripts/disable-root.sh index 7234a7043..fdef12068 100755 --- a/buildscripts/disable-root.sh +++ b/buildscripts/disable-root.sh @@ -56,6 +56,8 @@ done set +e +sleep 10 + ./mc ls minioadm/ if [ $? -ne 0 ]; then echo "listing failed, 'minioadmin' should be enabled" diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 1d623ae00..48816a4aa 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -32,7 +32,6 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/bucket/replication" xhttp "github.com/minio/minio/internal/http" - "github.com/minio/minio/internal/logger" ) //go:generate msgp -file=$GOFILE @@ -167,7 +166,21 @@ func (ri replicatedInfos) Action() replicationAction { var replStatusRegex = regexp.MustCompile(`([^=].*?)=([^,].*?);`) // TargetReplicationStatus - returns replication status of a target -func (o *ObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) { +func (ri ReplicateObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) { + repStatMatches := replStatusRegex.FindAllStringSubmatch(ri.ReplicationStatusInternal, -1) + for _, repStatMatch := range repStatMatches { + if len(repStatMatch) != 3 { + return + } + if repStatMatch[1] == arn { + return replication.StatusType(repStatMatch[2]) + } + } + return +} + +// TargetReplicationStatus - returns replication status of a target +func (o ObjectInfo) TargetReplicationStatus(arn string) (status replication.StatusType) { repStatMatches := replStatusRegex.FindAllStringSubmatch(o.ReplicationStatusInternal, -1) for _, repStatMatch := range repStatMatches { if len(repStatMatch) != 3 { @@ -185,7 +198,6 @@ type replicateTargetDecision struct { Synchronous bool // Synchronous replication configured. Arn string // ARN of replication target ID string - Tgt *TargetClient } func (t *replicateTargetDecision) String() string { @@ -207,7 +219,7 @@ type ReplicateDecision struct { } // ReplicateAny returns true if atleast one target qualifies for replication -func (d *ReplicateDecision) ReplicateAny() bool { +func (d ReplicateDecision) ReplicateAny() bool { for _, t := range d.targetsMap { if t.Replicate { return true @@ -217,7 +229,7 @@ func (d *ReplicateDecision) ReplicateAny() bool { } // Synchronous returns true if atleast one target qualifies for synchronous replication -func (d *ReplicateDecision) Synchronous() bool { +func (d ReplicateDecision) Synchronous() bool { for _, t := range d.targetsMap { if t.Synchronous { return true @@ -226,7 +238,7 @@ func (d *ReplicateDecision) Synchronous() bool { return false } -func (d *ReplicateDecision) String() string { +func (d ReplicateDecision) String() string { b := new(bytes.Buffer) for key, value := range d.targetsMap { fmt.Fprintf(b, "%s=%s,", key, value.String()) @@ -243,7 +255,7 @@ func (d *ReplicateDecision) Set(t replicateTargetDecision) { } // PendingStatus returns a stringified representation of internal replication status with all targets marked as `PENDING` -func (d *ReplicateDecision) PendingStatus() string { +func (d ReplicateDecision) PendingStatus() string { b := new(bytes.Buffer) for _, k := range d.targetsMap { if k.Replicate { @@ -259,11 +271,11 @@ type ResyncDecision struct { } // Empty returns true if no targets with resync decision present -func (r *ResyncDecision) Empty() bool { +func (r ResyncDecision) Empty() bool { return r.targets == nil } -func (r *ResyncDecision) mustResync() bool { +func (r ResyncDecision) mustResync() bool { for _, v := range r.targets { if v.Replicate { return true @@ -272,15 +284,12 @@ func (r *ResyncDecision) mustResync() bool { return false } -func (r *ResyncDecision) mustResyncTarget(tgtArn string) bool { +func (r ResyncDecision) mustResyncTarget(tgtArn string) bool { if r.targets == nil { return false } v, ok := r.targets[tgtArn] - if ok && v.Replicate { - return true - } - return false + return ok && v.Replicate } // ResyncTargetDecision is struct that represents resync decision for this target @@ -301,35 +310,20 @@ func parseReplicateDecision(ctx context.Context, bucket, s string) (r ReplicateD if len(s) == 0 { return } - pairs := strings.Split(s, ",") - for _, p := range pairs { + for _, p := range strings.Split(s, ",") { + if p == "" { + continue + } slc := strings.Split(p, "=") if len(slc) != 2 { return r, errInvalidReplicateDecisionFormat } - tgtStr := strings.TrimPrefix(slc[1], "\"") - tgtStr = strings.TrimSuffix(tgtStr, "\"") + tgtStr := strings.TrimSuffix(strings.TrimPrefix(slc[1], `"`), `"`) tgt := strings.Split(tgtStr, ";") if len(tgt) != 4 { return r, errInvalidReplicateDecisionFormat } - var replicate, sync bool - var err error - replicate, err = strconv.ParseBool(tgt[0]) - if err != nil { - return r, err - } - sync, err = strconv.ParseBool(tgt[1]) - if err != nil { - return r, err - } - tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(slc[0]) - if tgtClnt == nil { - // Skip stale targets if any and log them to be missing atleast once. - logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, slc[0]), slc[0]) - // We save the targetDecision even when its not configured or stale. - } - r.targetsMap[slc[0]] = replicateTargetDecision{Replicate: replicate, Synchronous: sync, Arn: tgt[2], ID: tgt[3], Tgt: tgtClnt} + r.targetsMap[slc[0]] = replicateTargetDecision{Replicate: tgt[0] == "true", Synchronous: tgt[1] == "true", Arn: tgt[2], ID: tgt[3]} } return } @@ -496,8 +490,8 @@ func getCompositeVersionPurgeStatus(m map[string]VersionPurgeStatusType) Version } // getHealReplicateObjectInfo returns info needed by heal replication in ReplicateObjectInfo -func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) ReplicateObjectInfo { - oi := objInfo.Clone() +func getHealReplicateObjectInfo(oi ObjectInfo, rcfg replicationConfig) ReplicateObjectInfo { + userDefined := cloneMSS(oi.UserDefined) if rcfg.Config != nil && rcfg.Config.RoleArn != "" { // For backward compatibility of objects pending/failed replication. // Save replication related statuses in the new internal representation for @@ -508,17 +502,15 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl if !oi.VersionPurgeStatus.Empty() { oi.VersionPurgeStatusInternal = fmt.Sprintf("%s=%s;", rcfg.Config.RoleArn, oi.VersionPurgeStatus) } - for k, v := range oi.UserDefined { + for k, v := range userDefined { if strings.EqualFold(k, ReservedMetadataPrefixLower+ReplicationReset) { - delete(oi.UserDefined, k) - oi.UserDefined[targetResetHeader(rcfg.Config.RoleArn)] = v + delete(userDefined, k) + userDefined[targetResetHeader(rcfg.Config.RoleArn)] = v } } } - var dsc ReplicateDecision - var tgtStatuses map[string]replication.StatusType - var purgeStatuses map[string]VersionPurgeStatusType + var dsc ReplicateDecision if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() { dsc = checkReplicateDelete(GlobalContext, oi.Bucket, ObjectToDelete{ ObjectV: ObjectV{ @@ -530,16 +522,31 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name), }, nil) } else { - dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(ObjectInfo{ - UserDefined: oi.UserDefined, - }, replication.HealReplicationType, ObjectOptions{})) + dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(userDefined, oi.UserTags, "", replication.HealReplicationType, ObjectOptions{})) } - tgtStatuses = replicationStatusesMap(oi.ReplicationStatusInternal) - purgeStatuses = versionPurgeStatusesMap(oi.VersionPurgeStatusInternal) - existingObjResync := rcfg.Resync(GlobalContext, oi, &dsc, tgtStatuses) - tm, _ := time.Parse(time.RFC3339Nano, oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp]) + + tgtStatuses := replicationStatusesMap(oi.ReplicationStatusInternal) + purgeStatuses := versionPurgeStatusesMap(oi.VersionPurgeStatusInternal) + existingObjResync := rcfg.Resync(GlobalContext, oi, dsc, tgtStatuses) + tm, _ := time.Parse(time.RFC3339Nano, userDefined[ReservedMetadataPrefixLower+ReplicationTimestamp]) + rstate := oi.ReplicationState() + rstate.ReplicateDecisionStr = dsc.String() + asz, _ := oi.GetActualSize() + return ReplicateObjectInfo{ - ObjectInfo: oi, + Name: oi.Name, + Size: oi.Size, + ActualSize: asz, + Bucket: oi.Bucket, + VersionID: oi.VersionID, + ModTime: oi.ModTime, + ReplicationStatus: oi.ReplicationStatus, + ReplicationStatusInternal: oi.ReplicationStatusInternal, + DeleteMarker: oi.DeleteMarker, + VersionPurgeStatusInternal: oi.VersionPurgeStatusInternal, + VersionPurgeStatus: oi.VersionPurgeStatus, + + ReplicationState: rstate, OpType: replication.HealReplicationType, Dsc: dsc, ExistingObjResync: existingObjResync, @@ -549,14 +556,8 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl } } -func (ri *ReplicateObjectInfo) getReplicationState() ReplicationState { - rs := ri.ObjectInfo.getReplicationState() - rs.ReplicateDecisionStr = ri.Dsc.String() - return rs -} - -// vID here represents the versionID client specified in request - need to distinguish between delete marker and delete marker deletion -func (o *ObjectInfo) getReplicationState() ReplicationState { +// ReplicationState - returns replication state using other internal replication metadata in ObjectInfo +func (o ObjectInfo) ReplicationState() ReplicationState { rs := ReplicationState{ ReplicationStatusInternal: o.ReplicationStatusInternal, VersionPurgeStatusInternal: o.VersionPurgeStatusInternal, @@ -577,7 +578,7 @@ func (o *ObjectInfo) getReplicationState() ReplicationState { } // ReplicationState returns replication state using other internal replication metadata in ObjectToDelete -func (o *ObjectToDelete) ReplicationState() ReplicationState { +func (o ObjectToDelete) ReplicationState() ReplicationState { r := ReplicationState{ ReplicationStatusInternal: o.DeleteMarkerReplicationStatus, VersionPurgeStatusInternal: o.VersionPurgeStatuses, diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index d618483e7..72508b013 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -224,21 +224,19 @@ func (o mustReplicateOptions) isMetadataReplication() bool { return o.opType == replication.MetadataReplicationType } -func getMustReplicateOptions(o ObjectInfo, op replication.Type, opts ObjectOptions) mustReplicateOptions { - if !op.Valid() { - op = replication.ObjectReplicationType - if o.metadataOnly { - op = replication.MetadataReplicationType - } - } - meta := cloneMSS(o.UserDefined) - if o.UserTags != "" { - meta[xhttp.AmzObjectTagging] = o.UserTags +func (o ObjectInfo) getMustReplicateOptions(op replication.Type, opts ObjectOptions) mustReplicateOptions { + return getMustReplicateOptions(o.UserDefined, o.UserTags, o.ReplicationStatus, op, opts) +} + +func getMustReplicateOptions(userDefined map[string]string, userTags string, status replication.StatusType, op replication.Type, opts ObjectOptions) mustReplicateOptions { + meta := cloneMSS(userDefined) + if userTags != "" { + meta[xhttp.AmzObjectTagging] = userTags } return mustReplicateOptions{ meta: meta, - status: o.ReplicationStatus, + status: status, opType: op, replicationRequest: opts.ReplicationRequest, } @@ -356,40 +354,41 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet OpType: replication.DeleteReplicationType, } tgtArns := rcfg.FilterTargetArns(opts) - if len(tgtArns) > 0 { - dsc.targetsMap = make(map[string]replicateTargetDecision, len(tgtArns)) - var sync, replicate bool - for _, tgtArn := range tgtArns { - opts.TargetArn = tgtArn - replicate = rcfg.Replicate(opts) - // when incoming delete is removal of a delete marker(a.k.a versioned delete), - // GetObjectInfo returns extra information even though it returns errFileNotFound - if gerr != nil { - validReplStatus := false - switch oi.TargetReplicationStatus(tgtArn) { - case replication.Pending, replication.Completed, replication.Failed: - validReplStatus = true - } - if oi.DeleteMarker && (validReplStatus || replicate) { - dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync)) - continue - } else { - // can be the case that other cluster is down and duplicate `mc rm --vid` - // is issued - this still needs to be replicated back to the other target - replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed - dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync)) - continue - } + dsc.targetsMap = make(map[string]replicateTargetDecision, len(tgtArns)) + if len(tgtArns) == 0 { + return dsc + } + var sync, replicate bool + for _, tgtArn := range tgtArns { + opts.TargetArn = tgtArn + replicate = rcfg.Replicate(opts) + // when incoming delete is removal of a delete marker(a.k.a versioned delete), + // GetObjectInfo returns extra information even though it returns errFileNotFound + if gerr != nil { + validReplStatus := false + switch oi.TargetReplicationStatus(tgtArn) { + case replication.Pending, replication.Completed, replication.Failed: + validReplStatus = true } - tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn) - // the target online status should not be used here while deciding - // whether to replicate deletes as the target could be temporarily down - tgtDsc := newReplicateTargetDecision(tgtArn, false, false) - if tgt != nil { - tgtDsc = newReplicateTargetDecision(tgtArn, replicate, tgt.replicateSync) + if oi.DeleteMarker && (validReplStatus || replicate) { + dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync)) + continue + } else { + // can be the case that other cluster is down and duplicate `mc rm --vid` + // is issued - this still needs to be replicated back to the other target + replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed + dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync)) + continue } - dsc.Set(tgtDsc) } + tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn) + // the target online status should not be used here while deciding + // whether to replicate deletes as the target could be temporarily down + tgtDsc := newReplicateTargetDecision(tgtArn, false, false) + if tgt != nil { + tgtDsc = newReplicateTargetDecision(tgtArn, replicate, tgt.replicateSync) + } + dsc.Set(tgtDsc) } return dsc } @@ -483,15 +482,10 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj ctx = lkctx.Context() defer lk.Unlock(lkctx) + rinfos := replicatedInfos{Targets: make([]replicatedTargetInfo, 0, len(dsc.targetsMap))} var wg sync.WaitGroup - var rinfos replicatedInfos - rinfos.Targets = make([]replicatedTargetInfo, len(dsc.targetsMap)) - idx := -1 + var mu sync.Mutex for _, tgtEntry := range dsc.targetsMap { - idx++ - if tgtEntry.Tgt == nil { - continue - } if !tgtEntry.Replicate { continue } @@ -499,11 +493,33 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj if dobj.TargetArn != "" && dobj.TargetArn != tgtEntry.Arn { continue } + tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(tgtEntry.Arn) + if tgtClnt == nil { + // Skip stale targets if any and log them to be missing atleast once. + logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtEntry.Arn), tgtEntry.Arn) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: ObjectInfo{ + Bucket: bucket, + Name: dobj.ObjectName, + VersionID: versionID, + DeleteMarker: dobj.DeleteMarker, + }, + UserAgent: "Internal: [Replication]", + Host: globalLocalNodeName, + }) + continue + } wg.Add(1) - go func(index int, tgt *TargetClient) { + go func(tgt *TargetClient) { defer wg.Done() - rinfos.Targets[index] = replicateDeleteToTarget(ctx, dobj, tgt) - }(idx, tgtEntry.Tgt) + tgtInfo := replicateDeleteToTarget(ctx, dobj, tgt) + + mu.Lock() + rinfos.Targets = append(rinfos.Targets, tgtInfo) + mu.Unlock() + }(tgtClnt) } wg.Wait() @@ -963,9 +979,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje }) }() - objInfo := ri.ObjectInfo - bucket := objInfo.Bucket - object := objInfo.Name + bucket := ri.Bucket + object := ri.Name cfg, err := getReplicationConfig(ctx, bucket) if err != nil { @@ -973,7 +988,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, - Object: objInfo, + Object: ri.ToObjectInfo(), UserAgent: "Internal: [Replication]", Host: globalLocalNodeName, }) @@ -981,8 +996,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje } tgtArns := cfg.FilterTargetArns(replication.ObjectOpts{ Name: object, - SSEC: crypto.SSEC.IsEncrypted(objInfo.UserDefined), - UserTags: objInfo.UserTags, + SSEC: ri.SSEC, + UserTags: ri.UserTags, }) // Lock the object name before starting replication. // Use separate lock that doesn't collide with regular objects. @@ -992,7 +1007,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, - Object: objInfo, + Object: ri.ToObjectInfo(), UserAgent: "Internal: [Replication]", Host: globalLocalNodeName, }) @@ -1002,32 +1017,38 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje ctx = lkctx.Context() defer lk.Unlock(lkctx) + rinfos := replicatedInfos{Targets: make([]replicatedTargetInfo, 0, len(tgtArns))} var wg sync.WaitGroup - var rinfos replicatedInfos - rinfos.Targets = make([]replicatedTargetInfo, len(tgtArns)) - for i, tgtArn := range tgtArns { + var mu sync.Mutex + for _, tgtArn := range tgtArns { tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn) if tgt == nil { logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn), tgtArn) sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, - Object: objInfo, + Object: ri.ToObjectInfo(), UserAgent: "Internal: [Replication]", Host: globalLocalNodeName, }) continue } wg.Add(1) - go func(index int, tgt *TargetClient) { + go func(tgt *TargetClient) { defer wg.Done() + + var tgtInfo replicatedTargetInfo if ri.OpType == replication.ObjectReplicationType { // all incoming calls go through optimized path. - rinfos.Targets[index] = ri.replicateObject(ctx, objectAPI, tgt) + tgtInfo = ri.replicateObject(ctx, objectAPI, tgt) } else { - rinfos.Targets[index] = ri.replicateAll(ctx, objectAPI, tgt) + tgtInfo = ri.replicateAll(ctx, objectAPI, tgt) } - }(i, tgt) + + mu.Lock() + rinfos.Targets = append(rinfos.Targets, tgtInfo) + mu.Unlock() + }(tgt) } wg.Wait() @@ -1042,10 +1063,11 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje newReplStatusInternal := rinfos.ReplicationStatusInternal() // Note that internal replication status(es) may match for previously replicated objects - in such cases // metadata should be updated with last resync timestamp. - if objInfo.ReplicationStatusInternal != newReplStatusInternal || rinfos.ReplicationResynced() { + objInfo := ri.ToObjectInfo() + if ri.ReplicationStatusInternal != newReplStatusInternal || rinfos.ReplicationResynced() { popts := ObjectOptions{ - MTime: objInfo.ModTime, - VersionID: objInfo.VersionID, + MTime: ri.ModTime, + VersionID: ri.VersionID, EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) { oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = newReplStatusInternal oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) @@ -1055,14 +1077,18 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje oi.UserDefined[targetResetHeader(rinfo.Arn)] = rinfo.ResyncTimestamp } } - if objInfo.UserTags != "" { - oi.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags + if ri.UserTags != "" { + oi.UserDefined[xhttp.AmzObjectTagging] = ri.UserTags } return dsc, nil }, } - _, _ = objectAPI.PutObjectMetadata(ctx, bucket, object, popts) + uobjInfo, _ := objectAPI.PutObjectMetadata(ctx, bucket, object, popts) + if uobjInfo.Name != "" { + objInfo = uobjInfo + } + opType := replication.MetadataReplicationType if rinfos.Action() == replicateAll { opType = replication.ObjectReplicationType @@ -1098,23 +1124,21 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje // The source object is then updated to reflect the replication status. func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { startTime := time.Now() - objInfo := ri.ObjectInfo.Clone() - bucket := objInfo.Bucket - object := objInfo.Name - sz, _ := objInfo.GetActualSize() + bucket := ri.Bucket + object := ri.Name rAction := replicateAll rinfo = replicatedTargetInfo{ - Size: sz, + Size: ri.ActualSize, Arn: tgt.ARN, - PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN), + PrevReplicationStatus: ri.TargetReplicationStatus(tgt.ARN), ReplicationStatus: replication.Failed, OpType: ri.OpType, ReplicationAction: rAction, endpoint: tgt.EndpointURL().Host, secure: tgt.EndpointURL().Scheme == "https", } - if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) { + if ri.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) { rinfo.ReplicationStatus = replication.Completed rinfo.ReplicationResynced = true return @@ -1125,7 +1149,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, - Object: objInfo, + Object: ri.ToObjectInfo(), UserAgent: "Internal: [Replication]", Host: globalLocalNodeName, }) @@ -1136,12 +1160,13 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{ - VersionID: objInfo.VersionID, + VersionID: ri.VersionID, Versioned: versioned, VersionSuspended: versionSuspended, }) if err != nil { if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) { + objInfo := ri.ToObjectInfo() sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, @@ -1155,7 +1180,8 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj } defer gr.Close() - objInfo = gr.ObjInfo + objInfo := gr.ObjInfo + // make sure we have the latest metadata for metrics calculation rinfo.PrevReplicationStatus = objInfo.TargetReplicationStatus(tgt.ARN) @@ -1217,7 +1243,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj opts := &bandwidth.MonitorReaderOptions{ BucketOptions: bandwidth.BucketOptions{ - Name: objInfo.Bucket, + Name: ri.Bucket, ReplicationARN: tgt.ARN, }, HeaderSize: headerSize, @@ -1256,10 +1282,8 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj // The source object is then updated to reflect the replication status. func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { startTime := time.Now() - objInfo := ri.ObjectInfo.Clone() - bucket := objInfo.Bucket - object := objInfo.Name - sz, _ := objInfo.GetActualSize() + bucket := ri.Bucket + object := ri.Name // set defaults for replication action based on operation being performed - actual // replication action can only be determined after stat on remote. This default is @@ -1267,9 +1291,9 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object rAction := replicateMetadata rinfo = replicatedTargetInfo{ - Size: sz, + Size: ri.ActualSize, Arn: tgt.ARN, - PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN), + PrevReplicationStatus: ri.TargetReplicationStatus(tgt.ARN), ReplicationStatus: replication.Failed, OpType: ri.OpType, ReplicationAction: rAction, @@ -1282,7 +1306,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, - Object: objInfo, + Object: ri.ToObjectInfo(), UserAgent: "Internal: [Replication]", Host: globalLocalNodeName, }) @@ -1293,12 +1317,13 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{ - VersionID: objInfo.VersionID, + VersionID: ri.VersionID, Versioned: versioned, VersionSuspended: versionSuspended, }) if err != nil { if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) { + objInfo := ri.ToObjectInfo() sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, @@ -1312,7 +1337,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } defer gr.Close() - objInfo = gr.ObjInfo + objInfo := gr.ObjInfo // make sure we have the latest metadata for metrics calculation rinfo.PrevReplicationStatus = objInfo.TargetReplicationStatus(tgt.ARN) @@ -1379,7 +1404,9 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } // object with same VersionID already exists, replication kicked off by // PutObject might have completed - if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending || objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed || ri.OpType == replication.ExistingObjectReplicationType { + if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending || + objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed || + ri.OpType == replication.ExistingObjectReplicationType { // if metadata is not updated for some reason after replication, such as // 503 encountered while updating metadata - make sure to set ReplicationStatus // as Completed. @@ -2233,8 +2260,35 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, rs return oi, proxy } -func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) { - ri := ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType, Dsc: dsc, EventType: ReplicateIncoming} +func scheduleReplication(ctx context.Context, oi ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) { + tgtStatuses := replicationStatusesMap(oi.ReplicationStatusInternal) + purgeStatuses := versionPurgeStatusesMap(oi.VersionPurgeStatusInternal) + tm, _ := time.Parse(time.RFC3339Nano, oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp]) + rstate := oi.ReplicationState() + rstate.ReplicateDecisionStr = dsc.String() + asz, _ := oi.GetActualSize() + + ri := ReplicateObjectInfo{ + Name: oi.Name, + Size: oi.Size, + ActualSize: asz, + Bucket: oi.Bucket, + VersionID: oi.VersionID, + ModTime: oi.ModTime, + ReplicationStatus: oi.ReplicationStatus, + ReplicationStatusInternal: oi.ReplicationStatusInternal, + DeleteMarker: oi.DeleteMarker, + VersionPurgeStatusInternal: oi.VersionPurgeStatusInternal, + VersionPurgeStatus: oi.VersionPurgeStatus, + + ReplicationState: rstate, + OpType: opType, + Dsc: dsc, + TargetStatuses: tgtStatuses, + TargetPurgeStatuses: purgeStatuses, + ReplicationTimestamp: tm, + } + if dsc.Synchronous() { replicateObject(ctx, ri, o) } else { @@ -2263,7 +2317,7 @@ func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool { } // Resync returns true if replication reset is requested -func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) { +func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) { if c.Empty() { return } @@ -2272,8 +2326,6 @@ func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *Repli if oi.DeleteMarker { opts := replication.ObjectOpts{ Name: oi.Name, - SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined), - UserTags: oi.UserTags, DeleteMarker: oi.DeleteMarker, VersionID: oi.VersionID, OpType: replication.DeleteReplicationType, @@ -2294,23 +2346,19 @@ func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *Repli } // Ignore previous replication status when deciding if object can be re-replicated - objInfo := oi.Clone() - objInfo.ReplicationStatusInternal = "" - objInfo.VersionPurgeStatusInternal = "" - objInfo.ReplicationStatus = "" - objInfo.VersionPurgeStatus = "" - delete(objInfo.UserDefined, xhttp.AmzBucketReplicationStatus) - resyncdsc := mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(objInfo, replication.ExistingObjectReplicationType, ObjectOptions{})) - dsc = &resyncdsc - return c.resync(oi, dsc, tgtStatuses) + userDefined := cloneMSS(oi.UserDefined) + delete(userDefined, xhttp.AmzBucketReplicationStatus) + + rdsc := mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(userDefined, oi.UserTags, "", replication.ExistingObjectReplicationType, ObjectOptions{})) + return c.resync(oi, rdsc, tgtStatuses) } // wrapper function for testability. Returns true if a new reset is requested on // already replicated objects OR object qualifies for existing object replication // and no reset requested. -func (c replicationConfig) resync(oi ObjectInfo, dsc *ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) { +func (c replicationConfig) resync(oi ObjectInfo, dsc ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) { r = ResyncDecision{ - targets: make(map[string]ResyncTargetDecision), + targets: make(map[string]ResyncTargetDecision, len(dsc.targetsMap)), } if c.remotes == nil { return @@ -2567,7 +2615,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object ObjectName: roi.Name, DeleteMarkerVersionID: dmVersionID, VersionID: versionID, - ReplicationState: roi.getReplicationState(), + ReplicationState: roi.ReplicationState, DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, DeleteMarker: roi.DeleteMarker, }, @@ -3013,7 +3061,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf ObjectName: roi.Name, DeleteMarkerVersionID: dmVersionID, VersionID: versionID, - ReplicationState: roi.getReplicationState(), + ReplicationState: roi.ReplicationState, DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, DeleteMarker: roi.DeleteMarker, }, diff --git a/cmd/bucket-replication_test.go b/cmd/bucket-replication_test.go index c54af60cf..b16cdacfe 100644 --- a/cmd/bucket-replication_test.go +++ b/cmd/bucket-replication_test.go @@ -88,7 +88,7 @@ var replicationConfigTests = []struct { func TestReplicationResync(t *testing.T) { ctx := context.Background() for i, test := range replicationConfigTests { - if sync := test.rcfg.Resync(ctx, test.info, &test.dsc, test.tgtStatuses); sync.mustResync() != test.expectedSync { + if sync := test.rcfg.Resync(ctx, test.info, test.dsc, test.tgtStatuses); sync.mustResync() != test.expectedSync { t.Errorf("Test%d (%s): Resync got %t , want %t", i+1, test.name, sync.mustResync(), test.expectedSync) } } @@ -283,7 +283,7 @@ var ( func TestReplicationResyncwrapper(t *testing.T) { for i, test := range replicationConfigTests2 { - if sync := test.rcfg.resync(test.info, &test.dsc, test.tgtStatuses); sync.mustResync() != test.expectedSync { + if sync := test.rcfg.resync(test.info, test.dsc, test.tgtStatuses); sync.mustResync() != test.expectedSync { t.Errorf("%s (%s): Replicationresync got %t , want %t", fmt.Sprintf("Test%d - %s", i+1, time.Now().Format(http.TimeFormat)), test.name, sync.mustResync(), test.expectedSync) } } diff --git a/cmd/encryption-v1.go b/cmd/encryption-v1.go index 885cc0257..096738980 100644 --- a/cmd/encryption-v1.go +++ b/cmd/encryption-v1.go @@ -728,7 +728,7 @@ func (d *DecryptBlocksReader) Read(p []byte) (int, error) { // DecryptedSize returns the size of the object after decryption in bytes. // It returns an error if the object is not encrypted or marked as encrypted // but has an invalid size. -func (o *ObjectInfo) DecryptedSize() (int64, error) { +func (o ObjectInfo) DecryptedSize() (int64, error) { if _, ok := crypto.IsEncrypted(o.UserDefined); !ok { return 0, errors.New("Cannot compute decrypted size of an unencrypted object") } diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 8f721319e..95b787680 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -68,6 +68,34 @@ type PoolDecommissionInfo struct { BytesFailed int64 `json:"bytesDecommissionedFailed" msg:"bf"` } +// Clone make a copy of PoolDecommissionInfo +func (pd *PoolDecommissionInfo) Clone() *PoolDecommissionInfo { + if pd == nil { + return nil + } + if pd.StartTime.IsZero() { + return nil + } + return &PoolDecommissionInfo{ + StartTime: pd.StartTime, + StartSize: pd.StartSize, + TotalSize: pd.TotalSize, + CurrentSize: pd.CurrentSize, + Complete: pd.Complete, + Failed: pd.Failed, + Canceled: pd.Canceled, + QueuedBuckets: pd.QueuedBuckets, + DecommissionedBuckets: pd.DecommissionedBuckets, + Bucket: pd.Bucket, + Prefix: pd.Prefix, + Object: pd.Object, + ItemsDecommissioned: pd.ItemsDecommissioned, + ItemsDecommissionFailed: pd.ItemsDecommissionFailed, + BytesDone: pd.BytesDone, + BytesFailed: pd.BytesFailed, + } +} + // bucketPop should be called when a bucket is done decommissioning. // Adds the bucket to the list of decommissioned buckets and updates resume numbers. func (pd *PoolDecommissionInfo) bucketPop(bucket string) { @@ -118,6 +146,16 @@ type PoolStatus struct { Decommission *PoolDecommissionInfo `json:"decommissionInfo,omitempty" msg:"dec"` } +// Clone returns a copy of PoolStatus +func (ps PoolStatus) Clone() PoolStatus { + return PoolStatus{ + ID: ps.ID, + CmdLine: ps.CmdLine, + LastUpdate: ps.LastUpdate, + Decommission: ps.Decommission.Clone(), + } +} + //go:generate msgp -file $GOFILE -unexported type poolMeta struct { Version int `msg:"v"` @@ -375,16 +413,17 @@ func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasure func (p *poolMeta) CountItem(idx int, size int64, failed bool) { pd := p.Pools[idx].Decommission - if pd != nil { - if failed { - pd.ItemsDecommissionFailed++ - pd.BytesFailed += size - } else { - pd.ItemsDecommissioned++ - pd.BytesDone += size - } - p.Pools[idx].Decommission = pd + if pd == nil { + return } + if failed { + pd.ItemsDecommissionFailed++ + pd.BytesFailed += size + } else { + pd.ItemsDecommissioned++ + pd.BytesDone += size + } + p.Pools[idx].Decommission = pd } func (p *poolMeta) updateAfter(ctx context.Context, idx int, pools []*erasureSets, duration time.Duration) (bool, error) { @@ -1185,15 +1224,15 @@ func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, e return PoolStatus{}, errInvalidArgument } - z.poolMetaMutex.RLock() - defer z.poolMetaMutex.RUnlock() - pi, err := z.getDecommissionPoolSpaceInfo(idx) if err != nil { return PoolStatus{}, err } - poolInfo := z.poolMeta.Pools[idx] + z.poolMetaMutex.RLock() + defer z.poolMetaMutex.RUnlock() + + poolInfo := z.poolMeta.Pools[idx].Clone() if poolInfo.Decommission != nil { poolInfo.Decommission.TotalSize = pi.Total if poolInfo.Decommission.Failed || poolInfo.Decommission.Canceled { diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 78175355e..b8fedf71a 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -109,6 +109,9 @@ type ObjectInfo struct { // Total object size. Size int64 + // Actual size is the real size of the object uploaded by client. + ActualSize *int64 + // IsDir indicates if the object is prefix. IsDir bool @@ -282,9 +285,44 @@ func (o ObjectInfo) tierStats() tierStats { return ts } +// ToObjectInfo converts a replication object info to a partial ObjectInfo +// do not rely on this function to give you correct ObjectInfo, this +// function is merely and optimization. +func (ri ReplicateObjectInfo) ToObjectInfo() ObjectInfo { + return ObjectInfo{ + Name: ri.Name, + Bucket: ri.Bucket, + VersionID: ri.VersionID, + ModTime: ri.ModTime, + UserTags: ri.UserTags, + Size: ri.Size, + ActualSize: &ri.ActualSize, + ReplicationStatus: ri.ReplicationStatus, + ReplicationStatusInternal: ri.ReplicationStatusInternal, + VersionPurgeStatus: ri.VersionPurgeStatus, + VersionPurgeStatusInternal: ri.VersionPurgeStatusInternal, + DeleteMarker: true, + UserDefined: map[string]string{}, + } +} + // ReplicateObjectInfo represents object info to be replicated type ReplicateObjectInfo struct { - ObjectInfo + Name string + Bucket string + VersionID string + Size int64 + ActualSize int64 + ModTime time.Time + UserTags string + SSEC bool + ReplicationStatus replication.StatusType + ReplicationStatusInternal string + VersionPurgeStatusInternal string + VersionPurgeStatus VersionPurgeStatusType + ReplicationState ReplicationState + DeleteMarker bool + OpType replication.Type EventType string RetryCount uint32 @@ -529,7 +567,7 @@ type PartInfo struct { // Size in bytes of the part. Size int64 - // Decompressed Size. + // Real size of the object uploaded by client. ActualSize int64 // Checksum values diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 5a84d9b88..899fd1b11 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -507,7 +507,10 @@ func (o *ObjectInfo) IsCompressedOK() (bool, error) { } // GetActualSize - returns the actual size of the stored object -func (o *ObjectInfo) GetActualSize() (int64, error) { +func (o ObjectInfo) GetActualSize() (int64, error) { + if o.ActualSize != nil { + return *o.ActualSize, nil + } if o.IsCompressed() { sizeStr, ok := o.UserDefined[ReservedMetadataPrefix+"actual-size"] if !ok { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index f5060e8b5..4a1a58642 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1427,7 +1427,12 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano) srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs } - if dsc := mustReplicate(ctx, dstBucket, dstObject, getMustReplicateOptions(srcInfo, replication.UnsetReplicationType, dstOpts)); dsc.ReplicateAny() { + + op := replication.ObjectReplicationType + if srcInfo.metadataOnly { + op = replication.MetadataReplicationType + } + if dsc := mustReplicate(ctx, dstBucket, dstObject, srcInfo.getMustReplicateOptions(op, dstOpts)); dsc.ReplicateAny() { srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) } @@ -1500,6 +1505,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL) return } + objInfo.UserDefined = cloneMSS(opts.UserMetadata) objInfo.ETag = remoteObjInfo.ETag objInfo.ModTime = remoteObjInfo.LastModified } else { @@ -1533,8 +1539,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) - if dsc := mustReplicate(ctx, dstBucket, dstObject, getMustReplicateOptions(objInfo, replication.UnsetReplicationType, dstOpts)); dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) + if dsc := mustReplicate(ctx, dstBucket, dstObject, objInfo.getMustReplicateOptions(replication.ObjectReplicationType, dstOpts)); dsc.ReplicateAny() { + scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType) } setPutObjHeaders(w, objInfo, false) @@ -1815,9 +1821,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) return } - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ - UserDefined: metadata, - }, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() } @@ -1923,10 +1927,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } } - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ - UserDefined: metadata, - }, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { + scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType) } setPutObjHeaders(w, objInfo, false) @@ -2182,9 +2184,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h return ObjectLocked{} } - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ - UserDefined: metadata, - }, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() @@ -2235,10 +2235,8 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h return err } - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ - UserDefined: metadata, - }, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { + scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType) } // Notify object created event. @@ -2453,7 +2451,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. DeleteMarkerVersionID: dmVersionID, DeleteMarkerMTime: DeleteMarkerMTime{objInfo.ModTime}, DeleteMarker: objInfo.DeleteMarker, - ReplicationState: objInfo.getReplicationState(), + ReplicationState: objInfo.ReplicationState(), }, Bucket: bucket, EventType: ReplicateIncomingDelete, @@ -2528,7 +2526,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status)) oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = UTCNow().Format(time.RFC3339Nano) - dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(*oi, replication.MetadataReplicationType, opts)) + dsc := mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts)) if dsc.ReplicateAny() { oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() @@ -2543,9 +2541,9 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r return } - dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType, opts)) + dsc := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.MetadataReplicationType, opts)) if dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.MetadataReplicationType) + scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.MetadataReplicationType) } writeSuccessResponseHeadersOnly(w) @@ -2697,7 +2695,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = "" } oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano) - dsc = mustReplicate(ctx, bucket, object, getMustReplicateOptions(*oi, replication.MetadataReplicationType, opts)) + dsc = mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts)) if dsc.ReplicateAny() { oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() @@ -2712,9 +2710,9 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r return } - dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType, opts)) + dsc := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.MetadataReplicationType, opts)) if dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.MetadataReplicationType) + scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.MetadataReplicationType) } writeSuccessResponseHeadersOnly(w) @@ -2923,9 +2921,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h } tagsStr := tags.String() - oi := objInfo.Clone() - oi.UserTags = tagsStr - dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType, opts)) + dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo.UserDefined, tagsStr, objInfo.ReplicationStatus, replication.MetadataReplicationType, opts)) if dsc.ReplicateAny() { opts.UserDefined = make(map[string]string) opts.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) @@ -2941,7 +2937,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h } if dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objAPI, dsc, replication.MetadataReplicationType) + scheduleReplication(ctx, objInfo, objAPI, dsc, replication.MetadataReplicationType) } if objInfo.VersionID != "" && objInfo.VersionID != nullVersionID { @@ -3003,7 +2999,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r return } - dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType, opts)) + dsc := mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts)) if dsc.ReplicateAny() { opts.UserDefined = make(map[string]string) opts.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) @@ -3017,7 +3013,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r } if dsc.ReplicateAny() { - scheduleReplication(ctx, oi.Clone(), objAPI, dsc, replication.MetadataReplicationType) + scheduleReplication(ctx, oi, objAPI, dsc, replication.MetadataReplicationType) } if oi.VersionID != "" && oi.VersionID != nullVersionID { diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index 7eaaf9937..0e4b22ba1 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -164,9 +164,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL) return } - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{ - UserDefined: metadata, - }, replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() { + if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() { metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano) metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus() } @@ -997,8 +995,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } setPutObjHeaders(w, objInfo, false) - if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) + if dsc := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { + scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType) } if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { actualSize, _ := objInfo.GetActualSize() diff --git a/docs/distributed/decom.sh b/docs/distributed/decom.sh index 53c373623..8a2526bbf 100755 --- a/docs/distributed/decom.sh +++ b/docs/distributed/decom.sh @@ -18,7 +18,7 @@ export CI=true (minio server /tmp/xl/{1...10}/disk{0...1} 2>&1 >/tmp/decom.log) & pid=$! -sleep 2 +sleep 10 export MC_HOST_myminio="http://minioadmin:minioadmin@localhost:9000/" @@ -48,26 +48,30 @@ policy_count=$(./mc admin policy list myminio/ | wc -l) ## create a warm tier instance (minio server /tmp/xltier/{1...4}/disk{0...1} --address :9001 2>&1 >/dev/null) & -sleep 2 +sleep 10 export MC_HOST_mytier="http://minioadmin:minioadmin@localhost:9001/" ./mc mb -l myminio/bucket2 ./mc mb -l mytier/tiered + ## create a tier and set up ilm policy to tier immediately ./mc admin tier add minio myminio TIER1 --endpoint http://localhost:9001 --access-key minioadmin --secret-key minioadmin --bucket tiered --prefix prefix5/ ./mc ilm add myminio/bucket2 --transition-days 0 --transition-tier TIER1 --transition-days 0 + ## mirror some content to bucket2 and capture versions tiered ./mc mirror internal myminio/bucket2/ --quiet >/dev/null ./mc ls -r myminio/bucket2/ >bucket2_ns.txt ./mc ls -r --versions myminio/bucket2/ >bucket2_ns_versions.txt -sleep 2 + +sleep 10 + ./mc ls -r --versions mytier/tiered/ >tiered_ns_versions.txt kill $pid (minio server /tmp/xl/{1...10}/disk{0...1} /tmp/xl/{11...30}/disk{0...3} 2>&1 >/tmp/expanded.log) & pid=$! -sleep 2 +sleep 10 expanded_user_count=$(./mc admin user list myminio/ | wc -l) expanded_policy_count=$(./mc admin policy list myminio/ | wc -l) @@ -106,7 +110,7 @@ kill $pid (minio server /tmp/xl/{11...30}/disk{0...3} 2>&1 >/dev/null) & pid=$! -sleep 2 +sleep 10 decom_user_count=$(./mc admin user list myminio/ | wc -l) decom_policy_count=$(./mc admin policy list myminio/ | wc -l)