From def75ffcfe3f79d84f787a2fdf979ebce15c4a66 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 7 May 2022 18:39:40 -0700 Subject: [PATCH] allow versioning config changes under site replication (#14876) PR #14828 introduced prefix-level exclusion of versioning and replication - however our site replication implementation since it defaults versioning on all buckets did not allow changing versioning configuration once the bucket was created. This PR changes this and ensures that such changes are honored and also propagated/healed across sites appropriately. --- cmd/admin-handlers-site-replication.go | 2 + cmd/bucket-versioning-handler.go | 20 +++- cmd/site-replication.go | 146 ++++++++++++++++++++++--- go.mod | 2 +- go.sum | 4 +- 5 files changed, 151 insertions(+), 23 deletions(-) diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index 059a97b4c..cce0ebe09 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -241,6 +241,8 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht return } } + case madmin.SRBucketMetaTypeVersionConfig: + err = globalSiteReplicationSys.PeerBucketVersioningHandler(ctx, item.Bucket, item.Versioning) case madmin.SRBucketMetaTypeTags: err = globalSiteReplicationSys.PeerBucketTaggingHandler(ctx, item.Bucket, item.Tags) case madmin.SRBucketMetaTypeObjectLockConfig: diff --git a/cmd/bucket-versioning-handler.go b/cmd/bucket-versioning-handler.go index 746bad119..7e96b3a79 100644 --- a/cmd/bucket-versioning-handler.go +++ b/cmd/bucket-versioning-handler.go @@ -18,12 +18,14 @@ package cmd import ( + "encoding/base64" "encoding/xml" "io" "net/http" humanize "github.com/dustin/go-humanize" "github.com/gorilla/mux" + "github.com/minio/madmin-go" "github.com/minio/minio/internal/bucket/versioning" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/bucket/policy" @@ -63,10 +65,10 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r return } - if globalSiteReplicationSys.isEnabled() { + if globalSiteReplicationSys.isEnabled() && !v.Enabled() { writeErrorResponse(ctx, w, APIError{ Code: "InvalidBucketState", - Description: "Cluster replication is enabled for this site, so the versioning state cannot be changed.", + Description: "Cluster replication is enabled for this site, so the versioning cannot be suspended.", HTTPStatusCode: http.StatusConflict, }, r.URL) return @@ -100,6 +102,20 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r return } + // Call site replication hook. + // + // We encode the xml bytes as base64 to ensure there are no encoding + // errors. + cfgStr := base64.StdEncoding.EncodeToString(configData) + if err = globalSiteReplicationSys.BucketMetaHook(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeVersionConfig, + Bucket: bucket, + Versioning: &cfgStr, + }); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + writeSuccessResponseHeadersOnly(w) } diff --git a/cmd/site-replication.go b/cmd/site-replication.go index eedd3752d..163a8161e 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -1216,6 +1216,23 @@ func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRB return cErr.summaryErr } +// PeerBucketVersioningHandler - updates versioning config to local cluster. +func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bucket string, versioning *string) error { + if versioning != nil { + configData, err := base64.StdEncoding.DecodeString(*versioning) + if err != nil { + return wrapSRErr(err) + } + err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData) + if err != nil { + return wrapSRErr(err) + } + return nil + } + + return nil +} + // PeerBucketPolicyHandler - copies/deletes policy to local cluster. func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *policy.Policy) error { if policy != nil { @@ -2151,6 +2168,7 @@ func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI O for b, stat := range sinfo.BucketStats { for dID, st := range stat { if st.TagMismatch || + st.VersioningConfigMismatch || st.OLockConfigMismatch || st.SSEConfigMismatch || st.PolicyMismatch || @@ -2548,7 +2566,8 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O replCfgs := make([]*sreplication.Config, numSites) quotaCfgs := make([]*madmin.BucketQuota, numSites) sseCfgSet := set.NewStringSet() - var tagCount, olockCfgCount, sseCfgCount int + versionCfgSet := set.NewStringSet() + var tagCount, olockCfgCount, sseCfgCount, versionCfgCount int for i, s := range slc { if s.ReplicationConfig != nil { cfgBytes, err := base64.StdEncoding.DecodeString(*s.ReplicationConfig) @@ -2561,6 +2580,16 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O } replCfgs[i] = cfg } + if s.Versioning != nil { + configData, err := base64.StdEncoding.DecodeString(*s.Versioning) + if err != nil { + continue + } + versionCfgCount++ + if !versionCfgSet.Contains(string(configData)) { + versionCfgSet.Add(string(configData)) + } + } if s.QuotaConfig != nil { cfgBytes, err := base64.StdEncoding.DecodeString(*s.QuotaConfig) if err != nil { @@ -2590,12 +2619,11 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O policies[i] = plcy } if s.ObjectLockConfig != nil { - olockCfgCount++ configData, err := base64.StdEncoding.DecodeString(*s.ObjectLockConfig) if err != nil { continue } - + olockCfgCount++ if !olockConfigSet.Contains(string(configData)) { olockConfigSet.Add(string(configData)) } @@ -2605,10 +2633,10 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O if err != nil { continue } + sseCfgCount++ if !sseCfgSet.Contains(string(configData)) { sseCfgSet.Add(string(configData)) } - sseCfgCount++ } ss, ok := info.StatsSummary[s.DeploymentID] if !ok { @@ -2628,6 +2656,9 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O if sseCfgCount > 0 { ss.TotalSSEConfigCount++ } + if versionCfgCount > 0 { + ss.TotalVersioningConfigCount++ + } if len(policies) > 0 { ss.TotalBucketPoliciesCount++ } @@ -2636,6 +2667,7 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O tagMismatch := !isReplicated(tagCount, numSites, tagSet) olockCfgMismatch := !isReplicated(olockCfgCount, numSites, olockConfigSet) sseCfgMismatch := !isReplicated(sseCfgCount, numSites, sseCfgSet) + versionCfgMismatch := !isReplicated(versionCfgCount, numSites, versionCfgSet) policyMismatch := !isBktPolicyReplicated(numSites, policies) replCfgMismatch := !isBktReplCfgReplicated(numSites, replCfgs) quotaCfgMismatch := !isBktQuotaCfgReplicated(numSites, quotaCfgs) @@ -2648,20 +2680,21 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O } quotaCfgSet := hasBucket && *quotaCfgs[i] != madmin.BucketQuota{} ss := madmin.SRBucketStatsSummary{ - DeploymentID: s.DeploymentID, - HasBucket: hasBucket, - TagMismatch: tagMismatch, - OLockConfigMismatch: olockCfgMismatch, - SSEConfigMismatch: sseCfgMismatch, - PolicyMismatch: policyMismatch, - ReplicationCfgMismatch: replCfgMismatch, - QuotaCfgMismatch: quotaCfgMismatch, - HasReplicationCfg: s.ReplicationConfig != nil, - HasTagsSet: s.Tags != nil, - HasOLockConfigSet: s.ObjectLockConfig != nil, - HasPolicySet: s.Policy != nil, - HasQuotaCfgSet: quotaCfgSet, - HasSSECfgSet: s.SSEConfig != nil, + DeploymentID: s.DeploymentID, + HasBucket: hasBucket, + TagMismatch: tagMismatch, + OLockConfigMismatch: olockCfgMismatch, + SSEConfigMismatch: sseCfgMismatch, + VersioningConfigMismatch: versionCfgMismatch, + PolicyMismatch: policyMismatch, + ReplicationCfgMismatch: replCfgMismatch, + QuotaCfgMismatch: quotaCfgMismatch, + HasReplicationCfg: s.ReplicationConfig != nil, + HasTagsSet: s.Tags != nil, + HasOLockConfigSet: s.ObjectLockConfig != nil, + HasPolicySet: s.Policy != nil, + HasQuotaCfgSet: quotaCfgSet, + HasSSECfgSet: s.SSEConfig != nil, } var m srBucketMetaInfo if len(bucketStats[s.Bucket]) > dIdx { @@ -2678,6 +2711,9 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O if !olockCfgMismatch && olockCfgCount == numSites { sum.ReplicatedLockConfig++ } + if !versionCfgMismatch && versionCfgCount == numSites { + sum.ReplicatedVersioningConfig++ + } if !sseCfgMismatch && sseCfgCount == numSites { sum.ReplicatedSSEConfig++ } @@ -3368,6 +3404,7 @@ func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer for bucket := range info.BucketStats { c.healCreateMissingBucket(ctx, objAPI, bucket, info) + c.healVersioningMetadata(ctx, objAPI, bucket, info) c.healOLockConfigMetadata(ctx, objAPI, bucket, info) c.healSSEMetadata(ctx, objAPI, bucket, info) c.healBucketReplicationConfig(ctx, objAPI, bucket, info) @@ -3588,6 +3625,79 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O return nil } +func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { + c.RLock() + defer c.RUnlock() + if !c.enabled { + return nil + } + var ( + latestID, latestPeerName string + lastUpdate time.Time + latestVersioningConfig *string + ) + + bs := info.BucketStats[bucket] + for dID, ss := range bs { + if lastUpdate.IsZero() { + lastUpdate = ss.meta.VersioningConfigUpdatedAt + latestID = dID + latestVersioningConfig = ss.meta.Versioning + } + // avoid considering just created buckets as latest. Perhaps this site + // just joined cluster replication and yet to be sync'd + if ss.meta.CreatedAt.Equal(ss.meta.VersioningConfigUpdatedAt) { + continue + } + if ss.meta.VersioningConfigUpdatedAt.After(lastUpdate) { + lastUpdate = ss.meta.VersioningConfigUpdatedAt + latestID = dID + latestVersioningConfig = ss.meta.Versioning + } + } + + latestPeerName = info.Sites[latestID].Name + var latestVersioningConfigBytes []byte + var err error + if latestVersioningConfig != nil { + latestVersioningConfigBytes, err = base64.StdEncoding.DecodeString(*latestVersioningConfig) + if err != nil { + return err + } + } + + for dID, bStatus := range bs { + if !bStatus.VersioningConfigMismatch { + continue + } + if isBucketMetadataEqual(latestVersioningConfig, bStatus.meta.Versioning) { + continue + } + if dID == globalDeploymentID { + if err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, latestVersioningConfigBytes); err != nil { + logger.LogIf(ctx, fmt.Errorf("Error healing sse metadata from peer site %s : %w", latestPeerName, err)) + } + continue + } + + admClient, err := c.getAdminClient(ctx, dID) + if err != nil { + return wrapSRErr(err) + } + peerName := info.Sites[dID].Name + err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{ + Type: madmin.SRBucketMetaTypeVersionConfig, + Bucket: bucket, + Versioning: latestVersioningConfig, + }) + if err != nil { + logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing versioning config metadata for peer %s from peer %s : %s", + peerName, latestPeerName, err.Error()))) + } + } + return nil +} + func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { c.RLock() defer c.RUnlock() diff --git a/go.mod b/go.mod index c75731d45..5fd513956 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/minio/dperf v0.3.6 github.com/minio/highwayhash v1.0.2 github.com/minio/kes v0.19.2 - github.com/minio/madmin-go v1.3.13 + github.com/minio/madmin-go v1.3.14 github.com/minio/minio-go/v7 v7.0.24 github.com/minio/pkg v1.1.23 github.com/minio/selfupdate v0.4.0 diff --git a/go.sum b/go.sum index e38a15b1e..d939c53e8 100644 --- a/go.sum +++ b/go.sum @@ -614,8 +614,8 @@ github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLT github.com/minio/kes v0.19.2 h1:0kdMAgLMSkiDA33k8pMHC7d6erDuseuLrZF+N3017SM= github.com/minio/kes v0.19.2/go.mod h1:X2fMkDbAkjbSKDGOQZvyPkHxoG7nuzP6R78Jw+TzXtM= github.com/minio/madmin-go v1.3.5/go.mod h1:vGKGboQgGIWx4DuDUaXixjlIEZOCIp6ivJkQoiVaACc= -github.com/minio/madmin-go v1.3.13 h1:157u3bFK9qh2EkkqjpJ/bwOw/5KonXUWqhKP3ZczAdY= -github.com/minio/madmin-go v1.3.13/go.mod h1:ez87VmMtsxP7DRxjKJKD4RDNW+nhO2QF9KSzwxBDQ98= +github.com/minio/madmin-go v1.3.14 h1:9f9ZylP5Yn/TcplE/wowsBjb+Czt2+/NRCa2IqpNLcI= +github.com/minio/madmin-go v1.3.14/go.mod h1:ez87VmMtsxP7DRxjKJKD4RDNW+nhO2QF9KSzwxBDQ98= github.com/minio/mc v0.0.0-20220419155441-cc4ff3a0cc82 h1:CiTaWFwpxzjd7A3sUQ0xZEX8sWfZh3/k2qbxuPip05s= github.com/minio/mc v0.0.0-20220419155441-cc4ff3a0cc82/go.mod h1:h6VCl43/2AUA3RP1GWUVMqcUiXq2NWJ4+dSei+ibf70= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=