From 8c9ab85cfa00d9f1e85d245445c7012b34449915 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 9 Sep 2024 09:58:30 -0700
Subject: [PATCH] Add multipart uploads cache for ListMultipartUploads() (#20407)

This cache is honored only when `prefix=""` is passed to the
ListMultipartUploads() operation.

This is mainly to satisfy applications like Alluxio, which rely on
this behavior for their underfs implementation and tests.

Replaces https://github.com/minio/minio/pull/20181
---
 cmd/erasure-server-pool.go       | 92 +++++++++++++++++++++++++++++---
 cmd/notification.go              | 21 ++++++++
 cmd/peer-rest-client.go          | 11 ++++
 cmd/peer-rest-common.go          |  1 +
 cmd/peer-rest-server.go          | 21 ++++++++
 internal/grid/handlers.go        |  2 +
 internal/grid/handlers_string.go | 11 ++--
 7 files changed, 148 insertions(+), 11 deletions(-)

diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index db0e8b00e..3ae572cc4 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -45,6 +45,7 @@ import (
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/v3/sync/errgroup"
 	"github.com/minio/pkg/v3/wildcard"
+	"github.com/puzpuzpuz/xsync/v3"
 )
 
 type erasureServerPools struct {
@@ -63,6 +64,8 @@ type erasureServerPools struct {
 	decommissionCancelers []context.CancelFunc
 
 	s3Peer *S3PeerSys
+
+	mpCache *xsync.MapOf[string, MultipartInfo]
 }
 
 func (z *erasureServerPools) SinglePool() bool {
@@ -216,9 +219,37 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 		break
 	}
 
+	// initialize the incomplete uploads cache
+	z.mpCache = xsync.NewMapOf[string, MultipartInfo]()
+
+	go z.cleanupStaleMPCache(ctx)
+
 	return z, nil
 }
 
+func (z *erasureServerPools) cleanupStaleMPCache(ctx context.Context) {
+	timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-timer.C:
+			z.mpCache.Range(func(id string, info MultipartInfo) bool {
+				if time.Since(info.Initiated) >= globalAPIConfig.getStaleUploadsExpiry() {
+					z.mpCache.Delete(id)
+					// No need to notify peers, each node will delete its own cache.
+				}
+				return true
+			})
+
+			// Reset for the next interval
+			timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
+		}
+	}
+}
+
 func (z *erasureServerPools) NewNSLock(bucket string, objects ...string) RWLocker {
 	return z.serverPools[0].NewNSLock(bucket, objects...)
 }
 
@@ -1702,15 +1733,32 @@ func (z *erasureServerPools) ListMultipartUploads(ctx context.Context, bucket, p
 		return ListMultipartsInfo{}, err
 	}
 
-	if z.SinglePool() {
-		return z.serverPools[0].ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
-	}
-
 	poolResult := ListMultipartsInfo{}
 	poolResult.MaxUploads = maxUploads
 	poolResult.KeyMarker = keyMarker
 	poolResult.Prefix = prefix
 	poolResult.Delimiter = delimiter
+
+	// if no prefix provided, return the list from cache
+	if prefix == "" {
+		if _, err := z.GetBucketInfo(ctx, bucket, BucketOptions{}); err != nil {
+			return ListMultipartsInfo{}, toObjectErr(err, bucket)
+		}
+
+		z.mpCache.Range(func(_ string, mp MultipartInfo) bool {
+			poolResult.Uploads = append(poolResult.Uploads, mp)
+			return true
+		})
+		sort.Slice(poolResult.Uploads, func(i int, j int) bool {
+			return poolResult.Uploads[i].Initiated.Before(poolResult.Uploads[j].Initiated)
+		})
+		return poolResult, nil
+	}
+
+	if z.SinglePool() {
+		return z.serverPools[0].ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
+	}
+
 	for idx, pool := range z.serverPools {
 		if z.IsSuspended(idx) {
 			continue
@@ -1722,15 +1770,27 @@ func (z *erasureServerPools) ListMultipartUploads(ctx context.Context, bucket, p
 		}
 		poolResult.Uploads = append(poolResult.Uploads, result.Uploads...)
 	}
+
 	return poolResult, nil
 }
 
 // Initiate a new multipart upload on a hashedSet based on object name.
-func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (*NewMultipartUploadResult, error) {
+func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (mp *NewMultipartUploadResult, err error) {
 	if err := checkNewMultipartArgs(ctx, bucket, object); err != nil {
 		return nil, err
 	}
 
+	defer func() {
+		if err == nil && mp != nil {
+			z.mpCache.Store(mp.UploadID, MultipartInfo{
+				Bucket: bucket,
+				Object: object,
+				UploadID: mp.UploadID,
+				Initiated: time.Now(),
+			})
+		}
+	}()
+
 	if z.SinglePool() {
 		return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
 	}
@@ -1874,11 +1934,18 @@ func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object
 }
 
 // Aborts an in-progress multipart operation on hashedSet based on the object name.
-func (z *erasureServerPools) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
+func (z *erasureServerPools) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
 	if err := checkAbortMultipartArgs(ctx, bucket, object, uploadID); err != nil {
 		return err
 	}
 
+	defer func() {
+		if err == nil {
+			z.mpCache.Delete(uploadID)
+			globalNotificationSys.DeleteUploadID(ctx, uploadID)
+		}
+	}()
+
 	if z.SinglePool() {
 		return z.serverPools[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
 	}
@@ -1910,6 +1977,13 @@ func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket
 		return objInfo, err
 	}
 
+	defer func() {
+		if err == nil {
+			z.mpCache.Delete(uploadID)
+			globalNotificationSys.DeleteUploadID(ctx, uploadID)
+		}
+	}()
+
 	if z.SinglePool() {
 		return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
 	}
@@ -1952,6 +2026,12 @@ func (z *erasureServerPools) GetBucketInfo(ctx context.Context, bucket string, o
 	return bucketInfo, nil
 }
 
+// ClearUploadID deletes the given uploadID from the cache
+func (z *erasureServerPools) ClearUploadID(uploadID string) error {
+	z.mpCache.Delete(uploadID)
+	return nil
+}
+
 // DeleteBucket - deletes a bucket on all serverPools simultaneously,
 // even if one of the serverPools fail to delete buckets, we proceed to
 // undo a successful operation.
diff --git a/cmd/notification.go b/cmd/notification.go
index 5d65a8130..fe9e6b6dd 100644
--- a/cmd/notification.go
+++ b/cmd/notification.go
@@ -691,6 +691,27 @@ func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) {
 	}
 }
 
+// DeleteUploadID notifies all the MinIO nodes to remove the
+// given uploadID from the cache
+func (sys *NotificationSys) DeleteUploadID(ctx context.Context, uploadID string) {
+	ng := WithNPeers(len(sys.peerClients))
+	for idx, client := range sys.peerClients {
+		if client == nil {
+			continue
+		}
+		client := client
+		ng.Go(ctx, func() error {
+			return client.DeleteUploadID(ctx, uploadID)
+		}, idx, *client.host)
+	}
+	for _, nErr := range ng.Wait() {
+		reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
+		if nErr.Err != nil {
+			peersLogOnceIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err, nErr.Host.String())
+		}
+	}
+}
+
 // StopRebalance notifies all MinIO nodes to signal any ongoing rebalance
 // goroutine to stop.
 func (sys *NotificationSys) StopRebalance(ctx context.Context) {
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go
index 07fc7b6a0..9b69302fe 100644
--- a/cmd/peer-rest-client.go
+++ b/cmd/peer-rest-client.go
@@ -467,6 +467,17 @@ func (client *peerRESTClient) ReloadPoolMeta(ctx context.Context) error {
 	return err
 }
 
+func (client *peerRESTClient) DeleteUploadID(ctx context.Context, uploadID string) error {
+	conn := client.gridConn()
+	if conn == nil {
+		return nil
+	}
+	_, err := cleanupUploadIDCacheMetaRPC.Call(ctx, conn, grid.NewMSSWith(map[string]string{
+		peerRESTUploadID: uploadID,
+	}))
+	return err
+}
+
 func (client *peerRESTClient) StopRebalance(ctx context.Context) error {
 	conn := client.gridConn()
 	if conn == nil {
diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go
index bcdcaad7d..b643d68a1 100644
--- a/cmd/peer-rest-common.go
+++ b/cmd/peer-rest-common.go
@@ -67,6 +67,7 @@ const (
 	peerRESTStartRebalance = "start-rebalance"
 	peerRESTMetrics = "metrics"
 	peerRESTDryRun = "dry-run"
+	peerRESTUploadID = "up-id"
 
 	peerRESTURL = "url"
 	peerRESTSha256Sum = "sha256sum"
diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go
index 3a383a156..bdca69224 100644
--- a/cmd/peer-rest-server.go
+++ b/cmd/peer-rest-server.go
@@ -115,6 +115,7 @@ var (
 	signalServiceRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerSignalService, grid.NewMSS, grid.NewNoPayload)
 	stopRebalanceRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerStopRebalance, grid.NewMSS, grid.NewNoPayload)
 	updateMetacacheListingRPC = grid.NewSingleHandler[*metacache, *metacache](grid.HandlerUpdateMetacacheListing, func() *metacache { return &metacache{} }, func() *metacache { return &metacache{} })
+	cleanupUploadIDCacheMetaRPC = grid.NewSingleHandler[*grid.MSS, grid.NoPayload](grid.HandlerClearUploadID, grid.NewMSS, grid.NewNoPayload)
 
 	// STREAMS
 	// Set an output capacity of 100 for consoleLog and listenRPC
@@ -905,6 +906,26 @@ func (s *peerRESTServer) ReloadPoolMetaHandler(mss *grid.MSS) (np grid.NoPayload
 	return
 }
 
+func (s *peerRESTServer) HandlerClearUploadID(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
+	objAPI := newObjectLayerFn()
+	if objAPI == nil {
+		return np, grid.NewRemoteErr(errServerNotInitialized)
+	}
+
+	pools, ok := objAPI.(*erasureServerPools)
+	if !ok {
+		return
+	}
+
+	// No need to return errors, this is not a highly strict operation.
+	uploadID := mss.Get(peerRESTUploadID)
+	if uploadID != "" {
+		pools.ClearUploadID(uploadID)
+	}
+
+	return
+}
+
 func (s *peerRESTServer) StopRebalanceHandler(mss *grid.MSS) (np grid.NoPayload, nerr *grid.RemoteErr) {
 	objAPI := newObjectLayerFn()
 	if objAPI == nil {
diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go
index e620e0a76..096aaaa52 100644
--- a/internal/grid/handlers.go
+++ b/internal/grid/handlers.go
@@ -114,6 +114,7 @@ const (
 	HandlerRenameData2
 	HandlerCheckParts2
 	HandlerRenamePart
+	HandlerClearUploadID
 
 	// Add more above here ^^^
 	// If all handlers are used, the type of Handler can be changed.
@@ -196,6 +197,7 @@ var handlerPrefixes = [handlerLast]string{
 	HandlerRenameData2: storagePrefix,
 	HandlerCheckParts2: storagePrefix,
 	HandlerRenamePart: storagePrefix,
+	HandlerClearUploadID: peerPrefix,
 }
 
 const (
diff --git a/internal/grid/handlers_string.go b/internal/grid/handlers_string.go
index 4417e6716..eb471f32a 100644
--- a/internal/grid/handlers_string.go
+++ b/internal/grid/handlers_string.go
@@ -84,14 +84,15 @@ func _() {
 	_ = x[HandlerRenameData2-73]
 	_ = x[HandlerCheckParts2-74]
 	_ = x[HandlerRenamePart-75]
-	_ = x[handlerTest-76]
-	_ = x[handlerTest2-77]
-	_ = x[handlerLast-78]
+	_ = x[HandlerClearUploadID-76]
+	_ = x[handlerTest-77]
+	_ = x[handlerTest2-78]
+	_ = x[handlerLast-79]
 }
 
-const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketsRenameDataInlineRenameData2CheckParts2RenameParthandlerTesthandlerTest2handlerLast"
+const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketsRenameDataInlineRenameData2CheckParts2RenamePartClearUploadIDhandlerTesthandlerTest2handlerLast"
 
-var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 904, 915, 926, 936, 947, 959, 970}
+var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 904, 915, 926, 936, 949, 960, 972, 983}
 
 func (i HandlerID) String() string {
 	if i >= HandlerID(len(_HandlerID_index)-1) {
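
A minimal client-side sketch (not part of the patch) of the call that now hits the cache, using the minio-go v7 Core API, assuming its ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) signature; the endpoint, credentials, and bucket name are placeholders:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Core exposes the lower-level S3 calls, including ListMultipartUploads.
	core, err := minio.NewCore("localhost:9000", &minio.Options{ // placeholder endpoint
		Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""), // placeholder keys
	})
	if err != nil {
		log.Fatal(err)
	}

	// prefix == "" is the case served from the new mpCache; a non-empty
	// prefix still goes through the per-pool listing path.
	result, err := core.ListMultipartUploads(context.Background(),
		"testbucket", "", "", "", "", 1000)
	if err != nil {
		log.Fatal(err)
	}
	for _, u := range result.Uploads {
		fmt.Println(u.Key, u.UploadID, u.Initiated)
	}
}

Note that the patch stores a cache entry only on the node that served NewMultipartUpload, while Abort/Complete broadcast deletions via the new DeleteUploadID peer RPC, so a prefix="" listing reflects the uploads the serving node knows about rather than a strongly consistent cluster-wide view.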
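The cleanup goroutine added in cmd/erasure-server-pool.go is an instance of a simple TTL-eviction loop over an xsync.MapOf. The same pattern in self-contained form, with a hypothetical entry type and hard-coded durations standing in for the globalAPIConfig getters:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/puzpuzpuz/xsync/v3"
)

type entry struct {
	initiated time.Time // when the upload was started
}

// evictStale deletes entries older than expiry once per interval,
// mirroring cleanupStaleMPCache in the patch. It returns when ctx is done.
func evictStale(ctx context.Context, m *xsync.MapOf[string, entry], interval, expiry time.Duration) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			m.Range(func(id string, e entry) bool {
				if time.Since(e.initiated) >= expiry {
					m.Delete(id) // MapOf permits deletes while iterating
				}
				return true
			})
			timer.Reset(interval) // re-arm for the next sweep
		}
	}
}

func main() {
	m := xsync.NewMapOf[string, entry]()
	m.Store("upload-1", entry{initiated: time.Now().Add(-48 * time.Hour)})

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	evictStale(ctx, m, time.Second, 24*time.Hour)

	_, ok := m.Load("upload-1")
	fmt.Println("still cached:", ok) // false: older than the 24h expiry
}

xsync.MapOf tolerates concurrent writes and deletes during Range, which is what lets the patch evict in place without holding an extra lock around the cache.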