From f1bbb7fef5a724b618e7726ee508d165e5fc345f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 3 Jan 2023 08:16:39 -0800 Subject: [PATCH] vectorize cluster-wide calls such as bucket operations (#16313) --- cmd/admin-handlers-site-replication.go | 4 +- cmd/admin-handlers.go | 1 - cmd/bucket-handlers.go | 3 +- cmd/bucket-metadata-sys.go | 8 +- cmd/bucket-metadata.go | 2 +- cmd/erasure-bucket.go | 164 --------------- cmd/erasure-object_test.go | 34 ++- cmd/erasure-server-pool.go | 197 +++++++----------- cmd/erasure-sets.go | 77 ------- cmd/erasure.go | 4 - cmd/object-api-interface.go | 4 +- cmd/peer-s3-client.go | 278 +++++++++++++++++++++++++ cmd/peer-s3-server.go | 256 +++++++++++++++++++++++ cmd/routers.go | 3 + cmd/server_test.go | 4 +- 15 files changed, 639 insertions(+), 400 deletions(-) create mode 100644 cmd/peer-s3-client.go create mode 100644 cmd/peer-s3-server.go diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index 1864f5ae9..2c89a185f 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -114,8 +114,7 @@ func (a adminAPIHandlers) SRPeerBucketOps(w http.ResponseWriter, r *http.Request default: err = errSRInvalidRequest(errInvalidArgument) case madmin.MakeWithVersioningBktOp: - createdAtStr := strings.TrimSpace(r.Form.Get("createdAt")) - createdAt, cerr := time.Parse(time.RFC3339Nano, createdAtStr) + createdAt, cerr := time.Parse(time.RFC3339Nano, strings.TrimSpace(r.Form.Get("createdAt"))) if cerr != nil { createdAt = timeSentinel } @@ -132,7 +131,6 @@ func (a adminAPIHandlers) SRPeerBucketOps(w http.ResponseWriter, r *http.Request case madmin.DeleteBucketBktOp, madmin.ForceDeleteBucketBktOp: err = globalSiteReplicationSys.PeerBucketDeleteHandler(ctx, bucket, DeleteBucketOptions{ Force: operation == madmin.ForceDeleteBucketBktOp, - NoRecreate: r.Form.Get("noRecreate") == "true", SRDeleteOp: getSRBucketDeleteOp(true), }) case madmin.PurgeDeletedBucketOp: diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 23c23a993..1adc651d6 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1332,7 +1332,6 @@ func makeObjectPerfBucket(ctx context.Context, objectAPI ObjectLayer, bucketName func deleteObjectPerfBucket(objectAPI ObjectLayer) { objectAPI.DeleteBucket(context.Background(), globalObjectPerfBucket, DeleteBucketOptions{ Force: true, - NoRecreate: true, SRDeleteOp: getSRBucketDeleteOp(globalSiteReplicationSys.isEnabled()), }) } diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 4454fa0c1..c29e0f2f0 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -791,8 +791,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req if err = globalDNSConfig.Put(bucket); err != nil { objectAPI.DeleteBucket(context.Background(), bucket, DeleteBucketOptions{ - Force: false, - NoRecreate: true, + Force: true, SRDeleteOp: getSRBucketDeleteOp(globalSiteReplicationSys.isEnabled()), }) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 70c45eb79..c058dff6d 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -66,7 +66,7 @@ func (sys *BucketMetadataSys) Remove(bucket string) { // so they should be replaced atomically and not appended to, etc. // Data is not persisted to disk. 
func (sys *BucketMetadataSys) Set(bucket string, meta BucketMetadata) { - if bucket != minioMetaBucket { + if !isMinioMetaBucketName(bucket) { sys.Lock() sys.metadataMap[bucket] = meta sys.Unlock() @@ -79,7 +79,7 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string, return updatedAt, errServerNotInitialized } - if bucket == minioMetaBucket { + if isMinioMetaBucketName(bucket) { return updatedAt, errInvalidArgument } @@ -164,7 +164,7 @@ func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configF // For all other bucket specific metadata, use the relevant // calls implemented specifically for each of those features. func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) { - if bucket == minioMetaBucket { + if isMinioMetaBucketName(bucket) { return newBucketMetadata(bucket), errConfigNotFound } @@ -345,7 +345,7 @@ func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (Buc return newBucketMetadata(bucket), errServerNotInitialized } - if bucket == minioMetaBucket { + if isMinioMetaBucketName(bucket) { return newBucketMetadata(bucket), errInvalidArgument } diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 04e285bf6..29a323c7f 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -135,7 +135,7 @@ func (b *BucketMetadata) SetCreatedAt(createdAt time.Time) { func (b *BucketMetadata) Load(ctx context.Context, api ObjectLayer, name string) error { if name == "" { logger.LogIf(ctx, errors.New("bucket name cannot be empty")) - return errors.New("bucket name cannot be empty") + return errInvalidArgument } configFile := path.Join(bucketMetaPrefix, name, bucketMetadataFile) data, err := readConfig(ctx, api, configFile) diff --git a/cmd/erasure-bucket.go b/cmd/erasure-bucket.go index afe26fb83..5eafd169b 100644 --- a/cmd/erasure-bucket.go +++ b/cmd/erasure-bucket.go @@ -21,7 +21,6 @@ import ( "context" "errors" - "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/sync/errgroup" ) @@ -31,169 +30,6 @@ var bucketOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnform // list all errors that can be ignored in a bucket metadata operation. var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound) -// Bucket operations - -// MakeBucket - make a bucket. -func (er erasureObjects) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { - storageDisks := er.getDisks() - - g := errgroup.WithNErrs(len(storageDisks)) - - // Make a volume entry on all underlying storage disks. - for index := range storageDisks { - index := index - g.Go(func() error { - if storageDisks[index] != nil { - if err := storageDisks[index].MakeVol(ctx, bucket); err != nil { - if opts.ForceCreate && errors.Is(err, errVolumeExists) { - // No need to return error when force create was - // requested. - return nil - } - if !errors.Is(err, errVolumeExists) { - logger.LogIf(ctx, err) - } - return err - } - return nil - } - return errDiskNotFound - }, index) - } - - err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, er.defaultWQuorum()) - return toObjectErr(err, bucket) -} - -func undoDeleteBucket(storageDisks []StorageAPI, bucket string) { - g := errgroup.WithNErrs(len(storageDisks)) - // Undo previous make bucket entry on all underlying storage disks. 
- for index := range storageDisks { - if storageDisks[index] == nil { - continue - } - index := index - g.Go(func() error { - _ = storageDisks[index].MakeVol(context.Background(), bucket) - return nil - }, index) - } - - // Wait for all make vol to finish. - g.Wait() -} - -// getBucketInfo - returns the BucketInfo from one of the load balanced disks. -func (er erasureObjects) getBucketInfo(ctx context.Context, bucketName string, opts BucketOptions) (bucketInfo BucketInfo, err error) { - storageDisks := er.getDisks() - - g := errgroup.WithNErrs(len(storageDisks)) - bucketsInfo := make([]BucketInfo, len(storageDisks)) - // Undo previous make bucket entry on all underlying storage disks. - for index := range storageDisks { - index := index - g.Go(func() error { - if storageDisks[index] == nil { - return errDiskNotFound - } - volInfo, err := storageDisks[index].StatVol(ctx, bucketName) - if err != nil { - if opts.Deleted { - dvi, derr := storageDisks[index].StatVol(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucketName)) - if derr != nil { - return err - } - bucketsInfo[index] = BucketInfo{Name: bucketName, Deleted: dvi.Created} - return nil - } - return err - } - bucketsInfo[index] = BucketInfo{Name: volInfo.Name, Created: volInfo.Created} - return nil - }, index) - } - - errs := g.Wait() - - for i, err := range errs { - if err == nil { - return bucketsInfo[i], nil - } - } - - // If all our errors were ignored, then we try to - // reduce to one error based on read quorum. - // `nil` is deliberately passed for ignoredErrs - // because these errors were already ignored. - return BucketInfo{}, reduceReadQuorumErrs(ctx, errs, nil, er.defaultRQuorum()) -} - -// GetBucketInfo - returns BucketInfo for a bucket. -func (er erasureObjects) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bi BucketInfo, e error) { - bucketInfo, err := er.getBucketInfo(ctx, bucket, opts) - if err != nil { - return bi, toObjectErr(err, bucket) - } - return bucketInfo, nil -} - -// DeleteBucket - deletes a bucket. -func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { - storageDisks := er.getDisks() - - g := errgroup.WithNErrs(len(storageDisks)) - - for index := range storageDisks { - index := index - g.Go(func() error { - if storageDisks[index] != nil { - return storageDisks[index].DeleteVol(ctx, bucket, opts.Force) - } - return errDiskNotFound - }, index) - } - - // Wait for all the delete vols to finish. - dErrs := g.Wait() - - if opts.Force { - for _, err := range dErrs { - if err != nil { - undoDeleteBucket(storageDisks, bucket) - return toObjectErr(err, bucket) - } - } - - return nil - } - - err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, er.defaultWQuorum()) - if err == errErasureWriteQuorum && !opts.NoRecreate { - undoDeleteBucket(storageDisks, bucket) - } - - if err == nil || errors.Is(err, errVolumeNotFound) { - var purgedDangling bool - // At this point we have `err == nil` but some errors might be `errVolumeNotEmpty` - // we should proceed to attempt a force delete of such buckets. - for index, err := range dErrs { - if err == errVolumeNotEmpty && storageDisks[index] != nil { - storageDisks[index].RenameFile(ctx, bucket, "", minioMetaTmpDeletedBucket, mustGetUUID()) - purgedDangling = true - } - } - // if we purged dangling buckets, ignore errVolumeNotFound error. 
- if purgedDangling { - err = nil - } - if opts.SRDeleteOp == MarkDelete { - er.markDelete(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket)) - } - } - - return toObjectErr(err, bucket) -} - // markDelete creates a vol entry in .minio.sys/buckets/.deleted until site replication // syncs the delete to peers func (er erasureObjects) markDelete(ctx context.Context, bucket, prefix string) error { diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go index 65d71d742..2f8f65f5a 100644 --- a/cmd/erasure-object_test.go +++ b/cmd/erasure-object_test.go @@ -214,22 +214,19 @@ func TestDeleteObjectsVersioned(t *testing.T) { func TestErasureDeleteObjectsErasureSet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var objs []*erasureObjects - for i := 0; i < 32; i++ { - obj, fsDirs, err := prepareErasure(ctx, 16) - if err != nil { - t.Fatal("Unable to initialize 'Erasure' object layer.", err) - } - // Remove all dirs. - for _, dir := range fsDirs { - defer os.RemoveAll(dir) - } - z := obj.(*erasureServerPools) - xl := z.serverPools[0].sets[0] - objs = append(objs, xl) + + obj, fsDirs, err := prepareErasureSets32(ctx) + if err != nil { + t.Fatal("Unable to initialize 'Erasure' object layer.", err) } - erasureSets := &erasureSets{sets: objs, distributionAlgo: "CRCMOD"} + setObjectLayer(obj) + initConfigSubsystem(ctx, obj) + + // Remove all dirs. + for _, dir := range fsDirs { + defer os.RemoveAll(dir) + } type testCaseType struct { bucket string @@ -244,13 +241,12 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) { {bucketName, "obj_4"}, } - err := erasureSets.MakeBucket(ctx, bucketName, MakeBucketOptions{}) - if err != nil { + if err = obj.MakeBucket(ctx, bucketName, MakeBucketOptions{}); err != nil { t.Fatal(err) } for _, testCase := range testCases { - _, err = erasureSets.PutObject(ctx, testCase.bucket, testCase.object, + _, err = obj.PutObject(ctx, testCase.bucket, testCase.object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{}) if err != nil { t.Fatalf("Erasure Object upload failed: %s", err) @@ -270,7 +266,7 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) { } objectNames := toObjectNames(testCases) - _, delErrs := erasureSets.DeleteObjects(ctx, bucketName, objectNames, ObjectOptions{}) + _, delErrs := obj.DeleteObjects(ctx, bucketName, objectNames, ObjectOptions{}) for i := range delErrs { if delErrs[i] != nil { @@ -279,7 +275,7 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) { } for _, test := range testCases { - _, statErr := erasureSets.GetObjectInfo(ctx, test.bucket, test.object, ObjectOptions{}) + _, statErr := obj.GetObjectInfo(ctx, test.bucket, test.object, ObjectOptions{}) switch statErr.(type) { case ObjectNotFound: default: diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index ffad41ee4..6f395d771 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -61,6 +61,8 @@ type erasureServerPools struct { // Active decommission canceler decommissionCancelers []context.CancelFunc + + s3Peer *S3PeerSys } func (z *erasureServerPools) SinglePool() bool { @@ -79,6 +81,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ storageDisks = make([][]StorageAPI, len(endpointServerPools)) z = &erasureServerPools{ serverPools: make([]*erasureSets, len(endpointServerPools)), + s3Peer: NewS3PeerSys(endpointServerPools), } ) @@ -710,49 +713,34 @@ func (z *erasureServerPools) 
NSScanner(ctx context.Context, bf *bloomFilter, upd func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { defer NSUpdated(bucket, slashSeparator) - g := errgroup.WithNErrs(len(z.serverPools)) - + // Verify if bucket is valid. if !isMinioMetaBucketName(bucket) { - // Verify if bucket is valid. if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil { return BucketNameInvalid{Bucket: bucket} } - // Lock the bucket name before creating. - lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck") - lkctx, err := lk.GetLock(ctx, globalOperationTimeout) - if err != nil { - return err - } + if !opts.NoLock { + // Lock the bucket name before creating. + lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck") + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return err + } - ctx = lkctx.Context() - defer lk.Unlock(lkctx) + ctx = lkctx.Context() + defer lk.Unlock(lkctx) + } } - // Create buckets in parallel across all sets. - for index := range z.serverPools { - index := index - g.Go(func() error { - if z.IsSuspended(index) { - return nil - } - return z.serverPools[index].MakeBucket(ctx, bucket, opts) - }, index) - } - - errs := g.Wait() - // Return the first encountered error - for _, err := range errs { - if err != nil { - if _, ok := err.(BucketExists); !ok { - // Delete created buckets, ignoring errors. - z.DeleteBucket(context.Background(), bucket, DeleteBucketOptions{ - Force: false, - NoRecreate: true, - }) - } - return err + if err := z.s3Peer.MakeBucket(ctx, bucket, opts); err != nil { + if _, ok := err.(BucketExists); !ok { + // Delete created buckets, ignoring errors. + z.DeleteBucket(context.Background(), bucket, DeleteBucketOptions{ + NoLock: true, + NoRecreate: true, + }) } + return err } // If it doesn't exist we get a new, so ignore errors @@ -1609,38 +1597,18 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket // GetBucketInfo - returns bucket info from one of the erasure coded serverPools. 
func (z *erasureServerPools) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bucketInfo BucketInfo, err error) { - if z.SinglePool() { - bucketInfo, err = z.serverPools[0].GetBucketInfo(ctx, bucket, opts) - if err != nil { - return bucketInfo, err - } - meta, err := globalBucketMetadataSys.Get(bucket) - if err == nil { - bucketInfo.Created = meta.Created - bucketInfo.Versioning = meta.LockEnabled || globalBucketVersioningSys.Enabled(bucket) - bucketInfo.ObjectLocking = meta.LockEnabled - } - return bucketInfo, nil + bucketInfo, err = z.s3Peer.GetBucketInfo(ctx, bucket, opts) + if err != nil { + return bucketInfo, toObjectErr(err, bucket) } - for _, pool := range z.serverPools { - bucketInfo, err = pool.GetBucketInfo(ctx, bucket, opts) - if err != nil { - if isErrBucketNotFound(err) { - continue - } - return bucketInfo, err - } - meta, err := globalBucketMetadataSys.Get(bucket) - if err == nil { - bucketInfo.Created = meta.Created - bucketInfo.Versioning = meta.LockEnabled || globalBucketVersioningSys.Enabled(bucket) - bucketInfo.ObjectLocking = meta.LockEnabled - } - return bucketInfo, nil - } - return bucketInfo, BucketNotFound{ - Bucket: bucket, + + meta, err := globalBucketMetadataSys.Get(bucket) + if err == nil { + bucketInfo.Created = meta.Created + bucketInfo.Versioning = meta.LockEnabled || globalBucketVersioningSys.Enabled(bucket) + bucketInfo.ObjectLocking = meta.LockEnabled } + return bucketInfo, nil } // IsNotificationSupported returns whether bucket notification is applicable for this layer. @@ -1671,43 +1639,48 @@ func (z *erasureServerPools) IsTaggingSupported() bool { // even if one of the serverPools fail to delete buckets, we proceed to // undo a successful operation. func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { - defer NSUpdated(bucket, slashSeparator) - - g := errgroup.WithNErrs(len(z.serverPools)) - - // Delete buckets in parallel across all serverPools. - for index := range z.serverPools { - index := index - g.Go(func() error { - if z.IsSuspended(index) { - return nil - } - return z.serverPools[index].DeleteBucket(ctx, bucket, opts) - }, index) + if isMinioMetaBucketName(bucket) { + return BucketNameInvalid{Bucket: bucket} } - errs := g.Wait() + // Verify if bucket is valid. + if err := s3utils.CheckValidBucketName(bucket); err != nil { + return BucketNameInvalid{Bucket: bucket} + } - // For any write quorum failure, we undo all the delete - // buckets operation by creating all the buckets again. - for _, err := range errs { + defer NSUpdated(bucket, slashSeparator) + if !opts.NoLock { + // Lock the bucket name before creating. + lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck") + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { - if !z.SinglePool() && !opts.NoRecreate { - undoDeleteBucketServerPools(context.Background(), bucket, z.serverPools, errs) - } return err } + ctx = lkctx.Context() + defer lk.Unlock(lkctx) + } + + err := z.s3Peer.DeleteBucket(ctx, bucket, opts) + if err == nil || errors.Is(err, errVolumeNotFound) { + // If site replication is configured, hold on to deleted bucket state until sites sync + switch opts.SRDeleteOp { + case MarkDelete: + z.markDelete(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket)) + } } - // Purge the entire bucket metadata entirely. 
- z.deleteAll(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, bucket)) - // If site replication is configured, hold on to deleted bucket state until sites sync - switch opts.SRDeleteOp { - case MarkDelete: - z.markDelete(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket)) + if err != nil && !errors.Is(err, errVolumeNotFound) { + if !opts.NoRecreate { + z.s3Peer.MakeBucket(ctx, bucket, MakeBucketOptions{}) + } } - // Success. - return nil + + if err == nil { + // Purge the entire bucket metadata entirely. + z.deleteAll(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, bucket)) + } + + return toObjectErr(err, bucket) } // deleteAll will rename bucket+prefix unconditionally across all disks to @@ -1744,42 +1717,20 @@ func (z *erasureServerPools) purgeDelete(ctx context.Context, bucket, prefix str } } -// This function is used to undo a successful DeleteBucket operation. -func undoDeleteBucketServerPools(ctx context.Context, bucket string, serverPools []*erasureSets, errs []error) { - g := errgroup.WithNErrs(len(serverPools)) - - // Undo previous delete bucket on all underlying serverPools. - for index := range serverPools { - index := index - g.Go(func() error { - if errs[index] == nil { - return serverPools[index].MakeBucket(ctx, bucket, MakeBucketOptions{}) - } - return nil - }, index) - } - - g.Wait() -} - // List all buckets from one of the serverPools, we are not doing merge // sort here just for simplification. As per design it is assumed // that all buckets are present on all serverPools. func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) { - if z.SinglePool() { - buckets, err = z.serverPools[0].ListBuckets(ctx, opts) - } else { - for idx, pool := range z.serverPools { - if z.IsSuspended(idx) { - continue - } - buckets, err = pool.ListBuckets(ctx, opts) - if err != nil { - logger.LogIf(ctx, err) - continue - } - break + for idx, pool := range z.serverPools { + if z.IsSuspended(idx) { + continue } + buckets, err = pool.ListBuckets(ctx, opts) + if err != nil { + logger.LogIf(ctx, err) + continue + } + break } if err != nil { return nil, err diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 9251e03c5..fd0bb0601 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -677,32 +677,6 @@ func (s *erasureSets) Shutdown(ctx context.Context) error { return nil } -// MakeBucketLocation - creates a new bucket across all sets simultaneously, -// then return the first encountered error -func (s *erasureSets) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { - g := errgroup.WithNErrs(len(s.sets)) - - // Create buckets in parallel across all sets. - for index := range s.sets { - index := index - g.Go(func() error { - return s.sets[index].MakeBucket(ctx, bucket, opts) - }, index) - } - - errs := g.Wait() - - // Return the first encountered error - for _, err := range errs { - if err != nil { - return err - } - } - - // Success. - return nil -} - // hashes the key returning an integer based on the input algorithm. // This function currently supports // - CRCMOD @@ -749,11 +723,6 @@ func (s *erasureSets) getHashedSet(input string) (set *erasureObjects) { return s.sets[s.getHashedSetIndex(input)] } -// GetBucketInfo - returns bucket info from one of the erasure coded set. 
-func (s *erasureSets) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bucketInfo BucketInfo, err error) { - return s.getHashedSet("").GetBucketInfo(ctx, bucket, opts) -} - // IsNotificationSupported returns whether bucket notification is applicable for this layer. func (s *erasureSets) IsNotificationSupported() bool { return s.getHashedSet("").IsNotificationSupported() @@ -778,52 +747,6 @@ func (s *erasureSets) IsTaggingSupported() bool { return true } -// DeleteBucket - deletes a bucket on all sets simultaneously, -// even if one of the sets fail to delete buckets, we proceed to -// undo a successful operation. -func (s *erasureSets) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { - g := errgroup.WithNErrs(len(s.sets)) - - // Delete buckets in parallel across all sets. - for index := range s.sets { - index := index - g.Go(func() error { - return s.sets[index].DeleteBucket(ctx, bucket, opts) - }, index) - } - - errs := g.Wait() - // For any failure, we attempt undo all the delete buckets operation - // by creating buckets again on all sets which were successfully deleted. - for _, err := range errs { - if err != nil && !opts.NoRecreate { - undoDeleteBucketSets(ctx, bucket, s.sets, errs) - return err - } - } - - // Success. - return nil -} - -// This function is used to undo a successful DeleteBucket operation. -func undoDeleteBucketSets(ctx context.Context, bucket string, sets []*erasureObjects, errs []error) { - g := errgroup.WithNErrs(len(sets)) - - // Undo previous delete bucket on all underlying sets. - for index := range sets { - index := index - g.Go(func() error { - if errs[index] == nil { - return sets[index].MakeBucket(ctx, bucket, MakeBucketOptions{}) - } - return nil - }, index) - } - - g.Wait() -} - // List all buckets from one of the set, we are not doing merge // sort here just for simplification. As per design it is assumed // that all buckets are present on all sets. diff --git a/cmd/erasure.go b/cmd/erasure.go index 1cf7f01a9..f9b115b85 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -89,10 +89,6 @@ func (er erasureObjects) defaultWQuorum() int { return dataCount } -func (er erasureObjects) defaultRQuorum() int { - return er.setDriveCount - er.defaultParityCount -} - // byDiskTotal is a collection satisfying sort.Interface. type byDiskTotal []madmin.Disk diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 91ed9672a..62b6f0c4e 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -121,12 +121,14 @@ type MakeBucketOptions struct { VersioningEnabled bool ForceCreate bool // Create buckets even if they are already created. CreatedAt time.Time // only for site replication + NoLock bool // does not lock the make bucket call if set to 'true' } // DeleteBucketOptions provides options for DeleteBucket calls. type DeleteBucketOptions struct { + NoLock bool // does not lock the delete bucket call if set to 'true' + NoRecreate bool // do not recreate bucket on delete failures Force bool // Force deletion - NoRecreate bool // Do not recreate on delete failures SRDeleteOp SRBucketDeleteOp // only when site replication is enabled } diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go new file mode 100644 index 000000000..4cad20291 --- /dev/null +++ b/cmd/peer-s3-client.go @@ -0,0 +1,278 @@ +// Copyright (c) 2015-2022 MinIO, Inc. 
+// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "encoding/gob" + "errors" + "fmt" + "io" + "net/url" + "strconv" + + xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/rest" + "github.com/minio/minio/internal/sync/errgroup" + xnet "github.com/minio/pkg/net" +) + +var errPeerOffline = errors.New("peer is offline") + +// client to talk to peer Nodes. +type peerS3Client struct { + host *xnet.Host + restClient *rest.Client +} + +// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected +// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() +// after verifying format.json +func (client *peerS3Client) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { + return client.callWithContext(GlobalContext, method, values, body, length) +} + +// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected +// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() +// after verifying format.json +func (client *peerS3Client) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { + if values == nil { + values = make(url.Values) + } + + respBody, err = client.restClient.Call(ctx, method, values, body, length) + if err == nil { + return respBody, nil + } + + err = toStorageErr(err) + return nil, err +} + +// S3PeerSys - S3 peer call system. +type S3PeerSys struct { + peerClients []*peerS3Client // Excludes self + allPeerClients []*peerS3Client // Includes nil client for self +} + +// NewS3PeerSys - creates new S3 peer calls. 
+func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys { + remote, all := newPeerS3Clients(endpoints) + return &S3PeerSys{ + peerClients: remote, + allPeerClients: all, + } +} + +// GetBucketInfo returns bucket stat info about bucket on disk across all peers +func (sys *S3PeerSys) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (binfo BucketInfo, err error) { + g := errgroup.WithNErrs(len(sys.peerClients)) + + bucketInfos := make([]BucketInfo, len(sys.peerClients)+1) + + bucketInfo, err := getBucketInfoLocal(ctx, bucket, opts) + if err != nil { + return BucketInfo{}, err + } + + errs := []error{nil} + bucketInfos[0] = bucketInfo + + for idx, client := range sys.peerClients { + idx := idx + client := client + g.Go(func() error { + if client == nil { + return errPeerOffline + } + bucketInfo, err := client.GetBucketInfo(ctx, bucket, opts) + if err != nil { + return err + } + bucketInfos[idx] = bucketInfo + return nil + }, idx) + } + + errs = append(errs, g.Wait()...) + + quorum := (len(sys.allPeerClients) / 2) + if err = reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum); err != nil { + return BucketInfo{}, err + } + + for i, err := range errs { + if err == nil { + bucketInfo = bucketInfos[i] + break + } + } + + return bucketInfo, nil +} + +// GetBucketInfo returns bucket stat info from a peer +func (client *peerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) { + v := url.Values{} + v.Set(peerS3Bucket, bucket) + v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Deleted)) + + respBody, err := client.call(peerS3MethodGetBucketInfo, v, nil, -1) + if err != nil { + return BucketInfo{}, err + } + defer xhttp.DrainBody(respBody) + + var bucketInfo BucketInfo + err = gob.NewDecoder(respBody).Decode(&bucketInfo) + return bucketInfo, err +} + +// MakeBucket creates bucket across all peers +func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { + g := errgroup.WithNErrs(len(sys.peerClients)) + + for idx, client := range sys.peerClients { + client := client + g.Go(func() error { + if client == nil { + return errPeerOffline + } + return client.MakeBucket(ctx, bucket, opts) + }, idx) + } + + errs := g.Wait() + errs = append(errs, makeBucketLocal(ctx, bucket, opts)) + + quorum := (len(sys.allPeerClients) / 2) + 1 + err := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, quorum) + return toObjectErr(err, bucket) +} + +// MakeBucket creates a bucket on a peer +func (client *peerS3Client) MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error { + v := url.Values{} + v.Set(peerS3Bucket, bucket) + v.Set(peerS3BucketForceCreate, strconv.FormatBool(opts.ForceCreate)) + + respBody, err := client.call(peerS3MethodMakeBucket, v, nil, -1) + if err != nil { + return err + } + defer xhttp.DrainBody(respBody) + + return nil +} + +// DeleteBucket deletes bucket across all peers +func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { + g := errgroup.WithNErrs(len(sys.peerClients)) + + for idx, client := range sys.peerClients { + client := client + g.Go(func() error { + if client == nil { + return errPeerOffline + } + return client.DeleteBucket(ctx, bucket, opts) + }, idx) + } + + errs := g.Wait() + errs = append(errs, deleteBucketLocal(ctx, bucket, opts)) + + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +// DeleteBucket deletes bucket on a peer +func (client *peerS3Client) 
DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { + v := url.Values{} + v.Set(peerS3Bucket, bucket) + v.Set(peerS3BucketForceDelete, strconv.FormatBool(opts.Force)) + + respBody, err := client.call(peerS3MethodDeleteBucket, v, nil, -1) + if err != nil { + return err + } + defer xhttp.DrainBody(respBody) + + return nil +} + +// newPeerS3Clients creates new peer clients. +// The two slices will point to the same clients, +// but 'all' will contain nil entry for local client. +// The 'all' slice will be in the same order across the cluster. +func newPeerS3Clients(endpoints EndpointServerPools) (remote, all []*peerS3Client) { + if !globalIsDistErasure { + // Only useful in distributed setups + return nil, nil + } + hosts := endpoints.hostsSorted() + remote = make([]*peerS3Client, 0, len(hosts)) + all = make([]*peerS3Client, len(hosts)) + for i, host := range hosts { + if host == nil { + continue + } + all[i] = newPeerS3Client(host) + remote = append(remote, all[i]) + } + if len(all) != len(remote)+1 { + logger.LogIf(context.Background(), fmt.Errorf("WARNING: Expected number of all hosts (%v) to be remote +1 (%v)", len(all), len(remote))) + } + return remote, all +} + +// Returns a peer S3 client. +func newPeerS3Client(peer *xnet.Host) *peerS3Client { + scheme := "http" + if globalIsTLS { + scheme = "https" + } + + serverURL := &url.URL{ + Scheme: scheme, + Host: peer.String(), + Path: peerS3Path, + } + + restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken()) + // Use a separate client to avoid recursive calls. + healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken()) + healthClient.ExpectTimeouts = true + healthClient.NoMetrics = true + + // Construct a new health function. + restClient.HealthCheckFn = func() bool { + ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout) + defer cancel() + respBody, err := healthClient.Call(ctx, peerS3MethodHealth, nil, nil, -1) + xhttp.DrainBody(respBody) + return !isNetworkError(err) + } + + return &peerS3Client{host: peer, restClient: restClient} +} diff --git a/cmd/peer-s3-server.go b/cmd/peer-s3-server.go new file mode 100644 index 000000000..e9a7818e1 --- /dev/null +++ b/cmd/peer-s3-server.go @@ -0,0 +1,256 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +package cmd + +import ( + "context" + "encoding/gob" + "errors" + "net/http" + + "github.com/gorilla/mux" + "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/sync/errgroup" +) + +const ( + peerS3Version = "v1" // First implementation + + peerS3VersionPrefix = SlashSeparator + peerS3Version + peerS3Prefix = minioReservedBucketPath + "/peer" + peerS3Path = peerS3Prefix + peerS3VersionPrefix +) + +const ( + peerS3MethodHealth = "/health" + peerS3MethodMakeBucket = "/make-bucket" + peerS3MethodGetBucketInfo = "/get-bucket-info" + peerS3MethodDeleteBucket = "/delete-bucket" +) + +const ( + peerS3Bucket = "bucket" + peerS3BucketDeleted = "bucket-deleted" + peerS3BucketForceCreate = "force-create" + peerS3BucketForceDelete = "force-delete" +) + +type peerS3Server struct{} + +func (s *peerS3Server) writeErrorResponse(w http.ResponseWriter, err error) { + w.WriteHeader(http.StatusForbidden) + w.Write([]byte(err.Error())) +} + +// IsValid - To authenticate and verify the time difference. +func (s *peerS3Server) IsValid(w http.ResponseWriter, r *http.Request) bool { + objAPI := newObjectLayerFn() + if objAPI == nil { + s.writeErrorResponse(w, errServerNotInitialized) + return false + } + + if err := storageServerRequestValidate(r); err != nil { + s.writeErrorResponse(w, err) + return false + } + return true +} + +// HealthHandler - returns true of health +func (s *peerS3Server) HealthHandler(w http.ResponseWriter, r *http.Request) { + s.IsValid(w, r) +} + +func getBucketInfoLocal(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) { + g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32) + bucketsInfo := make([]BucketInfo, len(globalLocalDrives)) + + // Make a volume entry on all underlying storage disks. + for index := range globalLocalDrives { + index := index + g.Go(func() error { + if globalLocalDrives[index] == nil { + return errDiskNotFound + } + volInfo, err := globalLocalDrives[index].StatVol(ctx, bucket) + if err != nil { + if opts.Deleted { + dvi, derr := globalLocalDrives[index].StatVol(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucket)) + if derr != nil { + return err + } + bucketsInfo[index] = BucketInfo{Name: bucket, Deleted: dvi.Created} + return nil + } + return err + } + + bucketsInfo[index] = BucketInfo{Name: bucket, Created: volInfo.Created} + return nil + }, index) + } + + errs := g.Wait() + if err := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives) / 2)); err != nil { + return BucketInfo{}, err + } + + var bucketInfo BucketInfo + for i, err := range errs { + if err == nil { + bucketInfo = bucketsInfo[i] + break + } + } + + return bucketInfo, nil +} + +func deleteBucketLocal(ctx context.Context, bucket string, opts DeleteBucketOptions) error { + g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32) + + // Make a volume entry on all underlying storage disks. 
+ for index := range globalLocalDrives { + index := index + g.Go(func() error { + if globalLocalDrives[index] == nil { + return errDiskNotFound + } + return globalLocalDrives[index].DeleteVol(ctx, bucket, opts.Force) + }, index) + } + + var recreate bool + errs := g.Wait() + for index, err := range errs { + if errors.Is(err, errVolumeNotEmpty) { + recreate = true + } + if err == nil && recreate { + // ignore any errors + globalLocalDrives[index].MakeVol(ctx, bucket) + } + } + + for _, err := range errs { + if err != nil { + return err + } + } + + return nil +} + +func makeBucketLocal(ctx context.Context, bucket string, opts MakeBucketOptions) error { + g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32) + + // Make a volume entry on all underlying storage disks. + for index := range globalLocalDrives { + index := index + g.Go(func() error { + if globalLocalDrives[index] == nil { + return errDiskNotFound + } + err := globalLocalDrives[index].MakeVol(ctx, bucket) + if opts.ForceCreate && errors.Is(err, errVolumeExists) { + // No need to return error when force create was + // requested. + return nil + } + if err != nil && !errors.Is(err, errVolumeExists) { + logger.LogIf(ctx, err) + } + return err + }, index) + } + + errs := g.Wait() + return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives)/2)+1) +} + +// GetBucketInfoHandler implements peer BuckeInfo call, returns bucket create date. +func (s *peerS3Server) GetBucketInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + + bucket := r.Form.Get(peerS3Bucket) + bucketDeleted := r.Form.Get(peerS3BucketDeleted) == "true" + bucketInfo, err := getBucketInfoLocal(r.Context(), bucket, BucketOptions{ + Deleted: bucketDeleted, + }) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(bucketInfo)) +} + +// DeleteBucketHandler implements peer delete bucket call. +func (s *peerS3Server) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + + bucket := r.Form.Get(peerS3Bucket) + if isMinioMetaBucket(bucket) { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + forceDelete := r.Form.Get(peerS3BucketForceDelete) == "true" + + err := deleteBucketLocal(r.Context(), bucket, DeleteBucketOptions{ + Force: forceDelete, + }) + if err != nil { + s.writeErrorResponse(w, err) + return + } +} + +// MakeBucketHandler implements peer create bucket call. +func (s *peerS3Server) MakeBucketHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + + bucket := r.Form.Get(peerS3Bucket) + forceCreate := r.Form.Get(peerS3BucketForceCreate) == "true" + + err := makeBucketLocal(r.Context(), bucket, MakeBucketOptions{ + ForceCreate: forceCreate, + }) + if err != nil { + s.writeErrorResponse(w, err) + return + } +} + +// registerPeerS3Handlers - register peer s3 router. 
+func registerPeerS3Handlers(router *mux.Router) { + server := &peerS3Server{} + subrouter := router.PathPrefix(peerS3Prefix).Subrouter() + + subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodHealth).HandlerFunc(httpTraceHdrs(server.HealthHandler)) + subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodMakeBucket).HandlerFunc(httpTraceHdrs(server.MakeBucketHandler)) + subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodDeleteBucket).HandlerFunc(httpTraceHdrs(server.DeleteBucketHandler)) + subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodGetBucketInfo).HandlerFunc(httpTraceHdrs(server.GetBucketInfoHandler)) +} diff --git a/cmd/routers.go b/cmd/routers.go index 3d59afc43..2e047bbf5 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -31,6 +31,9 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint // Register peer REST router only if its a distributed setup. registerPeerRESTHandlers(router) + // Register peer S3 router only if its a distributed setup. + registerPeerS3Handlers(router) + // Register bootstrap REST router for distributed setups. registerBootstrapRESTHandlers(router) diff --git a/cmd/server_test.go b/cmd/server_test.go index 010024940..418823a86 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -59,12 +59,14 @@ type check struct { // Assert - checks if gotValue is same as expectedValue, if not fails the test. func (c *check) Assert(gotValue interface{}, expectedValue interface{}) { + c.Helper() if !reflect.DeepEqual(gotValue, expectedValue) { - c.Fatalf("Test %s:%s expected %v, got %v", getSource(2), c.testType, expectedValue, gotValue) + c.Fatalf("Test %s expected %v, got %v", c.testType, expectedValue, gotValue) } } func verifyError(c *check, response *http.Response, code, description string, statusCode int) { + c.Helper() data, err := io.ReadAll(response.Body) c.Assert(err, nil) errorResponse := APIErrorResponse{}