From e07c2ab8683e22a5d3a57d228b2b2ff8b05fe940 Mon Sep 17 00:00:00 2001
From: Poorna
Date: Fri, 12 May 2023 11:19:08 -0700
Subject: [PATCH] Use hash.NewLimitReader for internal multipart calls (#17191)

---
 cmd/batch-handlers.go                        |  2 +-
 cmd/bucket-replication.go                    |  2 +-
 cmd/erasure-object.go                        |  2 +-
 cmd/erasure-server-pool-decom.go             |  8 ++---
 cmd/erasure-server-pool-rebalance.go         |  4 +--
 docs/site-replication/run-multi-site-ldap.sh | 18 ++++++++++++
 .../run-multi-site-minio-idp.sh              | 19 ++++++++++++
 docs/site-replication/run-multi-site-oidc.sh | 18 ++++++++++++
 internal/hash/reader.go                      | 29 +++++++++++++++++--
 9 files changed, 91 insertions(+), 11 deletions(-)

diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go
index 7c1af59cc..eaf410044 100644
--- a/cmd/batch-handlers.go
+++ b/cmd/batch-handlers.go
@@ -413,7 +413,7 @@ func (r *BatchJobReplicateV1) copyWithMultipartfromSource(ctx context.Context, a
 	}
 	defer rd.Close()
 
-	hr, err = hash.NewReader(rd, objInfo.Size, "", "", objInfo.Size)
+	hr, err = hash.NewLimitReader(rd, objInfo.Size, "", "", objInfo.Size)
 	if err != nil {
 		return err
 	}
diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 479a5efed..68c225ec0 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -1485,7 +1485,7 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
 	)
 
 	for _, partInfo := range objInfo.Parts {
-		hr, err = hash.NewReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
+		hr, err = hash.NewLimitReader(r, partInfo.ActualSize, "", "", partInfo.ActualSize)
 		if err != nil {
 			return err
 		}
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 81e96fdb1..9b2ceae34 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -2088,7 +2088,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 
 	// rehydrate the parts back on disk as per the original xl.meta prior to transition
 	for _, partInfo := range oi.Parts {
-		hr, err := hash.NewReader(gr, partInfo.Size, "", "", partInfo.Size)
+		hr, err := hash.NewLimitReader(gr, partInfo.Size, "", "", partInfo.Size)
 		if err != nil {
 			return setRestoreHeaderFn(oi, err)
 		}
diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index 66321b1fe..3dfd4d5f7 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -604,9 +604,9 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
 		defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, ObjectOptions{})
 		parts := make([]CompletePart, len(objInfo.Parts))
 		for i, part := range objInfo.Parts {
-			hr, err := hash.NewReader(gr, part.Size, "", "", part.ActualSize)
+			hr, err := hash.NewLimitReader(gr, part.Size, "", "", part.ActualSize)
 			if err != nil {
-				return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
+				return fmt.Errorf("decommissionObject: hash.NewLimitReader() %w", err)
 			}
 			pi, err := z.PutObjectPart(ctx, bucket, objInfo.Name, res.UploadID,
 				part.Number,
@@ -638,9 +638,9 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
 		return err
 	}
 
-	hr, err := hash.NewReader(gr, objInfo.Size, "", "", actualSize)
+	hr, err := hash.NewLimitReader(gr, objInfo.Size, "", "", actualSize)
 	if err != nil {
-		return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
+		return fmt.Errorf("decommissionObject: hash.NewLimitReader() %w", err)
 	}
 
 	_, err = z.PutObject(ctx,
 		bucket,
diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go
index 0e8ceccc1..69fbe5a03 100644
--- a/cmd/erasure-server-pool-rebalance.go
+++ b/cmd/erasure-server-pool-rebalance.go
@@ -721,9 +721,9 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
 
 		parts := make([]CompletePart, len(oi.Parts))
 		for i, part := range oi.Parts {
-			hr, err := hash.NewReader(gr, part.Size, "", "", part.ActualSize)
+			hr, err := hash.NewLimitReader(gr, part.Size, "", "", part.ActualSize)
 			if err != nil {
-				return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
+				return fmt.Errorf("rebalanceObject: hash.NewLimitReader() %w", err)
 			}
 			pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
 				part.Number,
diff --git a/docs/site-replication/run-multi-site-ldap.sh b/docs/site-replication/run-multi-site-ldap.sh
index f58d2764b..2c881a5a7 100755
--- a/docs/site-replication/run-multi-site-ldap.sh
+++ b/docs/site-replication/run-multi-site-ldap.sh
@@ -149,6 +149,11 @@ if [ $? -eq 0 ]; then
 fi
 
 ./mc mb minio1/newbucket
+# copy large upload to newbucket on minio1
+truncate -s 17M lrgfile
+expected_checksum=$(cat ./lrgfile | md5sum)
+
+./mc cp ./lrgfile minio1/newbucket
 
 # create a bucket bucket2 on minio1.
 ./mc mb minio1/bucket2
@@ -181,6 +186,19 @@ if [ $? -ne 0 ]; then
 	exit_1;
 fi
 
+sleep 10
+./mc stat minio3/newbucket/lrgfile
+if [ $? -ne 0 ]; then
+	echo "expected object to be present, exiting.."
+	exit_1;
+fi
+actual_checksum=$(./mc cat minio3/newbucket/lrgfile | md5sum)
+if [ "${expected_checksum}" != "${actual_checksum}" ]; then
+	echo "replication failed on multipart objects expected ${expected_checksum} got ${actual_checksum}"
+	exit
+fi
+rm ./lrgfile
+
 vID=$(./mc stat minio2/newbucket/README.md --json | jq .versionID)
 if [ $? -ne 0 ]; then
 	echo "expecting object to be present. exiting.."
diff --git a/docs/site-replication/run-multi-site-minio-idp.sh b/docs/site-replication/run-multi-site-minio-idp.sh
index 529d010dd..71625c38b 100755
--- a/docs/site-replication/run-multi-site-minio-idp.sh
+++ b/docs/site-replication/run-multi-site-minio-idp.sh
@@ -170,6 +170,11 @@ if [ $? -eq 0 ]; then
 fi
 
 ./mc mb minio1/newbucket
+# copy large upload to newbucket on minio1
+truncate -s 17M lrgfile
+expected_checksum=$(cat ./lrgfile | md5sum)
+
+./mc cp ./lrgfile minio1/newbucket
 
 sleep 5
 ./mc stat minio2/newbucket
@@ -210,6 +215,20 @@ if [ $? -ne 0 ]; then
 	exit_1;
 fi
 
+sleep 10
+./mc stat minio3/newbucket/lrgfile
+if [ $? -ne 0 ]; then
+	echo "expected object to be present, exiting.."
+	exit_1;
+fi
+
+actual_checksum=$(./mc cat minio3/newbucket/lrgfile | md5sum)
+if [ "${expected_checksum}" != "${actual_checksum}" ]; then
+	echo "replication failed on multipart objects expected ${expected_checksum} got ${actual_checksum}"
+	exit
+fi
+rm ./lrgfile
+
 vID=$(./mc stat minio2/newbucket/README.md --json | jq .versionID)
 if [ $? -ne 0 ]; then
 	echo "expecting object to be present. exiting.."
diff --git a/docs/site-replication/run-multi-site-oidc.sh b/docs/site-replication/run-multi-site-oidc.sh
index ff0b04f81..97010cea4 100755
--- a/docs/site-replication/run-multi-site-oidc.sh
+++ b/docs/site-replication/run-multi-site-oidc.sh
@@ -168,6 +168,11 @@ fi
 
 ./mc mb minio1/newbucket
 
+# copy large upload to newbucket on minio1
+truncate -s 17M lrgfile
+expected_checksum=$(cat ./lrgfile | md5sum)
+
+./mc cp ./lrgfile minio1/newbucket
 sleep 5
 ./mc stat minio2/newbucket
 if [ $? -ne 0 ]; then
@@ -211,6 +216,19 @@ if [ $? -eq 0 ]; then
 	exit_1;
 fi
 
+sleep 10
+./mc stat minio3/newbucket/lrgfile
+if [ $? -ne 0 ]; then
+	echo "expected object to be present, exiting.."
+	exit_1;
+fi
+actual_checksum=$(./mc cat minio3/newbucket/lrgfile | md5sum)
+if [ "${expected_checksum}" != "${actual_checksum}" ]; then
+	echo "replication failed on multipart objects expected ${expected_checksum} got ${actual_checksum}"
+	exit
+fi
+rm ./lrgfile
+
 ./mc mb --with-lock minio3/newbucket-olock
 sleep 5
diff --git a/internal/hash/reader.go b/internal/hash/reader.go
index b70f06922..419eed3b3 100644
--- a/internal/hash/reader.go
+++ b/internal/hash/reader.go
@@ -69,7 +69,14 @@ type Reader struct {
 // NewReader may try merge the given size, MD5 and SHA256 values
 // into src - if src is a Reader - to avoid computing the same
 // checksums multiple times.
+// NewReader enforces S3 compatibility strictly by ensuring caller
+// does not send more content than specified size.
 func NewReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize int64) (*Reader, error) {
+	// return hard limited reader
+	return newReader(src, size, md5Hex, sha256Hex, actualSize, true)
+}
+
+func newReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize int64, hardLimitReader bool) (*Reader, error) {
 	MD5, err := hex.DecodeString(md5Hex)
 	if err != nil {
 		return nil, BadDigest{ // TODO(aead): Return an error that indicates that an invalid ETag has been specified
@@ -110,7 +117,12 @@ func NewReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize i
 		r.checksum = MD5
 		r.contentSHA256 = SHA256
 		if r.size < 0 && size >= 0 {
-			r.src = etag.Wrap(ioutil.HardLimitReader(r.src, size), r.src)
+			switch hardLimitReader {
+			case true:
+				r.src = etag.Wrap(ioutil.HardLimitReader(r.src, size), r.src)
+			default:
+				r.src = etag.Wrap(io.LimitReader(r.src, size), r.src)
+			}
 			r.size = size
 		}
 		if r.actualSize <= 0 && actualSize >= 0 {
@@ -120,7 +132,13 @@ func NewReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize i
 	}
 
 	if size >= 0 {
-		r := ioutil.HardLimitReader(src, size)
+		var r io.Reader
+		switch hardLimitReader {
+		case true:
+			r = ioutil.HardLimitReader(src, size)
+		default:
+			r = io.LimitReader(src, size)
+		}
 		if _, ok := src.(etag.Tagger); !ok {
 			src = etag.NewReader(r, MD5)
 		} else {
@@ -143,6 +161,13 @@ func NewReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize i
 	}, nil
 }
 
+// NewLimitReader is similar to NewReader but it will read only up to actualsize specified. It will return
+// EOF if actualsize is reached.
+func NewLimitReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize int64) (*Reader, error) {
+	// return io limited reader
+	return newReader(src, size, md5Hex, sha256Hex, actualSize, false)
+}
+
 // ErrInvalidChecksum is returned when an invalid checksum is provided in headers.
 var ErrInvalidChecksum = errors.New("invalid checksum")
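
For illustration, a minimal standalone sketch of the two limiting behaviors the hardLimitReader flag above toggles. NewReader keeps the hard limit (ioutil.HardLimitReader), which reports an error when the source produces more content than the declared size; NewLimitReader switches to the standard library's io.LimitReader, which simply stops at the declared size and returns EOF. That matters for the internal multipart callers changed in this patch: they slice one continuous object stream into per-part readers, so bytes belonging to the next part must surface as EOF, not as an overrun error. The hardLimit type below is a hypothetical stand-in that mimics the hard-limit behavior using only the standard library, not MinIO's actual implementation:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// errTooLarge mirrors the kind of error a hard-limit reader reports
// when the source produces more data than the declared size.
var errTooLarge = errors.New("input exceeds declared size")

// hardLimit is a hypothetical stand-in that mimics the behavior of
// MinIO's internal ioutil.HardLimitReader: instead of truncating, it
// fails once more than n bytes have been read from the source.
type hardLimit struct {
	r io.Reader
	n int64 // bytes that may still be read before the limit is exceeded
}

func (h *hardLimit) Read(p []byte) (int, error) {
	n, err := h.r.Read(p)
	h.n -= int64(n)
	if h.n < 0 {
		return n, errTooLarge // source sent more content than declared
	}
	return n, err
}

func main() {
	src := []byte("0123456789") // 10 bytes; the declared size is 5

	// NewReader-style hard limit: reading past the declared size is an error.
	_, err := io.ReadAll(&hardLimit{r: bytes.NewReader(src), n: 5})
	fmt.Println("hard limit:", err)
	// hard limit: input exceeds declared size

	// NewLimitReader-style soft limit: io.LimitReader stops at the declared
	// size and returns EOF, leaving the rest of the stream for the next part.
	b, err := io.ReadAll(io.LimitReader(bytes.NewReader(src), 5))
	fmt.Printf("io.LimitReader: %q, err=%v\n", b, err)
	// io.LimitReader: "01234", err=<nil>
}

The hard-limited read fails with the overrun error while the io.LimitReader path returns the leading five bytes cleanly; that difference is what the decommission, rebalance, restore, and replication call sites rely on when feeding successive parts from a shared stream, and the large-file checks added to the site-replication scripts verify it end to end.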