From 38c0840834d43e79c91fb018b8dd616ef21bdab8 Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Wed, 28 Aug 2024 16:32:18 +0100 Subject: [PATCH] bucket-metadata: Reload events/repl-targets for all buckets (#20334) Currently, the bucket events and replication targets are only reloaded with buckets that failed to load during the first cluster startup, which is wrong because if one bucket change was done in one node but that node was not able to notify other nodes; the other nodes will reload the bucket metadata config but fails to set the events and bucket targets in the memory. --- cmd/admin-bucket-handlers.go | 1 + cmd/bucket-metadata-sys.go | 37 ++++++++-------- cmd/bucket-metadata.go | 73 +++++++++++++++++++++++++++---- cmd/bucket-metadata_gen.go | 85 +++++++++++++++++++++++++++++++++--- 4 files changed, 166 insertions(+), 30 deletions(-) diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index bf0c4faad..59d1fbb56 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -811,6 +811,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * } bucketMap[bucket].NotificationConfigXML = configData + bucketMap[bucket].NotificationConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) case bucketPolicyConfig: // Error out if Content-Length is beyond allowed size. diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 8abbf3e1f..06972606b 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -124,6 +124,7 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string, meta.PolicyConfigUpdatedAt = updatedAt case bucketNotificationConfig: meta.NotificationConfigXML = configData + meta.NotificationConfigUpdatedAt = updatedAt case bucketLifecycleConfig: meta.LifecycleConfigXML = configData meta.LifecycleConfigUpdatedAt = updatedAt @@ -153,6 +154,8 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string, if err != nil { return updatedAt, fmt.Errorf("Error encrypting bucket target metadata %w", err) } + meta.BucketTargetsConfigUpdatedAt = updatedAt + meta.BucketTargetsConfigMetaUpdatedAt = updatedAt default: return updatedAt, fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile) } @@ -504,7 +507,7 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []string, objAPI } // concurrently load bucket metadata to speed up loading bucket metadata. -func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []string, failedBuckets map[string]struct{}) { +func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []string) { g := errgroup.WithNErrs(len(buckets)) bucketMetas := make([]BucketMetadata, len(buckets)) for index := range buckets { @@ -545,10 +548,6 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []stri for i, meta := range bucketMetas { if errs[i] != nil { - if failedBuckets == nil { - failedBuckets = make(map[string]struct{}) - } - failedBuckets[buckets[i]] = struct{}{} continue } globalEventNotifier.set(buckets[i], meta) // set notification targets @@ -556,7 +555,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []stri } } -func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, failedBuckets map[string]struct{}) { +func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) { const bucketMetadataRefresh = 15 * time.Minute sleeper := newDynamicSleeper(2, 150*time.Millisecond, false) @@ -586,7 +585,10 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa for i := range buckets { wait := sleeper.Timer(ctx) - meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[i].Name) + bucket := buckets[i].Name + updated := false + + meta, err := loadBucketMetadata(ctx, sys.objAPI, bucket) if err != nil { internalLogIf(ctx, err, logger.WarningKind) wait() // wait to proceed to next entry. @@ -594,14 +596,16 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa } sys.Lock() - sys.metadataMap[buckets[i].Name] = meta + // Update if the bucket metadata in the memory is older than on-disk one + if lu := sys.metadataMap[bucket].lastUpdate(); lu.Before(meta.lastUpdate()) { + updated = true + sys.metadataMap[bucket] = meta + } sys.Unlock() - // Initialize the failed buckets - if _, ok := failedBuckets[buckets[i].Name]; ok { - globalEventNotifier.set(buckets[i].Name, meta) - globalBucketTargetSys.set(buckets[i].Name, meta) - delete(failedBuckets, buckets[i].Name) + if updated { + globalEventNotifier.set(bucket, meta) + globalBucketTargetSys.set(bucket, meta) } wait() // wait to proceed to next entry. @@ -622,13 +626,12 @@ func (sys *BucketMetadataSys) Initialized() bool { // Loads bucket metadata for all buckets into BucketMetadataSys. func (sys *BucketMetadataSys) init(ctx context.Context, buckets []string) { count := globalEndpoints.ESCount() * 10 - failedBuckets := make(map[string]struct{}) for { if len(buckets) < count { - sys.concurrentLoad(ctx, buckets, failedBuckets) + sys.concurrentLoad(ctx, buckets) break } - sys.concurrentLoad(ctx, buckets[:count], failedBuckets) + sys.concurrentLoad(ctx, buckets[:count]) buckets = buckets[count:] } @@ -637,7 +640,7 @@ func (sys *BucketMetadataSys) init(ctx context.Context, buckets []string) { sys.Unlock() if globalIsDistErasure { - go sys.refreshBucketsMetadataLoop(ctx, failedBuckets) + go sys.refreshBucketsMetadataLoop(ctx) } } diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 5e04f08fb..d0132c1c9 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -81,14 +81,19 @@ type BucketMetadata struct { ReplicationConfigXML []byte BucketTargetsConfigJSON []byte BucketTargetsConfigMetaJSON []byte - PolicyConfigUpdatedAt time.Time - ObjectLockConfigUpdatedAt time.Time - EncryptionConfigUpdatedAt time.Time - TaggingConfigUpdatedAt time.Time - QuotaConfigUpdatedAt time.Time - ReplicationConfigUpdatedAt time.Time - VersioningConfigUpdatedAt time.Time - LifecycleConfigUpdatedAt time.Time + + PolicyConfigUpdatedAt time.Time + ObjectLockConfigUpdatedAt time.Time + EncryptionConfigUpdatedAt time.Time + TaggingConfigUpdatedAt time.Time + QuotaConfigUpdatedAt time.Time + ReplicationConfigUpdatedAt time.Time + VersioningConfigUpdatedAt time.Time + LifecycleConfigUpdatedAt time.Time + NotificationConfigUpdatedAt time.Time + BucketTargetsConfigUpdatedAt time.Time + BucketTargetsConfigMetaUpdatedAt time.Time + // Add a new UpdatedAt field and update lastUpdate function // Unexported fields. Must be updated atomically. policyConfig *policy.BucketPolicy @@ -120,6 +125,46 @@ func newBucketMetadata(name string) BucketMetadata { } } +// Return the last update of this bucket metadata, which +// means, the last update of any policy document. +func (b BucketMetadata) lastUpdate() (t time.Time) { + if b.PolicyConfigUpdatedAt.After(t) { + t = b.PolicyConfigUpdatedAt + } + if b.ObjectLockConfigUpdatedAt.After(t) { + t = b.ObjectLockConfigUpdatedAt + } + if b.EncryptionConfigUpdatedAt.After(t) { + t = b.EncryptionConfigUpdatedAt + } + if b.TaggingConfigUpdatedAt.After(t) { + t = b.TaggingConfigUpdatedAt + } + if b.QuotaConfigUpdatedAt.After(t) { + t = b.QuotaConfigUpdatedAt + } + if b.ReplicationConfigUpdatedAt.After(t) { + t = b.ReplicationConfigUpdatedAt + } + if b.VersioningConfigUpdatedAt.After(t) { + t = b.VersioningConfigUpdatedAt + } + if b.LifecycleConfigUpdatedAt.After(t) { + t = b.LifecycleConfigUpdatedAt + } + if b.NotificationConfigUpdatedAt.After(t) { + t = b.NotificationConfigUpdatedAt + } + if b.BucketTargetsConfigUpdatedAt.After(t) { + t = b.BucketTargetsConfigUpdatedAt + } + if b.BucketTargetsConfigMetaUpdatedAt.After(t) { + t = b.BucketTargetsConfigMetaUpdatedAt + } + + return +} + // Versioning returns true if versioning is enabled func (b BucketMetadata) Versioning() bool { return b.LockEnabled || (b.versioningConfig != nil && b.versioningConfig.Enabled()) || (b.objectLockConfig != nil && b.objectLockConfig.Enabled()) @@ -440,6 +485,18 @@ func (b *BucketMetadata) defaultTimestamps() { if b.LifecycleConfigUpdatedAt.IsZero() { b.LifecycleConfigUpdatedAt = b.Created } + + if b.NotificationConfigUpdatedAt.IsZero() { + b.NotificationConfigUpdatedAt = b.Created + } + + if b.BucketTargetsConfigUpdatedAt.IsZero() { + b.BucketTargetsConfigUpdatedAt = b.Created + } + + if b.BucketTargetsConfigMetaUpdatedAt.IsZero() { + b.BucketTargetsConfigMetaUpdatedAt = b.Created + } } // Save config to supplied ObjectLayer api. diff --git a/cmd/bucket-metadata_gen.go b/cmd/bucket-metadata_gen.go index 3e86a80e4..133fda76b 100644 --- a/cmd/bucket-metadata_gen.go +++ b/cmd/bucket-metadata_gen.go @@ -156,6 +156,24 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "LifecycleConfigUpdatedAt") return } + case "NotificationConfigUpdatedAt": + z.NotificationConfigUpdatedAt, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "NotificationConfigUpdatedAt") + return + } + case "BucketTargetsConfigUpdatedAt": + z.BucketTargetsConfigUpdatedAt, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt") + return + } + case "BucketTargetsConfigMetaUpdatedAt": + z.BucketTargetsConfigMetaUpdatedAt, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt") + return + } default: err = dc.Skip() if err != nil { @@ -169,9 +187,9 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 22 + // map header, size 25 // write "Name" - err = en.Append(0xde, 0x0, 0x16, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + err = en.Append(0xde, 0x0, 0x19, 0xa4, 0x4e, 0x61, 0x6d, 0x65) if err != nil { return } @@ -390,15 +408,45 @@ func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "LifecycleConfigUpdatedAt") return } + // write "NotificationConfigUpdatedAt" + err = en.Append(0xbb, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.NotificationConfigUpdatedAt) + if err != nil { + err = msgp.WrapError(err, "NotificationConfigUpdatedAt") + return + } + // write "BucketTargetsConfigUpdatedAt" + err = en.Append(0xbc, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.BucketTargetsConfigUpdatedAt) + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt") + return + } + // write "BucketTargetsConfigMetaUpdatedAt" + err = en.Append(0xd9, 0x20, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4d, 0x65, 0x74, 0x61, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.BucketTargetsConfigMetaUpdatedAt) + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 22 + // map header, size 25 // string "Name" - o = append(o, 0xde, 0x0, 0x16, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + o = append(o, 0xde, 0x0, 0x19, 0xa4, 0x4e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.Name) // string "Created" o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) @@ -463,6 +511,15 @@ func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) { // string "LifecycleConfigUpdatedAt" o = append(o, 0xb8, 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) o = msgp.AppendTime(o, z.LifecycleConfigUpdatedAt) + // string "NotificationConfigUpdatedAt" + o = append(o, 0xbb, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + o = msgp.AppendTime(o, z.NotificationConfigUpdatedAt) + // string "BucketTargetsConfigUpdatedAt" + o = append(o, 0xbc, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + o = msgp.AppendTime(o, z.BucketTargetsConfigUpdatedAt) + // string "BucketTargetsConfigMetaUpdatedAt" + o = append(o, 0xd9, 0x20, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4d, 0x65, 0x74, 0x61, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + o = msgp.AppendTime(o, z.BucketTargetsConfigMetaUpdatedAt) return } @@ -616,6 +673,24 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "LifecycleConfigUpdatedAt") return } + case "NotificationConfigUpdatedAt": + z.NotificationConfigUpdatedAt, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "NotificationConfigUpdatedAt") + return + } + case "BucketTargetsConfigUpdatedAt": + z.BucketTargetsConfigUpdatedAt, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt") + return + } + case "BucketTargetsConfigMetaUpdatedAt": + z.BucketTargetsConfigMetaUpdatedAt, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -630,6 +705,6 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BucketMetadata) Msgsize() (s int) { - s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + 26 + msgp.TimeSize + 25 + msgp.TimeSize + s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + 26 + msgp.TimeSize + 25 + msgp.TimeSize + 28 + msgp.TimeSize + 29 + msgp.TimeSize + 34 + msgp.TimeSize return }