diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index 059a97b4c..cce0ebe09 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -241,6 +241,8 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht return } } + case madmin.SRBucketMetaTypeVersionConfig: + err = globalSiteReplicationSys.PeerBucketVersioningHandler(ctx, item.Bucket, item.Versioning) case madmin.SRBucketMetaTypeTags: err = globalSiteReplicationSys.PeerBucketTaggingHandler(ctx, item.Bucket, item.Tags) case madmin.SRBucketMetaTypeObjectLockConfig: diff --git a/cmd/bucket-versioning-handler.go b/cmd/bucket-versioning-handler.go index 746bad119..7e96b3a79 100644 --- a/cmd/bucket-versioning-handler.go +++ b/cmd/bucket-versioning-handler.go @@ -18,12 +18,14 @@ package cmd import ( + "encoding/base64" "encoding/xml" "io" "net/http" humanize "github.com/dustin/go-humanize" "github.com/gorilla/mux" + "github.com/minio/madmin-go" "github.com/minio/minio/internal/bucket/versioning" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/bucket/policy" @@ -63,10 +65,10 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r return } - if globalSiteReplicationSys.isEnabled() { + if globalSiteReplicationSys.isEnabled() && !v.Enabled() { writeErrorResponse(ctx, w, APIError{ Code: "InvalidBucketState", - Description: "Cluster replication is enabled for this site, so the versioning state cannot be changed.", + Description: "Cluster replication is enabled for this site, so the versioning cannot be suspended.", HTTPStatusCode: http.StatusConflict, }, r.URL) return @@ -100,6 +102,20 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r return } + // Call site replication hook. + // + // We encode the xml bytes as base64 to ensure there are no encoding + // errors. + cfgStr := base64.StdEncoding.EncodeToString(configData) + if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeVersionConfig, + Bucket: bucket, + Versioning: &cfgStr, + }); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + writeSuccessResponseHeadersOnly(w) } diff --git a/cmd/site-replication.go b/cmd/site-replication.go index eedd3752d..163a8161e 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -1216,6 +1216,23 @@ func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRB return cErr.summaryErr } +// PeerBucketVersioningHandler - updates versioning config to local cluster. +func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bucket string, versioning *string) error { + if versioning != nil { + configData, err := base64.StdEncoding.DecodeString(*versioning) + if err != nil { + return wrapSRErr(err) + } + err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData) + if err != nil { + return wrapSRErr(err) + } + return nil + } + + return nil +} + // PeerBucketPolicyHandler - copies/deletes policy to local cluster. func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *policy.Policy) error { if policy != nil { @@ -2151,6 +2168,7 @@ func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI O for b, stat := range sinfo.BucketStats { for dID, st := range stat { if st.TagMismatch || + st.VersioningConfigMismatch || st.OLockConfigMismatch || st.SSEConfigMismatch || st.PolicyMismatch || @@ -2548,7 +2566,8 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O replCfgs := make([]*sreplication.Config, numSites) quotaCfgs := make([]*madmin.BucketQuota, numSites) sseCfgSet := set.NewStringSet() - var tagCount, olockCfgCount, sseCfgCount int + versionCfgSet := set.NewStringSet() + var tagCount, olockCfgCount, sseCfgCount, versionCfgCount int for i, s := range slc { if s.ReplicationConfig != nil { cfgBytes, err := base64.StdEncoding.DecodeString(*s.ReplicationConfig) @@ -2561,6 +2580,16 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O } replCfgs[i] = cfg } + if s.Versioning != nil { + configData, err := base64.StdEncoding.DecodeString(*s.Versioning) + if err != nil { + continue + } + versionCfgCount++ + if !versionCfgSet.Contains(string(configData)) { + versionCfgSet.Add(string(configData)) + } + } if s.QuotaConfig != nil { cfgBytes, err := base64.StdEncoding.DecodeString(*s.QuotaConfig) if err != nil { @@ -2590,12 +2619,11 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O policies[i] = plcy } if s.ObjectLockConfig != nil { - olockCfgCount++ configData, err := base64.StdEncoding.DecodeString(*s.ObjectLockConfig) if err != nil { continue } - + olockCfgCount++ if !olockConfigSet.Contains(string(configData)) { olockConfigSet.Add(string(configData)) } @@ -2605,10 +2633,10 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O if err != nil { continue } + sseCfgCount++ if !sseCfgSet.Contains(string(configData)) { sseCfgSet.Add(string(configData)) } - sseCfgCount++ } ss, ok := info.StatsSummary[s.DeploymentID] if !ok { @@ -2628,6 +2656,9 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O if sseCfgCount > 0 { ss.TotalSSEConfigCount++ } + if versionCfgCount > 0 { + ss.TotalVersioningConfigCount++ + } if len(policies) > 0 { ss.TotalBucketPoliciesCount++ } @@ -2636,6 +2667,7 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O tagMismatch := !isReplicated(tagCount, numSites, tagSet) olockCfgMismatch := !isReplicated(olockCfgCount, numSites, olockConfigSet) sseCfgMismatch := !isReplicated(sseCfgCount, numSites, sseCfgSet) + versionCfgMismatch := !isReplicated(versionCfgCount, numSites, versionCfgSet) policyMismatch := !isBktPolicyReplicated(numSites, policies) replCfgMismatch := !isBktReplCfgReplicated(numSites, replCfgs) quotaCfgMismatch := !isBktQuotaCfgReplicated(numSites, quotaCfgs) @@ -2648,20 +2680,21 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O } quotaCfgSet := hasBucket && *quotaCfgs[i] != madmin.BucketQuota{} ss := madmin.SRBucketStatsSummary{ - DeploymentID: s.DeploymentID, - HasBucket: hasBucket, - TagMismatch: tagMismatch, - OLockConfigMismatch: olockCfgMismatch, - SSEConfigMismatch: sseCfgMismatch, - PolicyMismatch: policyMismatch, - ReplicationCfgMismatch: replCfgMismatch, - QuotaCfgMismatch: quotaCfgMismatch, - HasReplicationCfg: s.ReplicationConfig != nil, - HasTagsSet: s.Tags != nil, - HasOLockConfigSet: s.ObjectLockConfig != nil, - HasPolicySet: s.Policy != nil, - HasQuotaCfgSet: quotaCfgSet, - HasSSECfgSet: s.SSEConfig != nil, + DeploymentID: s.DeploymentID, + HasBucket: hasBucket, + TagMismatch: tagMismatch, + OLockConfigMismatch: olockCfgMismatch, + SSEConfigMismatch: sseCfgMismatch, + VersioningConfigMismatch: versionCfgMismatch, + PolicyMismatch: policyMismatch, + ReplicationCfgMismatch: replCfgMismatch, + QuotaCfgMismatch: quotaCfgMismatch, + HasReplicationCfg: s.ReplicationConfig != nil, + HasTagsSet: s.Tags != nil, + HasOLockConfigSet: s.ObjectLockConfig != nil, + HasPolicySet: s.Policy != nil, + HasQuotaCfgSet: quotaCfgSet, + HasSSECfgSet: s.SSEConfig != nil, } var m srBucketMetaInfo if len(bucketStats[s.Bucket]) > dIdx { @@ -2678,6 +2711,9 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O if !olockCfgMismatch && olockCfgCount == numSites { sum.ReplicatedLockConfig++ } + if !versionCfgMismatch && versionCfgCount == numSites { + sum.ReplicatedVersioningConfig++ + } if !sseCfgMismatch && sseCfgCount == numSites { sum.ReplicatedSSEConfig++ } @@ -3368,6 +3404,7 @@ func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer for bucket := range info.BucketStats { c.healCreateMissingBucket(ctx, objAPI, bucket, info) + c.healVersioningMetadata(ctx, objAPI, bucket, info) c.healOLockConfigMetadata(ctx, objAPI, bucket, info) c.healSSEMetadata(ctx, objAPI, bucket, info) c.healBucketReplicationConfig(ctx, objAPI, bucket, info) @@ -3588,6 +3625,79 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O return nil } +func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { + c.RLock() + defer c.RUnlock() + if !c.enabled { + return nil + } + var ( + latestID, latestPeerName string + lastUpdate time.Time + latestVersioningConfig *string + ) + + bs := info.BucketStats[bucket] + for dID, ss := range bs { + if lastUpdate.IsZero() { + lastUpdate = ss.meta.VersioningConfigUpdatedAt + latestID = dID + latestVersioningConfig = ss.meta.Versioning + } + // avoid considering just created buckets as latest. Perhaps this site + // just joined cluster replication and yet to be sync'd + if ss.meta.CreatedAt.Equal(ss.meta.VersioningConfigUpdatedAt) { + continue + } + if ss.meta.VersioningConfigUpdatedAt.After(lastUpdate) { + lastUpdate = ss.meta.VersioningConfigUpdatedAt + latestID = dID + latestVersioningConfig = ss.meta.Versioning + } + } + + latestPeerName = info.Sites[latestID].Name + var latestVersioningConfigBytes []byte + var err error + if latestVersioningConfig != nil { + latestVersioningConfigBytes, err = base64.StdEncoding.DecodeString(*latestVersioningConfig) + if err != nil { + return err + } + } + + for dID, bStatus := range bs { + if !bStatus.VersioningConfigMismatch { + continue + } + if isBucketMetadataEqual(latestVersioningConfig, bStatus.meta.Versioning) { + continue + } + if dID == globalDeploymentID { + if err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, latestVersioningConfigBytes); err != nil { + logger.LogIf(ctx, fmt.Errorf("Error healing sse metadata from peer site %s : %w", latestPeerName, err)) + } + continue + } + + admClient, err := c.getAdminClient(ctx, dID) + if err != nil { + return wrapSRErr(err) + } + peerName := info.Sites[dID].Name + err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeVersionConfig, + Bucket: bucket, + Versioning: latestVersioningConfig, + }) + if err != nil { + logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing versioning config metadata for peer %s from peer %s : %s", + peerName, latestPeerName, err.Error()))) + } + } + return nil +} + func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { c.RLock() defer c.RUnlock() diff --git a/go.mod b/go.mod index c75731d45..5fd513956 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/minio/dperf v0.3.6 github.com/minio/highwayhash v1.0.2 github.com/minio/kes v0.19.2 - github.com/minio/madmin-go v1.3.13 + github.com/minio/madmin-go v1.3.14 github.com/minio/minio-go/v7 v7.0.24 github.com/minio/pkg v1.1.23 github.com/minio/selfupdate v0.4.0 diff --git a/go.sum b/go.sum index e38a15b1e..d939c53e8 100644 --- a/go.sum +++ b/go.sum @@ -614,8 +614,8 @@ github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLT github.com/minio/kes v0.19.2 h1:0kdMAgLMSkiDA33k8pMHC7d6erDuseuLrZF+N3017SM= github.com/minio/kes v0.19.2/go.mod h1:X2fMkDbAkjbSKDGOQZvyPkHxoG7nuzP6R78Jw+TzXtM= github.com/minio/madmin-go v1.3.5/go.mod h1:vGKGboQgGIWx4DuDUaXixjlIEZOCIp6ivJkQoiVaACc= -github.com/minio/madmin-go v1.3.13 h1:157u3bFK9qh2EkkqjpJ/bwOw/5KonXUWqhKP3ZczAdY= -github.com/minio/madmin-go v1.3.13/go.mod h1:ez87VmMtsxP7DRxjKJKD4RDNW+nhO2QF9KSzwxBDQ98= +github.com/minio/madmin-go v1.3.14 h1:9f9ZylP5Yn/TcplE/wowsBjb+Czt2+/NRCa2IqpNLcI= +github.com/minio/madmin-go v1.3.14/go.mod h1:ez87VmMtsxP7DRxjKJKD4RDNW+nhO2QF9KSzwxBDQ98= github.com/minio/mc v0.0.0-20220419155441-cc4ff3a0cc82 h1:CiTaWFwpxzjd7A3sUQ0xZEX8sWfZh3/k2qbxuPip05s= github.com/minio/mc v0.0.0-20220419155441-cc4ff3a0cc82/go.mod h1:h6VCl43/2AUA3RP1GWUVMqcUiXq2NWJ4+dSei+ibf70= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=