From d00ff3c453d36079bf0d82a62fb1aaefa9d7d0ea Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 29 Sep 2021 16:40:28 -0700 Subject: [PATCH] use O_DIRECT for all ReadFileStream (#13324) This PR also removes #13312 to ensure that we can use a better mechanism to handle page-cache, using O_DIRECT even for Range GETs. --- cmd/bitrot.go | 8 +- cmd/handler-api.go | 9 +- cmd/os-readdir_unix.go | 12 -- cmd/xl-storage.go | 201 +++++-------------------- internal/disk/fdatasync_linux.go | 13 +- internal/disk/fdatasync_unix.go | 11 +- internal/disk/fdatasync_unsupported.go | 11 +- internal/ioutil/ioutil.go | 6 +- internal/ioutil/odirect_reader.go | 131 ++++++++++++++++ internal/ioutil/read_file.go | 16 +- 10 files changed, 198 insertions(+), 220 deletions(-) create mode 100644 internal/ioutil/odirect_reader.go diff --git a/cmd/bitrot.go b/cmd/bitrot.go index 5a4a971f6..81119c1da 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -27,8 +27,10 @@ import ( "io" "github.com/minio/highwayhash" - "github.com/minio/minio/internal/logger" "golang.org/x/crypto/blake2b" + + xioutil "github.com/minio/minio/internal/ioutil" + "github.com/minio/minio/internal/logger" ) // magic HH-256 key as HH-256 hash of the first 100 decimals of π as utf-8 string with a zero key. @@ -172,8 +174,8 @@ func bitrotVerify(r io.Reader, wantSize, partSize int64, algo BitrotAlgorithm, w return errFileCorrupt } - bufp := xlPoolSmall.Get().(*[]byte) - defer xlPoolSmall.Put(bufp) + bufp := xioutil.ODirectPoolSmall.Get().(*[]byte) + defer xioutil.ODirectPoolSmall.Put(bufp) for left > 0 { // Read expected hash... 
diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 5e3c43dad..07a460407 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -22,9 +22,11 @@ import ( "sync" "time" - "github.com/minio/minio/internal/config/api" - "github.com/minio/minio/internal/logger" mem "github.com/shirou/gopsutil/v3/mem" + + "github.com/minio/minio/internal/config/api" + xioutil "github.com/minio/minio/internal/ioutil" + "github.com/minio/minio/internal/logger" ) type apiConfig struct { @@ -71,7 +73,8 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { // total_ram / ram_per_request // ram_per_request is (2MiB+128KiB) * driveCount \ // + 2 * 10MiB (default erasure block size v1) + 2 * 1MiB (default erasure block size v2) - apiRequestsMaxPerNode = int(maxMem / uint64(maxSetDrives*(blockSizeLarge+blockSizeSmall)+int(blockSizeV1*2+blockSizeV2*2))) + blockSize := xioutil.BlockSizeLarge + xioutil.BlockSizeSmall + apiRequestsMaxPerNode = int(maxMem / uint64(maxSetDrives*blockSize+int(blockSizeV1*2+blockSizeV2*2))) if globalIsErasure { logger.Info("Automatically configured API requests per node based on available memory on the system: %d", apiRequestsMaxPerNode) diff --git a/cmd/os-readdir_unix.go b/cmd/os-readdir_unix.go index 0b6d82827..437726d52 100644 --- a/cmd/os-readdir_unix.go +++ b/cmd/os-readdir_unix.go @@ -28,7 +28,6 @@ import ( "syscall" "unsafe" - "github.com/minio/minio/internal/disk" "golang.org/x/sys/unix" ) @@ -111,11 +110,6 @@ func readDirFn(dirPath string, fn func(name string, typ os.FileMode) error) erro } return osErrToFileErr(err) } - if err := disk.Fadvise(f, disk.FadvSequential); err != nil { - return err - } - - defer disk.Fadvise(f, disk.FadvNoReuse) defer f.Close() bufp := direntPool.Get().(*[]byte) @@ -191,12 +185,6 @@ func readDirWithOpts(dirPath string, opts readDirOpts) (entries []string, err er if err != nil { return nil, osErrToFileErr(err) } - - if err := disk.Fadvise(f, disk.FadvSequential); err != nil { - return nil, err - } - - defer 
disk.Fadvise(f, disk.FadvNoReuse) defer f.Close() bufp := direntPool.Get().(*[]byte) diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index b6e7975f8..9603d928b 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -18,7 +18,6 @@ package cmd import ( - "bufio" "bytes" "context" "crypto/rand" @@ -39,7 +38,6 @@ import ( "github.com/dustin/go-humanize" jsoniter "github.com/json-iterator/go" - "github.com/klauspost/readahead" "github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/color" "github.com/minio/minio/internal/config" @@ -51,18 +49,7 @@ import ( ) const ( - nullVersionID = "null" - blockSizeSmall = 128 * humanize.KiByte // Default r/w block size for smaller objects. - blockSizeLarge = 2 * humanize.MiByte // Default r/w block size for larger objects. - blockSizeReallyLarge = 4 * humanize.MiByte // Default write block size for objects per shard >= 64MiB - - // On regular files bigger than this; - readAheadSize = 16 << 20 - // Read this many buffers ahead. - readAheadBuffers = 4 - // Size of each buffer. - readAheadBufSize = 1 << 20 - + nullVersionID = "null" // Really large streams threshold per shard. reallyLargeFileThreshold = 64 * humanize.MiByte // Optimized for HDDs @@ -78,7 +65,7 @@ const ( var alignedBuf []byte func init() { - alignedBuf = disk.AlignedBlock(4096) + alignedBuf = disk.AlignedBlock(xioutil.DirectioAlignSize) _, _ = rand.Read(alignedBuf) } @@ -97,27 +84,6 @@ func isValidVolname(volname string) bool { return true } -var ( - xlPoolReallyLarge = sync.Pool{ - New: func() interface{} { - b := disk.AlignedBlock(blockSizeReallyLarge) - return &b - }, - } - xlPoolLarge = sync.Pool{ - New: func() interface{} { - b := disk.AlignedBlock(blockSizeLarge) - return &b - }, - } - xlPoolSmall = sync.Pool{ - New: func() interface{} { - b := disk.AlignedBlock(blockSizeSmall) - return &b - }, - } -) - // xlStorage - implements StorageAPI interface. 
type xlStorage struct { diskPath string @@ -410,12 +376,6 @@ func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, if err != nil { return nil, err } - - if err := disk.Fadvise(f, disk.FadvSequential); err != nil { - return nil, err - } - - defer disk.Fadvise(f, disk.FadvNoReuse) defer f.Close() stat, err := f.Stat() if err != nil { @@ -1234,11 +1194,10 @@ func (s *xlStorage) readAllData(volumeDir string, filePath string) (buf []byte, } return nil, err } - if err := disk.Fadvise(f, disk.FadvSequential); err != nil { - return nil, err + r := &xioutil.ODirectReader{ + File: f, + SmallFile: true, } - defer disk.Fadvise(f, disk.FadvNoReuse) - r := &odirectReader{f, nil, nil, true, true, s, nil} defer r.Close() buf, err = ioutil.ReadAll(r) if err != nil { @@ -1428,75 +1387,6 @@ func (s *xlStorage) openFileNoSync(filePath string, mode int) (f *os.File, err e return w, nil } -// To support O_DIRECT reads for erasure backends. -type odirectReader struct { - f *os.File - buf []byte - bufp *[]byte - freshRead bool - smallFile bool - s *xlStorage - err error -} - -// Read - Implements Reader interface. 
-func (o *odirectReader) Read(buf []byte) (n int, err error) { - if o.err != nil && (len(o.buf) == 0 || o.freshRead) { - return 0, o.err - } - if o.buf == nil { - if o.smallFile { - o.bufp = xlPoolSmall.Get().(*[]byte) - } else { - o.bufp = xlPoolLarge.Get().(*[]byte) - } - } - if o.freshRead { - o.buf = *o.bufp - n, err = o.f.Read(o.buf) - if err != nil && err != io.EOF { - if isSysErrInvalidArg(err) { - if err = disk.DisableDirectIO(o.f); err != nil { - o.err = err - return n, err - } - n, err = o.f.Read(o.buf) - } - if err != nil && err != io.EOF { - o.err = err - return n, err - } - } - if n == 0 { - // err is likely io.EOF - o.err = err - return n, err - } - o.err = err - o.buf = o.buf[:n] - o.freshRead = false - } - if len(buf) >= len(o.buf) { - n = copy(buf, o.buf) - o.freshRead = true - return n, o.err - } - n = copy(buf, o.buf) - o.buf = o.buf[n:] - // There is more left in buffer, do not return any EOF yet. - return n, nil -} - -// Close - Release the buffer and close the file. -func (o *odirectReader) Close() error { - if o.smallFile { - xlPoolSmall.Put(o.bufp) - } else { - xlPoolLarge.Put(o.bufp) - } - return o.f.Close() -} - // ReadFileStream - Returns the read stream of the file. func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) { if offset < 0 { @@ -1514,14 +1404,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off return nil, err } - var file *os.File - // O_DIRECT only supported if offset is zero - if offset == 0 { - file, err = OpenFileDirectIO(filePath, readMode, 0666) - } else { - // Open the file for reading. 
- file, err = OpenFile(filePath, readMode, 0666) - } + file, err := OpenFileDirectIO(filePath, readMode, 0666) if err != nil { switch { case osIsNotExist(err): @@ -1557,52 +1440,44 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off return nil, errIsNotRegular } - // Enable sequential read access pattern - only applicable on Linux. - if err := disk.Fadvise(file, disk.FadvSequential); err != nil { - return nil, err + alignment := offset%xioutil.DirectioAlignSize == 0 + if !alignment { + if err = disk.DisableDirectIO(file); err != nil { + file.Close() + return nil, err + } } - if offset == 0 { - or := &odirectReader{file, nil, nil, true, false, s, nil} - if length <= smallFileThreshold { - or = &odirectReader{file, nil, nil, true, true, s, nil} + if offset > 0 { + if _, err = file.Seek(offset, io.SeekStart); err != nil { + file.Close() + return nil, err + } + } + + or := &xioutil.ODirectReader{ + File: file, + SmallFile: false, + } + + if length <= smallFileThreshold { + or = &xioutil.ODirectReader{ + File: file, + SmallFile: true, } - r := struct { - io.Reader - io.Closer - }{Reader: io.LimitReader(or, length), Closer: closeWrapper(func() error { - return or.Close() - })} - return r, nil } r := struct { io.Reader io.Closer - }{Reader: io.LimitReader(file, length), Closer: closeWrapper(func() error { - disk.Fadvise(file, disk.FadvNoReuse) - return file.Close() + }{Reader: io.LimitReader(or, length), Closer: closeWrapper(func() error { + if !alignment || (offset+length)%xioutil.DirectioAlignSize != 0 { + // invalidate page-cache for unaligned reads. 
+ disk.FadviseDontNeed(file) + } + return or.Close() })} - if offset > 0 { - if _, err = file.Seek(offset, io.SeekStart); err != nil { - r.Close() - return nil, err - } - } - - // Add readahead to big reads - if length >= readAheadSize { - rc, err := readahead.NewReadCloserSize(r, readAheadBuffers, readAheadBufSize) - if err != nil { - r.Close() - return nil, err - } - return rc, nil - } - - // Just add a small 64k buffer. - r.Reader = bufio.NewReaderSize(r.Reader, 64<<10) return r, nil } @@ -1683,11 +1558,11 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz var bufp *[]byte if fileSize > 0 && fileSize >= reallyLargeFileThreshold { // use a larger 4MiB buffer for really large streams. - bufp = xlPoolReallyLarge.Get().(*[]byte) - defer xlPoolReallyLarge.Put(bufp) + bufp = xioutil.ODirectPoolXLarge.Get().(*[]byte) + defer xioutil.ODirectPoolXLarge.Put(bufp) } else { - bufp = xlPoolLarge.Get().(*[]byte) - defer xlPoolLarge.Put(bufp) + bufp = xioutil.ODirectPoolLarge.Get().(*[]byte) + defer xioutil.ODirectPoolLarge.Put(bufp) } written, err := xioutil.CopyAligned(w, r, *bufp, fileSize) diff --git a/internal/disk/fdatasync_linux.go b/internal/disk/fdatasync_linux.go index e995a137c..4d3986773 100644 --- a/internal/disk/fdatasync_linux.go +++ b/internal/disk/fdatasync_linux.go @@ -41,14 +41,7 @@ func Fdatasync(f *os.File) error { return syscall.Fdatasync(int(f.Fd())) } -// fdavise advice constants -const ( - FadvSequential = unix.FADV_SEQUENTIAL - FadvNoReuse = unix.FADV_NOREUSE -) - -// Fadvise implements possibility of choosing -// offset: 0, length: 0 -func Fadvise(f *os.File, advice int) error { - return unix.Fadvise(int(f.Fd()), 0, 0, advice) +// FadviseDontNeed invalidates page-cache +func FadviseDontNeed(f *os.File) error { + return unix.Fadvise(int(f.Fd()), 0, 0, unix.FADV_DONTNEED) } diff --git a/internal/disk/fdatasync_unix.go b/internal/disk/fdatasync_unix.go index 6ec9b4356..10c224cdc 100644 --- 
a/internal/disk/fdatasync_unix.go +++ b/internal/disk/fdatasync_unix.go @@ -30,14 +30,7 @@ func Fdatasync(f *os.File) error { return syscall.Fsync(int(f.Fd())) } -// fdavise advice constants -const ( - FadvSequential = 0 - FadvNoReuse = 0 -) - -// Fadvise implements possibility of choosing -// offset: 0, length: 0 -func Fadvise(f *os.File, advice int) error { +// FadviseDontNeed is a no-op +func FadviseDontNeed(f *os.File) error { return nil } diff --git a/internal/disk/fdatasync_unsupported.go b/internal/disk/fdatasync_unsupported.go index 3c317f7fe..a3beedb5d 100644 --- a/internal/disk/fdatasync_unsupported.go +++ b/internal/disk/fdatasync_unsupported.go @@ -29,14 +29,7 @@ func Fdatasync(f *os.File) error { return nil } -// fdavise advice constants -const ( - FadvSequential = 0 - FadvNoReuse = 0 -) - -// Fadvise implements possibility of choosing -// offset: 0, length: 0 -func Fadvise(f *os.File, advice int) error { +// FadviseDontNeed is a no-op +func FadviseDontNeed(f *os.File) error { return nil } diff --git a/internal/ioutil/ioutil.go b/internal/ioutil/ioutil.go index 5d7f57a98..6514e17f6 100644 --- a/internal/ioutil/ioutil.go +++ b/internal/ioutil/ioutil.go @@ -227,9 +227,9 @@ func SameFile(fi1, fi2 os.FileInfo) bool { return true } -// DirectIO alignment needs to be 4K. Defined here as +// DirectioAlignSize - DirectIO alignment needs to be 4K. Defined here as // directio.AlignSize is defined as 0 in MacOS causing divide by 0 error. 
-const directioAlignSize = 4096 +const DirectioAlignSize = 4096 // CopyAligned - copies from reader to writer using the aligned input // buffer, it is expected that input buffer is page aligned to @@ -269,7 +269,7 @@ func CopyAligned(w *os.File, r io.Reader, alignedBuf []byte, totalSize int64) (i } buf = buf[:nr] var nw int64 - if len(buf)%directioAlignSize == 0 { + if len(buf)%DirectioAlignSize == 0 { var n int // buf is aligned for directio write() n, err = w.Write(buf) diff --git a/internal/ioutil/odirect_reader.go b/internal/ioutil/odirect_reader.go new file mode 100644 index 000000000..aaffca784 --- /dev/null +++ b/internal/ioutil/odirect_reader.go @@ -0,0 +1,131 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ioutil + +import ( + "errors" + "io" + "os" + "sync" + "syscall" + + "github.com/dustin/go-humanize" + "github.com/minio/minio/internal/disk" +) + +// ODirectReader - to support O_DIRECT reads for erasure backends. +type ODirectReader struct { + File *os.File + SmallFile bool + err error + buf []byte + bufp *[]byte + seenRead bool +} + +// Block sizes constant. +const ( + BlockSizeSmall = 128 * humanize.KiByte // Default r/w block size for smaller objects. + BlockSizeLarge = 2 * humanize.MiByte // Default r/w block size for larger objects. 
+ BlockSizeReallyLarge = 4 * humanize.MiByte // Default write block size for objects per shard >= 64MiB +) + +// O_DIRECT aligned sync.Pool's +var ( + ODirectPoolXLarge = sync.Pool{ + New: func() interface{} { + b := disk.AlignedBlock(BlockSizeReallyLarge) + return &b + }, + } + ODirectPoolLarge = sync.Pool{ + New: func() interface{} { + b := disk.AlignedBlock(BlockSizeLarge) + return &b + }, + } + ODirectPoolSmall = sync.Pool{ + New: func() interface{} { + b := disk.AlignedBlock(BlockSizeSmall) + return &b + }, + } +) + +// Invalid argument, unsupported flags such as O_DIRECT +func isSysErrInvalidArg(err error) bool { + return errors.Is(err, syscall.EINVAL) +} + +// Read - Implements Reader interface. +func (o *ODirectReader) Read(buf []byte) (n int, err error) { + if o.err != nil && (len(o.buf) == 0 || !o.seenRead) { + return 0, o.err + } + if o.buf == nil { + if o.SmallFile { + o.bufp = ODirectPoolSmall.Get().(*[]byte) + } else { + o.bufp = ODirectPoolLarge.Get().(*[]byte) + } + } + if !o.seenRead { + o.buf = *o.bufp + n, err = o.File.Read(o.buf) + if err != nil && err != io.EOF { + if isSysErrInvalidArg(err) { + if err = disk.DisableDirectIO(o.File); err != nil { + o.err = err + return n, err + } + n, err = o.File.Read(o.buf) + } + if err != nil && err != io.EOF { + o.err = err + return n, err + } + } + if n == 0 { + // err is likely io.EOF + o.err = err + return n, err + } + o.err = err + o.buf = o.buf[:n] + o.seenRead = true + } + if len(buf) >= len(o.buf) { + n = copy(buf, o.buf) + o.seenRead = false + return n, o.err + } + n = copy(buf, o.buf) + o.buf = o.buf[n:] + // There is more left in buffer, do not return any EOF yet. + return n, nil +} + +// Close - Release the buffer and close the file. 
+func (o *ODirectReader) Close() error { + if o.bufp != nil && o.SmallFile { // bufp is nil when Read was never called; never pool a nil buffer + ODirectPoolSmall.Put(o.bufp) + } else if o.bufp != nil { + ODirectPoolLarge.Put(o.bufp) + } + return o.File.Close() +} diff --git a/internal/ioutil/read_file.go b/internal/ioutil/read_file.go index 055800d91..1a5f6d20a 100644 --- a/internal/ioutil/read_file.go +++ b/internal/ioutil/read_file.go @@ -19,7 +19,6 @@ package ioutil import ( "io" - "os" "github.com/minio/minio/internal/disk" ) @@ -31,20 +30,21 @@ import ( // // passes NOATIME flag for reads on Unix systems to avoid atime updates. func ReadFile(name string) ([]byte, error) { - f, err := os.OpenFile(name, readMode, 0) + f, err := disk.OpenFileDirectIO(name, readMode, 0666) if err != nil { return nil, err } - if err := disk.Fadvise(f, disk.FadvSequential); err != nil { - return nil, err + r := &ODirectReader{ + File: f, + SmallFile: true, } - defer disk.Fadvise(f, disk.FadvNoReuse) - defer f.Close() + defer r.Close() + st, err := f.Stat() if err != nil { - return io.ReadAll(f) + return io.ReadAll(r) } dst := make([]byte, st.Size()) - _, err = io.ReadFull(f, dst) + _, err = io.ReadFull(r, dst) return dst, err }