diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go index 1bd8eea97..1f9eeb9e4 100644 --- a/cmd/erasure-utils.go +++ b/cmd/erasure-utils.go @@ -84,7 +84,9 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data if write < int64(len(block)) { n, err := io.Copy(dst, bytes.NewReader(block[:write])) if err != nil { - if err != io.ErrClosedPipe { + // The writer will be closed incase of range queries, which will emit ErrClosedPipe. + // The reader pipe might be closed at ListObjects io.EOF ignore it. + if err != io.ErrClosedPipe && err != io.EOF { logger.LogIf(ctx, err) } return 0, err @@ -97,7 +99,8 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data n, err := io.Copy(dst, bytes.NewReader(block)) if err != nil { // The writer will be closed incase of range queries, which will emit ErrClosedPipe. - if err != io.ErrClosedPipe { + // The reader pipe might be closed at ListObjects io.EOF ignore it. + if err != io.ErrClosedPipe && err != io.EOF { logger.LogIf(ctx, err) } return 0, err diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index db5761d87..7750a45aa 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -418,11 +418,6 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt // We got a stream to start at. loadedPart := 0 - buf := bufferPool.Get().(*bytes.Buffer) - defer func() { - buf.Reset() - bufferPool.Put(buf) - }() for { select { case <-ctx.Done(): @@ -471,9 +466,27 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt } } } - buf.Reset() - err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, buf, fi, metaArr, onlineDisks) - if err != nil { + + pr, pw := io.Pipe() + go func() { + werr := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, + fi.Size, pw, fi, metaArr, onlineDisks) + pw.CloseWithError(werr) + }() + + tmp := newMetacacheReader(pr) + e, err := tmp.filter(o) + pr.CloseWithError(err) + entries.o = append(entries.o, e.o...) + if o.Limit > 0 && entries.len() > o.Limit { + entries.truncate(o.Limit) + return entries, nil + } + if err == nil { + // We stopped within the listing, we are done for now... + return entries, nil + } + if err != nil && err.Error() != io.EOF.Error() { switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) { case ObjectNotFound: retries++ @@ -488,24 +501,6 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt return entries, err } } - tmp, err := newMetacacheReader(buf) - if err != nil { - return entries, err - } - e, err := tmp.filter(o) - entries.o = append(entries.o, e.o...) - if o.Limit > 0 && entries.len() > o.Limit { - entries.truncate(o.Limit) - return entries, nil - } - if err == nil { - // We stopped within the listing, we are done for now... - return entries, nil - } - if !errors.Is(err, io.EOF) { - logger.LogIf(ctx, err) - return entries, err - } // We finished at the end of the block. // And should not expect any more results. @@ -824,10 +819,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { // Make sure we close the pipe so blocked writes doesn't stay around. defer r.CloseWithError(context.Canceled) - readers[i], err = newMetacacheReader(r) - if err != nil { - return err - } + readers[i] = newMetacacheReader(r) d := disks[i] // Send request to each disk. diff --git a/cmd/metacache-stream.go b/cmd/metacache-stream.go index f39215f11..b5e8601ae 100644 --- a/cmd/metacache-stream.go +++ b/cmd/metacache-stream.go @@ -18,7 +18,6 @@ package cmd import ( - "bytes" "context" "errors" "fmt" @@ -30,6 +29,7 @@ import ( "github.com/klauspost/compress/s2" "github.com/minio/minio/cmd/logger" "github.com/tinylib/msgp/msgp" + "github.com/valyala/bytebufferpool" ) // metadata stream format: @@ -246,11 +246,11 @@ type metacacheReader struct { // newMetacacheReader creates a new cache reader. // Nothing will be read from the stream yet. -func newMetacacheReader(r io.Reader) (*metacacheReader, error) { +func newMetacacheReader(r io.Reader) *metacacheReader { dec := s2DecPool.Get().(*s2.Reader) dec.Reset(r) mr := msgp.NewReader(dec) - m := metacacheReader{ + return &metacacheReader{ mr: mr, closer: func() { dec.Reset(nil) @@ -269,7 +269,6 @@ func newMetacacheReader(r io.Reader) (*metacacheReader, error) { return nil }, } - return &m, nil } func (r *metacacheReader) checkInit() { @@ -747,12 +746,6 @@ type metacacheBlockWriter struct { blockEntries int } -var bufferPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} - // newMetacacheBlockWriter provides a streaming block writer. // Each block is the size of the capacity of the input channel. // The caller should close to indicate the stream has ended. @@ -763,11 +756,13 @@ func newMetacacheBlockWriter(in <-chan metaCacheEntry, nextBlock func(b *metacac defer w.wg.Done() var current metacacheBlock var n int - buf := bufferPool.Get().(*bytes.Buffer) + + buf := bytebufferpool.Get() defer func() { buf.Reset() - bufferPool.Put(buf) + bytebufferpool.Put(buf) }() + block := newMetacacheWriter(buf, 1<<20) defer block.Close() finishBlock := func() { diff --git a/cmd/metacache-stream_test.go b/cmd/metacache-stream_test.go index 35a75444e..a79593b47 100644 --- a/cmd/metacache-stream_test.go +++ b/cmd/metacache-stream_test.go @@ -34,11 +34,7 @@ func loadMetacacheSample(t testing.TB) *metacacheReader { if err != nil { t.Fatal(err) } - r, err := newMetacacheReader(bytes.NewReader(b)) - if err != nil { - t.Fatal(err) - } - return r + return newMetacacheReader(bytes.NewReader(b)) } func loadMetacacheSampleEntries(t testing.TB) metaCacheEntriesSorted { @@ -388,10 +384,7 @@ func Test_newMetacacheStream(t *testing.T) { t.Fatal(err) } - r, err = newMetacacheReader(&buf) - if err != nil { - t.Fatal(err) - } + r = newMetacacheReader(&buf) defer r.Close() names, err := r.readNames(-1) if err != io.EOF { diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index f18dc861f..cce154ba7 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -29,30 +29,33 @@ import ( // handle all cases where we have known types of errors returned by // underlying storage layer. func toObjectErr(err error, params ...string) error { - switch err { - case errVolumeNotFound: + if err == nil { + return nil + } + switch err.Error() { + case errVolumeNotFound.Error(): apiErr := BucketNotFound{} if len(params) >= 1 { apiErr.Bucket = params[0] } return apiErr - case errVolumeNotEmpty: + case errVolumeNotEmpty.Error(): apiErr := BucketNotEmpty{} if len(params) >= 1 { apiErr.Bucket = params[0] } return apiErr - case errVolumeExists: + case errVolumeExists.Error(): apiErr := BucketExists{} if len(params) >= 1 { apiErr.Bucket = params[0] } return apiErr - case errDiskFull: + case errDiskFull.Error(): return StorageFull{} - case errTooManyOpenFiles: + case errTooManyOpenFiles.Error(): return SlowDown{} - case errFileAccessDenied: + case errFileAccessDenied.Error(): apiErr := PrefixAccessDenied{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -61,7 +64,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errFileParentIsFile: + case errFileParentIsFile.Error(): apiErr := ParentIsObject{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -70,7 +73,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errIsNotRegular: + case errIsNotRegular.Error(): apiErr := ObjectExistsAsDirectory{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -79,7 +82,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errFileVersionNotFound: + case errFileVersionNotFound.Error(): apiErr := VersionNotFound{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -91,7 +94,7 @@ func toObjectErr(err error, params ...string) error { apiErr.VersionID = params[2] } return apiErr - case errMethodNotAllowed: + case errMethodNotAllowed.Error(): apiErr := MethodNotAllowed{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -100,7 +103,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errFileNotFound: + case errFileNotFound.Error(): apiErr := ObjectNotFound{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -109,7 +112,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errUploadIDNotFound: + case errUploadIDNotFound.Error(): apiErr := InvalidUploadID{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -121,7 +124,7 @@ func toObjectErr(err error, params ...string) error { apiErr.UploadID = params[2] } return apiErr - case errFileNameTooLong: + case errFileNameTooLong.Error(): apiErr := ObjectNameInvalid{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -130,7 +133,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errDataTooLarge: + case errDataTooLarge.Error(): apiErr := ObjectTooLarge{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -139,7 +142,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errDataTooSmall: + case errDataTooSmall.Error(): apiErr := ObjectTooSmall{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -148,7 +151,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errErasureReadQuorum: + case errErasureReadQuorum.Error(): apiErr := InsufficientReadQuorum{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -157,7 +160,7 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case errErasureWriteQuorum: + case errErasureWriteQuorum.Error(): apiErr := InsufficientWriteQuorum{} if len(params) >= 1 { apiErr.Bucket = params[0] @@ -166,9 +169,9 @@ func toObjectErr(err error, params ...string) error { apiErr.Object = decodeDirObject(params[1]) } return apiErr - case io.ErrUnexpectedEOF, io.ErrShortWrite: + case io.ErrUnexpectedEOF.Error(), io.ErrShortWrite.Error(): return IncompleteBody{} - case context.Canceled, context.DeadlineExceeded: + case context.Canceled.Error(), context.DeadlineExceeded.Error(): return IncompleteBody{} } return err diff --git a/go.mod b/go.mod index 8f3329578..7b81b4450 100644 --- a/go.mod +++ b/go.mod @@ -78,7 +78,7 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/streadway/amqp v1.0.0 github.com/tinylib/msgp v1.1.3 - github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 // indirect + github.com/valyala/bytebufferpool v1.0.0 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/willf/bloom v2.0.3+incompatible github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c diff --git a/go.sum b/go.sum index 2dd262787..c62d07b9d 100644 --- a/go.sum +++ b/go.sum @@ -694,11 +694,11 @@ github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZ github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1gBkr4QyP8= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 h1:OXcKh35JaYsGMRzpvFkLv/MEyPuL49CThT1pZ8aSml4= -github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= @@ -797,8 +797,6 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.0 h1:8pl+sMODzuvGJkmj2W4kZihvVb5mKm8pB/X44PIQHv8= -golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -859,8 +857,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -920,7 +918,6 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe h1:WdX7u8s3yOigWAhHEaDl8r9G+4XwFQEQFtBMYyN+kXQ= golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -993,8 +990,6 @@ golang.org/x/tools v0.0.0-20200828161849-5deb26317202/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= -golang.org/x/tools v0.1.1-0.20210201201750-4d4ee958a9b7 h1:/wdPW261t381NDQd8TBo63/FyvACfLICwtH8wMRoHJQ= -golang.org/x/tools v0.1.1-0.20210201201750-4d4ee958a9b7/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/ioutil/read_file.go b/pkg/ioutil/read_file.go index 6e4ce1ff0..5c745aa15 100644 --- a/pkg/ioutil/read_file.go +++ b/pkg/ioutil/read_file.go @@ -15,11 +15,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// Forked from golang.org/pkg/os.ReadFile with NOATIME support. -// Copyright 2009 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the https://golang.org/LICENSE file. - package ioutil import ( @@ -40,36 +35,5 @@ func ReadFile(name string) ([]byte, error) { } defer f.Close() - var size int - if info, err := f.Stat(); err == nil { - size64 := info.Size() - if int64(int(size64)) == size64 { - size = int(size64) - } - } - size++ // one byte for final read at EOF - - // If a file claims a small size, read at least 512 bytes. - // In particular, files in Linux's /proc claim size 0 but - // then do not work right if read in small pieces, - // so an initial read of 1 byte would not work correctly. - if size < 512 { - size = 512 - } - - data := make([]byte, 0, size) - for { - if len(data) >= cap(data) { - d := append(data[:cap(data)], 0) - data = d[:len(data)] - } - n, err := f.Read(data[len(data):cap(data)]) - data = data[:len(data)+n] - if err != nil { - if err == io.EOF { - err = nil - } - return data, err - } - } + return io.ReadAll(f) } diff --git a/pkg/ioutil/wait_pipe.go b/pkg/ioutil/wait_pipe.go index 236121476..124cde78f 100644 --- a/pkg/ioutil/wait_pipe.go +++ b/pkg/ioutil/wait_pipe.go @@ -15,11 +15,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// Forked from golang.org/pkg/os.ReadFile with NOATIME support. -// Copyright 2009 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the https://golang.org/LICENSE file. - package ioutil import (