From 731e03fe5abdb568086957f97e09dfce617cb2b7 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 28 Jul 2023 15:37:53 -0700
Subject: [PATCH] add ReadFileStream deadline for disk call (#17745)

timeout the reader side if hung via disk max timeout

---
 cmd/xl-storage-disk-id-check.go |  9 ++++-
 cmd/xl-storage.go               | 10 +++++-
 internal/ioutil/ioutil.go       | 59 +++++++++++++++++++++++++++++----
 internal/ioutil/ioutil_test.go  | 44 +++++++++++++++++++++---
 4 files changed, 110 insertions(+), 12 deletions(-)

diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 543e76ab8..574cc2c47 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -359,6 +359,7 @@ func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath stri
 	return p.storage.ListDir(ctx, volume, dirPath, count)
 }
 
+// Legacy API - does not have any deadlines
 func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
 	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadFile, volume, path)
 	if err != nil {
@@ -369,6 +370,7 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path
 	return p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
 }
 
+// Legacy API - does not have any deadlines
 func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) {
 	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricAppendFile, volume, path)
 	if err != nil {
@@ -396,7 +398,12 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
 	}
 	defer done(&err)
 
-	return p.storage.ReadFileStream(ctx, volume, path, offset, length)
+	rc, err := p.storage.ReadFileStream(ctx, volume, path, offset, length)
+	if err != nil {
+		return rc, err
+	}
+
+	return xioutil.NewDeadlineReader(rc, diskMaxTimeout), nil
 }
 
 func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 42b67c932..f81b560d3 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -1885,6 +1885,10 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
 }
 
 func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSize int64, r io.Reader, flags int) (err error) {
+	if contextCanceled(ctx) {
+		return ctx.Err()
+	}
+
 	// Create top level directories if they don't exist.
 	// with mode 0777 mkdir honors system umask.
 	parentFilePath := pathutil.Dir(filePath)
@@ -1939,6 +1943,10 @@ func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSiz
 }
 
 func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b []byte, sync bool) (err error) {
+	if contextCanceled(ctx) {
+		return ctx.Err()
+	}
+
 	volumeDir, err := s.getVolDir(volume)
 	if err != nil {
 		return err
@@ -2837,7 +2845,7 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path
 			newBuf, err := xl.AppendTo(metaDataPoolGet())
 			if err == nil {
 				defer metaDataPoolPut(newBuf)
-				return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf, false)
+				return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
 			}
 		}
 	}
diff --git a/internal/ioutil/ioutil.go b/internal/ioutil/ioutil.go
index 093385dd2..9692a09bd 100644
--- a/internal/ioutil/ioutil.go
+++ b/internal/ioutil/ioutil.go
@@ -73,7 +73,54 @@ type ioret struct {
 	err error
 }
 
-// DeadlineWriter deadline writer with context
+// DeadlineReader deadline reader with timeout
+type DeadlineReader struct {
+	io.ReadCloser
+	timeout time.Duration
+	err     error
+}
+
+// NewDeadlineReader wraps a reader to make it respect given deadline
+// value per Read(). If there is a blocking read, the returned Reader
+// will return whenever the timer hits (the return values are n=0
+// and err=context.DeadlineExceeded.)
+func NewDeadlineReader(r io.ReadCloser, timeout time.Duration) io.ReadCloser {
+	return &DeadlineReader{ReadCloser: r, timeout: timeout}
+}
+
+func (r *DeadlineReader) Read(buf []byte) (int, error) {
+	if r.err != nil {
+		return 0, r.err
+	}
+
+	c := make(chan ioret, 1)
+	t := time.NewTimer(r.timeout)
+	go func() {
+		n, err := r.ReadCloser.Read(buf)
+		c <- ioret{n, err}
+		close(c)
+	}()
+
+	select {
+	case res := <-c:
+		if !t.Stop() {
+			<-t.C
+		}
+		r.err = res.err
+		return res.n, res.err
+	case <-t.C:
+		r.ReadCloser.Close()
+		r.err = context.DeadlineExceeded
+		return 0, context.DeadlineExceeded
+	}
+}
+
+// Close closer interface to close the underlying closer
+func (r *DeadlineReader) Close() error {
+	return r.ReadCloser.Close()
+}
+
+// DeadlineWriter deadline writer with timeout
 type DeadlineWriter struct {
 	io.WriteCloser
 	timeout time.Duration
@@ -119,15 +166,15 @@ func (d *DeadlineWorker) Run(work func() error) error {
 		d.err = r.err
 		return r.err
 	case <-t.C:
-		d.err = context.Canceled
-		return context.Canceled
+		d.err = context.DeadlineExceeded
+		return context.DeadlineExceeded
 	}
 }
 
 // NewDeadlineWriter wraps a writer to make it respect given deadline
 // value per Write(). If there is a blocking write, the returned Writer
 // will return whenever the timer hits (the return values are n=0
-// and err=context.Canceled.)
+// and err=context.DeadlineExceeded.)
 func NewDeadlineWriter(w io.WriteCloser, timeout time.Duration) io.WriteCloser {
 	return &DeadlineWriter{WriteCloser: w, timeout: timeout}
 }
@@ -154,8 +201,8 @@ func (w *DeadlineWriter) Write(buf []byte) (int, error) {
 		return r.n, r.err
 	case <-t.C:
 		w.WriteCloser.Close()
-		w.err = context.Canceled
-		return 0, context.Canceled
+		w.err = context.DeadlineExceeded
+		return 0, context.DeadlineExceeded
 	}
 }
 
diff --git a/internal/ioutil/ioutil_test.go b/internal/ioutil/ioutil_test.go
index c89ec1a0a..921e78ecb 100644
--- a/internal/ioutil/ioutil_test.go
+++ b/internal/ioutil/ioutil_test.go
@@ -26,6 +26,19 @@ import (
 	"time"
 )
 
+type sleepReader struct {
+	timeout time.Duration
+}
+
+func (r *sleepReader) Read(p []byte) (n int, err error) {
+	time.Sleep(r.timeout)
+	return len(p), nil
+}
+
+func (r *sleepReader) Close() error {
+	return nil
+}
+
 type sleepWriter struct {
 	timeout time.Duration
 }
@@ -39,16 +52,39 @@ func (w *sleepWriter) Close() error {
 	return nil
 }
 
+func TestDeadlineReader(t *testing.T) {
+	r := NewDeadlineReader(&sleepReader{timeout: 500 * time.Millisecond}, 450*time.Millisecond)
+	buf := make([]byte, 4)
+	_, err := r.Read(buf)
+	r.Close()
+	if err != context.DeadlineExceeded {
+		t.Errorf("DeadlineReader shouldn't be successful %v - should return context.DeadlineExceeded", err)
+	}
+	_, err = r.Read(buf)
+	if err != context.DeadlineExceeded {
+		t.Errorf("DeadlineReader shouldn't be successful %v - should return context.DeadlineExceeded", err)
+	}
+	r = NewDeadlineReader(&sleepReader{timeout: 100 * time.Millisecond}, 600*time.Millisecond)
+	n, err := r.Read(buf)
+	r.Close()
+	if err != nil {
+		t.Errorf("DeadlineReader should succeed but failed with %s", err)
+	}
+	if n != 4 {
+		t.Errorf("DeadlineReader should succeed but should have only read 4 bytes, but returned %d instead", n)
+	}
+}
+
 func TestDeadlineWriter(t *testing.T) {
 	w := NewDeadlineWriter(&sleepWriter{timeout: 500 * time.Millisecond}, 450*time.Millisecond)
 	_, err := w.Write([]byte("1"))
 	w.Close()
-	if err != context.Canceled {
-		t.Error("DeadlineWriter shouldn't be successful - should return context.Canceled")
+	if err != context.DeadlineExceeded {
+		t.Error("DeadlineWriter shouldn't be successful - should return context.DeadlineExceeded")
 	}
 	_, err = w.Write([]byte("1"))
-	if err != context.Canceled {
-		t.Error("DeadlineWriter shouldn't be successful - should return context.Canceled")
+	if err != context.DeadlineExceeded {
+		t.Error("DeadlineWriter shouldn't be successful - should return context.DeadlineExceeded")
 	}
 	w = NewDeadlineWriter(&sleepWriter{timeout: 100 * time.Millisecond}, 600*time.Millisecond)
 	n, err := w.Write([]byte("abcd"))
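
Editor's note: the sketch below is a self-contained illustration of the deadline-read pattern this patch adds to internal/ioutil and then applies in ReadFileStream via xioutil.NewDeadlineReader(rc, diskMaxTimeout): each Read runs in a goroutine and races a timer, so a hung reader surfaces context.DeadlineExceeded instead of stalling the caller. This is a minimal sketch, not MinIO code; the deadlineReader copy, the slowReader stand-in, and the 100ms/1s durations are invented for illustration.

// deadline_reader_sketch.go - hypothetical standalone example, not part of the MinIO tree.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

// ioret carries the result of a single Read performed in a goroutine.
type ioret struct {
	n   int
	err error
}

// deadlineReader mirrors the DeadlineReader added in internal/ioutil/ioutil.go:
// each Read is raced against a timer; on timeout the underlying reader is
// closed and context.DeadlineExceeded is returned and remembered.
type deadlineReader struct {
	io.ReadCloser
	timeout time.Duration
	err     error
}

func (r *deadlineReader) Read(buf []byte) (int, error) {
	if r.err != nil {
		return 0, r.err // a previous timeout or error is sticky
	}
	c := make(chan ioret, 1) // buffered so a late Read result never blocks the goroutine
	t := time.NewTimer(r.timeout)
	go func() {
		n, err := r.ReadCloser.Read(buf)
		c <- ioret{n, err}
	}()
	select {
	case res := <-c:
		if !t.Stop() {
			<-t.C // drain the timer if it already fired
		}
		r.err = res.err
		return res.n, res.err
	case <-t.C:
		r.ReadCloser.Close() // give a blocked reader a chance to unblock
		r.err = context.DeadlineExceeded
		return 0, context.DeadlineExceeded
	}
}

// slowReader blocks for a fixed delay before returning data; it stands in
// for a hung disk read.
type slowReader struct{ delay time.Duration }

func (s *slowReader) Read(p []byte) (int, error) { time.Sleep(s.delay); return len(p), nil }
func (s *slowReader) Close() error               { return nil }

func main() {
	// A reader that takes 1s per Read, wrapped with a 100ms deadline.
	r := &deadlineReader{ReadCloser: &slowReader{delay: time.Second}, timeout: 100 * time.Millisecond}
	_, err := r.Read(make([]byte, 4))
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true: the hung read is cut off
}

As in the patch, the wrapper closes the underlying reader on timeout, which for most network and pipe readers unblocks the goroutine still stuck in Read; any later Read on the wrapper short-circuits with the remembered error rather than spawning another goroutine.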