diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 91f572b76..8d0add523 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -231,7 +231,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) ( go func(o listPathOptions) { defer wg.Done() - o.Limit = 0 + o.StopDiskAtLimit = true listErr = z.listMerged(listCtx, o, filterCh) o.debugln("listMerged returned with", listErr) }(*o) diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 6b8a2d2e6..1eb1e8c01 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -104,6 +104,10 @@ type listPathOptions struct { // Replication configuration Replication replicationConfig + + // StopDiskAtLimit will stop listing on each disk when limit number off objects has been returned. + StopDiskAtLimit bool + // pool and set of where the cache is located. pool, set int } @@ -762,6 +766,12 @@ func (es *erasureSingle) listPathInner(ctx context.Context, o listPathOptions, r resolver.requestedVersions = 1 } + var limit int + if o.Limit > 0 && o.StopDiskAtLimit { + // Over-read by 1 to know if we truncate results. + limit = o.Limit + 1 + } + ctxDone := ctx.Done() return listPathRaw(ctx, listPathRawOptions{ disks: []StorageAPI{es.disk}, @@ -771,6 +781,7 @@ func (es *erasureSingle) listPathInner(ctx context.Context, o listPathOptions, r filterPrefix: o.FilterPrefix, minDisks: 1, forwardTo: o.Marker, + perDiskLimit: limit, agreed: func(entry metaCacheEntry) { select { case <-ctxDone: @@ -829,7 +840,12 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul if !o.Versioned { resolver.requestedVersions = 1 } - + var limit int + if o.Limit > 0 && o.StopDiskAtLimit { + // Over-read by 2 + 1 for every 16 in limit to give some space for resolver + // And know if we have truncated. + limit = o.Limit + 2 + (o.Limit / 16) + } ctxDone := ctx.Done() return listPathRaw(ctx, listPathRawOptions{ disks: disks, @@ -840,6 +856,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul filterPrefix: o.FilterPrefix, minDisks: listingQuorum, forwardTo: o.Marker, + perDiskLimit: limit, agreed: func(entry metaCacheEntry) { select { case <-ctxDone: @@ -1153,6 +1170,10 @@ type listPathRawOptions struct { minDisks int reportNotFound bool + // perDiskLimit will limit each disk to return n objects. + // If <= 0 all results will be returned until canceled. + perDiskLimit int + // Callbacks with results: // If set to nil, it will not be called. @@ -1227,6 +1248,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { werr = errDiskNotFound } else { werr = d.WalkDir(ctx, WalkDirOptions{ + Limit: opts.perDiskLimit, Bucket: opts.bucket, BaseDir: opts.path, Recursive: opts.recursive, @@ -1246,6 +1268,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { // askDisks is less than total // number of disks per set. werr = fd.WalkDir(ctx, WalkDirOptions{ + Limit: opts.perDiskLimit, Bucket: opts.bucket, BaseDir: opts.path, Recursive: opts.recursive, diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 5da3c6baa..8fda73118 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -53,6 +53,9 @@ type WalkDirOptions struct { // ForwardTo will forward to the given object path. ForwardTo string + + // Limit the number of returned objects if > 0. + Limit int } // WalkDir will traverse a directory and return all entries found. @@ -84,6 +87,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ return err } defer close(out) + var objsReturned int // Fast exit track to check if we are listing an object with // a trailing slash, this will avoid to list the object content. @@ -93,12 +97,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ xlStorageFormatFile)) if err == nil { // if baseDir is already a directory object, consider it - // as part of the list call, this is a AWS S3 specific + // as part of the list call, this is AWS S3 specific // behavior. out <- metaCacheEntry{ name: opts.BaseDir, metadata: metadata, } + objsReturned++ } else { st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)) if sterr == nil && st.Mode().IsRegular() { @@ -123,6 +128,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ if contextCanceled(ctx) { return ctx.Err() } + if opts.Limit > 0 && objsReturned >= opts.Limit { + return nil + } s.walkMu.Lock() entries, err := s.ListDir(ctx, opts.Bucket, current, -1) @@ -144,6 +152,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ } dirObjects := make(map[string]struct{}) for i, entry := range entries { + if opts.Limit > 0 && objsReturned >= opts.Limit { + return nil + } if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) { // Do do not retain the file, since it doesn't // match the prefix. @@ -194,6 +205,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = pathJoin(current, meta.name) meta.name = decodeDirObject(meta.name) + objsReturned++ out <- meta return nil } @@ -213,6 +225,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1) meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = pathJoin(current, meta.name) + objsReturned++ out <- meta return nil } @@ -234,6 +247,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ } for _, entry := range entries { + if opts.Limit > 0 && objsReturned >= opts.Limit { + return nil + } if entry == "" { continue } @@ -273,12 +289,14 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ if isDirObj { meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator } + objsReturned++ out <- meta case osIsNotExist(err), isSysErrIsDir(err): meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1)) diskHealthCheckOK(ctx, err) if err == nil { // It was an object + objsReturned++ out <- meta continue } @@ -299,6 +317,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ // If directory entry left on stack, pop it now. for len(dirStack) > 0 { + if opts.Limit > 0 && objsReturned >= opts.Limit { + return nil + } + if contextCanceled(ctx) { + return ctx.Err() + } pop := dirStack[len(dirStack)-1] out <- metaCacheEntry{name: pop} if opts.Recursive { diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index a3d1c618f..4247191dc 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -1769,7 +1769,7 @@ func testListObjectsContinuation(obj ObjectLayer, instanceType string, t1 TestEr } - // Formualting the result data set to be expected from ListObjects call inside the tests, + // Formulating the result data set to be expected from ListObjects call inside the tests, // This will be used in testCases and used for asserting the correctness of ListObjects output in the tests. resultCases := []ListObjectsInfo{