diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index e5faab713..9bc3970d5 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -4,9 +4,7 @@ on: pull_request: branches: - master - push: - branches: - - master + - release jobs: build: diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index ae44f575b..e84518459 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -401,11 +401,12 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin } if fi.Deleted { + objInfo = fi.ToObjectInfo(bucket, object) if opts.VersionID == "" { return objInfo, toObjectErr(errFileNotFound, bucket, object) } // Make sure to return object info to provide extra information. - return fi.ToObjectInfo(bucket, object), toObjectErr(errMethodNotAllowed, bucket, object) + return objInfo, toObjectErr(errMethodNotAllowed, bucket, object) } return fi.ToObjectInfo(bucket, object), nil diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 05c69e434..c40e91243 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -151,6 +151,36 @@ func (z *erasureZones) getZonesAvailableSpace(ctx context.Context) zonesAvailabl return zones } +// getZoneIdx returns the found previous object and its corresponding zone idx, +// if none are found falls back to most available space zone. +func (z *erasureZones) getZoneIdx(ctx context.Context, bucket, object string, opts ObjectOptions) (idx int, err error) { + if z.SingleZone() { + return 0, nil + } + for i, zone := range z.zones { + objInfo, err := zone.GetObjectInfo(ctx, bucket, object, opts) + switch err.(type) { + case ObjectNotFound: + // VersionId was not specified but found delete marker or no versions exist. + case MethodNotAllowed: + // VersionId was specified but found delete marker + default: + if err != nil { + // any other un-handled errors return right here. + return -1, err + } + } + // delete marker not specified means no versions + // exist continue to next zone. + if !objInfo.DeleteMarker && err != nil { + continue + } + // Success case and when DeleteMarker is true return. + return i, nil + } + return z.getAvailableZoneIdx(ctx), nil +} + func (z *erasureZones) Shutdown(ctx context.Context) error { if z.SingleZone() { return z.zones[0].Shutdown(ctx) @@ -477,19 +507,13 @@ func (z *erasureZones) PutObject(ctx context.Context, bucket string, object stri return z.zones[0].PutObject(ctx, bucket, object, data, opts) } - for _, zone := range z.zones { - objInfo, err := zone.GetObjectInfo(ctx, bucket, object, opts) - if err != nil { - if isErrObjectNotFound(err) { - continue - } - return objInfo, err - } - // Overwrite request upload to right zone. - return zone.PutObject(ctx, bucket, object, data, opts) + idx, err := z.getZoneIdx(ctx, bucket, object, opts) + if err != nil { + return ObjectInfo{}, err } - // Object not found pick the least used and upload to this zone. - return z.zones[z.getAvailableZoneIdx(ctx)].PutObject(ctx, bucket, object, data, opts) + + // Overwrite the object at the right zone + return z.zones[idx].PutObject(ctx, bucket, object, data, opts) } func (z *erasureZones) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { @@ -565,27 +589,17 @@ func (z *erasureZones) CopyObject(ctx context.Context, srcBucket, srcObject, dst return z.zones[0].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) } - zoneIndex := -1 - for i, zone := range z.zones { - objInfo, err := zone.GetObjectInfo(ctx, dstBucket, dstObject, srcOpts) - if err != nil { - if isErrObjectNotFound(err) { - continue - } - return objInfo, err - } - zoneIndex = i - break + zoneIdx, err := z.getZoneIdx(ctx, dstBucket, dstObject, dstOpts) + if err != nil { + return objInfo, err } putOpts := ObjectOptions{ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined} - if zoneIndex >= 0 { - if cpSrcDstSame && srcInfo.metadataOnly { - return z.zones[zoneIndex].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) - } - return z.zones[zoneIndex].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts) + if cpSrcDstSame && srcInfo.metadataOnly { + return z.zones[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) } - return z.zones[z.getAvailableZoneIdx(ctx)].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts) + + return z.zones[zoneIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts) } func (z *erasureZones) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) { @@ -1283,7 +1297,13 @@ func (z *erasureZones) NewMultipartUpload(ctx context.Context, bucket, object st if z.SingleZone() { return z.zones[0].NewMultipartUpload(ctx, bucket, object, opts) } - return z.zones[z.getAvailableZoneIdx(ctx)].NewMultipartUpload(ctx, bucket, object, opts) + + idx, err := z.getZoneIdx(ctx, bucket, object, opts) + if err != nil { + return "", err + } + + return z.zones[idx].NewMultipartUpload(ctx, bucket, object, opts) } // Copies a part of an object from source hashedSet to destination hashedSet.