diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index d6109686c..85b369162 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1266,13 +1266,17 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 	return z.serverPools[poolIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
 }
 
+func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+	return z.listObjectsGeneric(ctx, bucket, prefix, marker, delimiter, maxKeys, true)
+}
+
 func (z *erasureServerPools) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
 	marker := continuationToken
 	if marker == "" {
 		marker = startAfter
 	}
 
-	loi, err := z.ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
+	loi, err := z.listObjectsGeneric(ctx, bucket, prefix, marker, delimiter, maxKeys, false)
 	if err != nil {
 		return ListObjectsV2Info{}, err
 	}
@@ -1381,9 +1385,10 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
 	return maxKeys
 }
 
-func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, v1 bool) (ListObjectsInfo, error) {
 	var loi ListObjectsInfo
 	opts := listPathOptions{
+		V1:          v1,
 		Bucket:      bucket,
 		Prefix:      prefix,
 		Separator:   delimiter,
@@ -1554,8 +1559,23 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
 	}
 	if loi.IsTruncated {
 		last := objects[len(objects)-1]
-		loi.NextMarker = opts.encodeMarker(last.Name)
+		loi.NextMarker = last.Name
 	}
+
+	if merged.lastSkippedEntry != "" {
+		if merged.lastSkippedEntry > loi.NextMarker {
+			// An object hidden by ILM was found during listing. Since the number of entries
+			// fetched from drives is limited, set IsTruncated to true to ask the S3 client
+			// to continue listing if it wishes, in order to find out if there are more objects.
+			loi.IsTruncated = true
+			loi.NextMarker = merged.lastSkippedEntry
+		}
+	}
+
+	if loi.NextMarker != "" {
+		loi.NextMarker = opts.encodeMarker(loi.NextMarker)
+	}
+
 	return loi, nil
 }
 
diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go
index 22598d20b..b2d553a93 100644
--- a/cmd/metacache-entries.go
+++ b/cmd/metacache-entries.go
@@ -473,6 +473,8 @@ type metaCacheEntriesSorted struct {
 	listID string
 	// Reuse buffers
 	reuse bool
+	// Contains the name of the last entry skipped due to an ILM expiry evaluation
+	lastSkippedEntry string
 }
 
 // shallowClone will create a shallow clone of the array objects,
diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go
index e2b35aa2c..92bcc1e73 100644
--- a/cmd/metacache-server-pool.go
+++ b/cmd/metacache-server-pool.go
@@ -291,14 +291,6 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
 	}
 	mu.Unlock()
 
-	// Do lifecycle filtering.
-	if o.Lifecycle != nil || o.Replication.Config != nil {
-		filterIn := make(chan metaCacheEntry, 10)
-		go applyBucketActions(ctx, o, filterIn, results)
-		// Replace results.
-		results = filterIn
-	}
-
 	// Gather results to a single channel.
 	// Quorum is one since we are merging across sets.
 	err := mergeEntryChannels(ctx, inputs, results, 1)
@@ -339,84 +331,50 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
 	return nil
 }
 
-// applyBucketActions applies lifecycle and replication actions on the listing
-// It will filter out objects if the most recent version should be deleted by lifecycle.
-// Entries that failed replication will be queued if no lifecycle rules got applied.
-// out will be closed when there are no more results.
-// When 'in' is closed or the context is canceled the
-// function closes 'out' and exits.
-func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
-	defer xioutil.SafeClose(out)
+// triggerExpiryAndRepl applies lifecycle and replication actions on a listed object.
+// It returns true if the listing is non-versioned and the given object is expired.
+func triggerExpiryAndRepl(ctx context.Context, o listPathOptions, obj metaCacheEntry) (skip bool) {
+	versioned := o.Versioning != nil && o.Versioning.Versioned(obj.name)
 
-	for {
-		var obj metaCacheEntry
-		var ok bool
-		select {
-		case <-ctx.Done():
-			return
-		case obj, ok = <-in:
-			if !ok {
-				return
-			}
-		}
-
-		var skip bool
-
-		versioned := o.Versioning != nil && o.Versioning.Versioned(obj.name)
-
-		// skip latest object from listing only for regular
-		// listObjects calls, versioned based listing cannot
-		// filter out between versions 'obj' cannot be truncated
-		// in such a manner, so look for skipping an object only
-		// for regular ListObjects() call only.
-		if !o.Versioned {
-			fi, err := obj.fileInfo(o.Bucket)
-			if err != nil {
-				continue
-			}
-
-			objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned)
-			if o.Lifecycle != nil {
-				act := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo).Action
-				skip = act.Delete()
-				if act.DeleteRestored() {
-					// do not skip DeleteRestored* actions
-					skip = false
-				}
-			}
-		}
-
-		// Skip entry only if needed via ILM, skipping is never true for versioned listing.
-		if !skip {
-			select {
-			case <-ctx.Done():
-				return
-			case out <- obj:
-			}
-		}
-
-		fiv, err := obj.fileInfoVersions(o.Bucket)
+	// Skip the latest object from the listing only for regular
+	// ListObjects() calls; versioned listing cannot filter out
+	// between versions since 'obj' cannot be truncated in such
+	// a manner, so an object is only considered for skipping
+	// in a regular ListObjects() call.
+	if !o.Versioned && !o.V1 {
+		fi, err := obj.fileInfo(o.Bucket)
 		if err != nil {
-			continue
+			return
 		}
-
-		// Expire all versions if needed, if not attempt to queue for replication.
-		for _, version := range fiv.Versions {
-			objInfo := version.ToObjectInfo(o.Bucket, obj.name, versioned)
-
-			if o.Lifecycle != nil {
-				evt := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo)
-				if evt.Action.Delete() {
-					globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
-					if !evt.Action.DeleteRestored() {
-						continue
-					} // queue version for replication upon expired restored copies if needed.
-				}
-			}
-
-			queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication, 0)
+		objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned)
+		if o.Lifecycle != nil {
+			act := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo).Action
+			skip = act.Delete() && !act.DeleteRestored()
 		}
 	}
+
+	fiv, err := obj.fileInfoVersions(o.Bucket)
+	if err != nil {
+		return
+	}
+
+	// Expire all versions if needed, if not attempt to queue for replication.
+	for _, version := range fiv.Versions {
+		objInfo := version.ToObjectInfo(o.Bucket, obj.name, versioned)
+
+		if o.Lifecycle != nil {
+			evt := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo)
+			if evt.Action.Delete() {
+				globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
+				if !evt.Action.DeleteRestored() {
+					continue
+				} // queue version for replication upon expired restored copies if needed.
+			}
+		}
+
+		queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication, 0)
+	}
+	return
 }
 
 func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) {
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index e2b336ec7..d698ad10e 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -98,6 +98,8 @@ type listPathOptions struct {
 
 	// Versioned is this a ListObjectVersions call.
 	Versioned bool
+	// V1 indicates this is a ListObjects (v1) call.
+	V1 bool
 
 	// Versioning config is used for if the path
 	// has versioning enabled.
@@ -172,7 +174,8 @@ func (o *listPathOptions) debugln(data ...interface{}) {
 	}
 }
 
-// gatherResults will collect all results on the input channel and filter results according to the options.
+// gatherResults will collect all results on the input channel and filter results according
+// to the options and to the bucket's current ILM expiry rules.
 // Caller should close the channel when done.
 // The returned function will return the results once there is enough or input is closed,
 // or the context is canceled.
@@ -214,6 +217,12 @@ func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCache
 			if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
 				continue
 			}
+			if o.Lifecycle != nil || o.Replication.Config != nil {
+				if skipped := triggerExpiryAndRepl(ctx, *o, entry); skipped {
+					results.lastSkippedEntry = entry.name
+					continue
+				}
+			}
 			if o.Limit > 0 && results.len() >= o.Limit {
 				// We have enough and we have more.
 				// Do not return io.EOF
diff --git a/cmd/metacache-set_gen.go b/cmd/metacache-set_gen.go
index 3633edc65..e46b2f046 100644
--- a/cmd/metacache-set_gen.go
+++ b/cmd/metacache-set_gen.go
@@ -114,6 +114,12 @@ func (z *listPathOptions) DecodeMsg(dc *msgp.Reader) (err error) {
 				err = msgp.WrapError(err, "Versioned")
 				return
 			}
+		case "V1":
+			z.V1, err = dc.ReadBool()
+			if err != nil {
+				err = msgp.WrapError(err, "V1")
+				return
+			}
 		case "StopDiskAtLimit":
 			z.StopDiskAtLimit, err = dc.ReadBool()
 			if err != nil {
@@ -145,9 +151,9 @@ func (z *listPathOptions) DecodeMsg(dc *msgp.Reader) (err error) {
 
 // EncodeMsg implements msgp.Encodable
 func (z *listPathOptions) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 18
+	// map header, size 19
 	// write "ID"
-	err = en.Append(0xde, 0x0, 0x12, 0xa2, 0x49, 0x44)
+	err = en.Append(0xde, 0x0, 0x13, 0xa2, 0x49, 0x44)
 	if err != nil {
 		return
 	}
@@ -296,6 +302,16 @@ func (z *listPathOptions) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "Versioned")
 		return
 	}
+	// write "V1"
+	err = en.Append(0xa2, 0x56, 0x31)
+	if err != nil {
+		return
+	}
+	err = en.WriteBool(z.V1)
+	if err != nil {
+		err = msgp.WrapError(err, "V1")
+		return
+	}
 	// write "StopDiskAtLimit"
 	err = en.Append(0xaf, 0x53, 0x74, 0x6f, 0x70, 0x44, 0x69, 0x73, 0x6b, 0x41, 0x74, 0x4c, 0x69, 0x6d, 0x69, 0x74)
 	if err != nil {
@@ -332,9 +348,9 @@ func (z *listPathOptions) EncodeMsg(en *msgp.Writer) (err error) {
 // MarshalMsg implements msgp.Marshaler
 func (z *listPathOptions) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// map header, size 18
+	// map header, size 19
 	// string "ID"
-	o = append(o, 0xde, 0x0, 0x12, 0xa2, 0x49, 0x44)
+	o = append(o, 0xde, 0x0, 0x13, 0xa2, 0x49, 0x44)
 	o = msgp.AppendString(o, z.ID)
 	// string "Bucket"
 	o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
@@ -378,6 +394,9 @@ func (z *listPathOptions) MarshalMsg(b []byte) (o []byte, err error) {
 	// string "Versioned"
 	o = append(o, 0xa9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x65, 0x64)
 	o = msgp.AppendBool(o, z.Versioned)
+	// string "V1"
+	o = append(o, 0xa2, 0x56, 0x31)
+	o = msgp.AppendBool(o, z.V1)
 	// string "StopDiskAtLimit"
 	o = append(o, 0xaf, 0x53, 0x74, 0x6f, 0x70, 0x44, 0x69, 0x73, 0x6b, 0x41, 0x74, 0x4c, 0x69, 0x6d, 0x69, 0x74)
 	o = msgp.AppendBool(o, z.StopDiskAtLimit)
@@ -498,6 +517,12 @@ func (z *listPathOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
 				err = msgp.WrapError(err, "Versioned")
 				return
 			}
+		case "V1":
+			z.V1, bts, err = msgp.ReadBoolBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "V1")
+				return
+			}
 		case "StopDiskAtLimit":
 			z.StopDiskAtLimit, bts, err = msgp.ReadBoolBytes(bts)
 			if err != nil {
@@ -530,6 +555,6 @@ func (z *listPathOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *listPathOptions) Msgsize() (s int) {
-	s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 8 + msgp.StringPrefixSize + len(z.BaseDir) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 13 + msgp.StringPrefixSize + len(z.FilterPrefix) + 7 + msgp.StringPrefixSize + len(z.Marker) + 6 + msgp.IntSize + 9 + msgp.StringPrefixSize + len(z.AskDisks) + 12 + msgp.BoolSize + 10 + msgp.BoolSize + 10 + msgp.StringPrefixSize + len(z.Separator) + 7 + msgp.BoolSize + 19 + msgp.BoolSize + 10 + msgp.BoolSize + 10 + msgp.BoolSize + 16 + msgp.BoolSize + 5 + msgp.IntSize + 4 + msgp.IntSize
+	s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 8 + msgp.StringPrefixSize + len(z.BaseDir) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 13 + msgp.StringPrefixSize + len(z.FilterPrefix) + 7 + msgp.StringPrefixSize + len(z.Marker) + 6 + msgp.IntSize + 9 + msgp.StringPrefixSize + len(z.AskDisks) + 12 + msgp.BoolSize + 10 + msgp.BoolSize + 10 + msgp.StringPrefixSize + len(z.Separator) + 7 + msgp.BoolSize + 19 + msgp.BoolSize + 10 + msgp.BoolSize + 10 + msgp.BoolSize + 3 + msgp.BoolSize + 16 + msgp.BoolSize + 5 + msgp.IntSize + 4 + msgp.IntSize
 	return
 }
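
As a minimal, self-contained Go sketch of the NextMarker adjustment introduced in listObjectsGeneric above: when an entry was skipped by an ILM expiry evaluation and sorts after the last returned object, the page is reported as truncated and the marker is advanced to the skipped name so a client can continue past it. The listPage type, the adjustNextMarker helper, and the identity encode function below are hypothetical stand-ins for ListObjectsInfo, the inlined logic, and opts.encodeMarker; only the comparison and truncation behavior mirrors the change.

package main

import "fmt"

// listPage is a simplified stand-in for ListObjectsInfo (assumed, for illustration only).
type listPage struct {
	IsTruncated bool
	NextMarker  string
}

// adjustNextMarker mirrors the tail of listObjectsGeneric as a sketch: if an object
// skipped by an ILM expiry evaluation sorts after the current NextMarker, the page is
// marked truncated and the marker is advanced to the skipped name, so a client that
// keeps listing resumes past the hidden object. The marker is encoded once, at the end,
// regardless of which branch set it.
func adjustNextMarker(page listPage, lastSkippedEntry string, encode func(string) string) listPage {
	if lastSkippedEntry != "" && lastSkippedEntry > page.NextMarker {
		page.IsTruncated = true
		page.NextMarker = lastSkippedEntry
	}
	if page.NextMarker != "" {
		page.NextMarker = encode(page.NextMarker)
	}
	return page
}

func main() {
	// The page itself was not truncated, but "photos/0150.jpg" was hidden by an ILM
	// expiry rule while gathering results, so the client is asked to continue listing.
	page := adjustNextMarker(
		listPage{IsTruncated: false, NextMarker: ""},
		"photos/0150.jpg",
		func(s string) string { return s }, // identity in place of opts.encodeMarker
	)
	fmt.Printf("truncated=%v next=%q\n", page.IsTruncated, page.NextMarker)
	// Output: truncated=true next="photos/0150.jpg"
}

Keeping the encode step last means the marker is encoded exactly once, whether it came from the final listed object or from a skipped entry.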