diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 4f4af8148..314dd684e 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -808,52 +808,20 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object ) { storageDisks := er.getDisks() storageEndpoints := er.getEndpoints() - // Check if the object is dangling, if yes and user requested - // remove we simply delete it from namespace. - m, ok := isObjectDangling(metaArr, errs, dataErrs) - if ok { - parityBlocks := er.defaultParityCount - dataBlocks := len(storageDisks) - parityBlocks - if m.IsValid() { - parityBlocks = m.Erasure.ParityBlocks - dataBlocks = m.Erasure.DataBlocks - } - writeQuorum := dataBlocks - if dataBlocks == parityBlocks { - writeQuorum++ - } - - var err error - if opts.Remove { - err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{ - VersionID: versionID, - }, false) - - // If Delete was successful, make sure to return the appropriate error - // and heal result appropriate with delete's error messages - errs = make([]error, len(errs)) - for i := range errs { - errs[i] = err - } - if err == nil { - // Dangling object successfully purged, size is '0' - m.Size = 0 - } - - return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, - errs, bucket, object, versionID), nil - } - - return er.defaultHealResult(m, storageDisks, storageEndpoints, - errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID) + m, err := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, dataErrs, ObjectOptions{ + VersionID: versionID, + }) + errs = make([]error, len(errs)) + for i := range errs { + errs[i] = err } - - readQuorum := len(storageDisks) - er.defaultParityCount - - err := toObjectErr(reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum), - bucket, object, versionID) - return er.defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID), err + if err == nil { + // Dangling object successfully purged, size is '0' + m.Size = 0 + } + return er.defaultHealResult(m, storageDisks, storageEndpoints, + errs, bucket, object, versionID), nil } // Object is considered dangling/corrupted if any only diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index 63894d2ea..453795015 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -1043,10 +1043,14 @@ func TestHealObjectErasure(t *testing.T) { z.serverPools[0].erasureDisksMu.Unlock() // Try healing now, expect to receive errDiskNotFound. - _, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealDeepScan}) - // since majority of xl.meta's are not available, object quorum can't be read properly and error will be errErasureReadQuorum - if _, ok := err.(InsufficientReadQuorum); !ok { - t.Errorf("Expected %v but received %v", InsufficientReadQuorum{}, err) + _, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ + ScanMode: madmin.HealDeepScan, + }) + // since majority of xl.meta's are not available, object quorum + // can't be read properly will be deleted automatically and + // err is nil + if err != nil { + t.Fatal(err) } } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 3379d4276..2de128654 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -397,6 +397,29 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin return er.getObjectInfo(ctx, bucket, object, opts) } +func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object string, metaArr []FileInfo, errs []error, dataErrs []error, opts ObjectOptions) (FileInfo, error) { + var err error + m, ok := isObjectDangling(metaArr, errs, dataErrs) + if ok { + err = errFileNotFound + if opts.VersionID != "" { + err = errFileVersionNotFound + } + defer NSUpdated(bucket, object) + + if opts.VersionID != "" { + er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{ + VersionID: opts.VersionID, + }, false) + } else { + er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{ + VersionID: m.VersionID, + }, false) + } + } + return m, err +} + func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { disks := er.getDisks() @@ -405,27 +428,20 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) if err != nil { - return fi, nil, nil, err + if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) { + _, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts) + if derr != nil { + err = derr + } + } + return fi, nil, nil, toObjectErr(err, bucket, object) } if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { if errors.Is(reducedErr, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) { - // Skip buckets that live at `.minio.sys` bucket. - if _, ok := isObjectDangling(metaArr, errs, nil); ok { - reducedErr = errFileNotFound - if opts.VersionID != "" { - reducedErr = errFileVersionNotFound - } - // Remove the dangling object only when: - // - This is a non versioned bucket - // - This is a versioned bucket and the version ID is passed, the reason - // is that we cannot fetch the ID of the latest version when we don't trust xl.meta - if !opts.Versioned || opts.VersionID != "" { - er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{ - Name: object, - VersionID: opts.VersionID, - }, false) - } + _, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts) + if derr != nil { + err = derr } } return fi, nil, nil, toObjectErr(reducedErr, bucket, object) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index b2cb19108..9bd0af472 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -335,7 +335,9 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc pinfo := poolObjInfo{ PoolIndex: i, } - opts.VersionID = "" // no need to check for specific versionId + // do not remove this check as it can lead to inconsistencies + // for all callers of bucket replication. + opts.VersionID = "" pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts) poolObjInfos[i] = pinfo }(i, pool, poolOpts[i]) @@ -353,15 +355,15 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc }) for _, pinfo := range poolObjInfos { - if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) { - return -1, pinfo.Err - } - // skip all objects from suspended pools for mutating calls. if z.IsSuspended(pinfo.PoolIndex) && opts.Mutate { continue } + if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) { + return -1, pinfo.Err + } + if isErrObjectNotFound(pinfo.Err) { // No object exists or its a delete marker, // check objInfo to confirm.