diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index de8b0831d..edf3f1106 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -26,15 +26,17 @@ import ( xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/ioutil" + "github.com/minio/minio/internal/ringbuffer" ) // Calculates bitrot in chunks and writes the hash into the stream. type streamingBitrotWriter struct { iow io.WriteCloser - closeWithErr func(err error) error + closeWithErr func(err error) h hash.Hash shardSize int64 canClose *sync.WaitGroup + byteBuf []byte } func (b *streamingBitrotWriter) Write(p []byte) (int, error) { @@ -62,7 +64,10 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) { } func (b *streamingBitrotWriter) Close() error { + // Close the underlying writer. + // This will also flush the ring buffer if used. err := b.iow.Close() + // Wait for all data to be written before returning else it causes race conditions. // Race condition is because of io.PipeWriter implementation. i.e consider the following // sequent of operations: @@ -73,29 +78,34 @@ func (b *streamingBitrotWriter) Close() error { if b.canClose != nil { b.canClose.Wait() } + + // Recycle the buffer. + if b.byteBuf != nil { + globalBytePoolCap.Load().Put(b.byteBuf) + b.byteBuf = nil + } return err } // newStreamingBitrotWriterBuffer returns streaming bitrot writer implementation. // The output is written to the supplied writer w. func newStreamingBitrotWriterBuffer(w io.Writer, algo BitrotAlgorithm, shardSize int64) io.Writer { - return &streamingBitrotWriter{iow: ioutil.NopCloser(w), h: algo.New(), shardSize: shardSize, canClose: nil, closeWithErr: func(err error) error { - // Similar to CloseWithError on pipes we always return nil. - return nil - }} + return &streamingBitrotWriter{iow: ioutil.NopCloser(w), h: algo.New(), shardSize: shardSize, canClose: nil, closeWithErr: func(err error) {}} } // Returns streaming bitrot writer implementation. func newStreamingBitrotWriter(disk StorageAPI, origvolume, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer { - r, w := io.Pipe() h := algo.New() + buf := globalBytePoolCap.Load().Get() + rb := ringbuffer.NewBuffer(buf[:cap(buf)]).SetBlocking(true) bw := &streamingBitrotWriter{ - iow: ioutil.NewDeadlineWriter(w, globalDriveConfig.GetMaxTimeout()), - closeWithErr: w.CloseWithError, + iow: ioutil.NewDeadlineWriter(rb.WriteCloser(), globalDriveConfig.GetMaxTimeout()), + closeWithErr: rb.CloseWithError, h: h, shardSize: shardSize, canClose: &sync.WaitGroup{}, + byteBuf: buf, } bw.canClose.Add(1) go func() { @@ -106,7 +116,7 @@ func newStreamingBitrotWriter(disk StorageAPI, origvolume, volume, filePath stri bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums. totalFileSize = bitrotSumsTotalSize + length } - r.CloseWithError(disk.CreateFile(context.TODO(), origvolume, volume, filePath, totalFileSize, r)) + rb.CloseWithError(disk.CreateFile(context.TODO(), origvolume, volume, filePath, totalFileSize, rb)) }() return bw } diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index 7f352f0b2..0ac308e72 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -346,7 +346,7 @@ func (e Erasure) Heal(ctx context.Context, writers []io.Writer, readers []io.Rea return err } - w := parallelWriter{ + w := multiWriter{ writers: writers, writeQuorum: 1, errs: make([]error, len(writers)), diff --git a/cmd/erasure-encode.go b/cmd/erasure-encode.go index 56f5869b0..215ac172e 100644 --- a/cmd/erasure-encode.go +++ b/cmd/erasure-encode.go @@ -21,44 +21,36 @@ import ( "context" "fmt" "io" - "sync" ) -// Writes in parallel to writers -type parallelWriter struct { +// Writes to multiple writers +type multiWriter struct { writers []io.Writer writeQuorum int errs []error } -// Write writes data to writers in parallel. -func (p *parallelWriter) Write(ctx context.Context, blocks [][]byte) error { - var wg sync.WaitGroup - +// Write writes data to writers. +func (p *multiWriter) Write(ctx context.Context, blocks [][]byte) error { for i := range p.writers { + if p.errs[i] != nil { + continue + } if p.writers[i] == nil { p.errs[i] = errDiskNotFound continue } - if p.errs[i] != nil { - continue - } - wg.Add(1) - go func(i int) { - defer wg.Done() - var n int - n, p.errs[i] = p.writers[i].Write(blocks[i]) - if p.errs[i] == nil { - if n != len(blocks[i]) { - p.errs[i] = io.ErrShortWrite - p.writers[i] = nil - } - } else { + var n int + n, p.errs[i] = p.writers[i].Write(blocks[i]) + if p.errs[i] == nil { + if n != len(blocks[i]) { + p.errs[i] = io.ErrShortWrite p.writers[i] = nil } - }(i) + } else { + p.writers[i] = nil + } } - wg.Wait() // If nilCount >= p.writeQuorum, we return nil. This is because HealFile() uses // CreateFile with p.writeQuorum=1 to accommodate healing of single disk. @@ -75,7 +67,7 @@ func (p *parallelWriter) Write(ctx context.Context, blocks [][]byte) error { // Encode reads from the reader, erasure-encodes the data and writes to the writers. func (e *Erasure) Encode(ctx context.Context, src io.Reader, writers []io.Writer, buf []byte, quorum int) (total int64, err error) { - writer := ¶llelWriter{ + writer := &multiWriter{ writers: writers, writeQuorum: quorum, errs: make([]error, len(writers)), diff --git a/internal/http/transports.go b/internal/http/transports.go index 1c3fe5c76..f88473727 100644 --- a/internal/http/transports.go +++ b/internal/http/transports.go @@ -31,6 +31,11 @@ import ( // tlsClientSessionCacheSize is the cache size for client sessions. var tlsClientSessionCacheSize = 100 +const ( + WriteBufferSize = 64 << 10 // WriteBufferSize 64KiB moving up from 4KiB default + ReadBufferSize = 64 << 10 // ReadBufferSize 64KiB moving up from 4KiB default +) + // ConnSettings - contains connection settings. type ConnSettings struct { DialContext DialContext // Custom dialContext, DialTimeout is ignored if this is already setup. @@ -72,8 +77,8 @@ func (s ConnSettings) getDefaultTransport(maxIdleConnsPerHost int) *http.Transpo Proxy: http.ProxyFromEnvironment, DialContext: dialContext, MaxIdleConnsPerHost: maxIdleConnsPerHost, - WriteBufferSize: 64 << 10, // 64KiB moving up from 4KiB default - ReadBufferSize: 64 << 10, // 64KiB moving up from 4KiB default + WriteBufferSize: WriteBufferSize, + ReadBufferSize: ReadBufferSize, IdleConnTimeout: 15 * time.Second, ResponseHeaderTimeout: 15 * time.Minute, // Conservative timeout is the default (for MinIO internode) TLSHandshakeTimeout: 10 * time.Second, diff --git a/internal/ringbuffer/LICENSE b/internal/ringbuffer/LICENSE new file mode 100644 index 000000000..c4852bb45 --- /dev/null +++ b/internal/ringbuffer/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 smallnest + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/internal/ringbuffer/README.md b/internal/ringbuffer/README.md new file mode 100644 index 000000000..83266952a --- /dev/null +++ b/internal/ringbuffer/README.md @@ -0,0 +1,60 @@ +# ringbuffer + +[![License](https://img.shields.io/:license-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![GoDoc](https://godoc.org/github.com/smallnest/ringbuffer?status.png)](http://godoc.org/github.com/smallnest/ringbuffer) [![Go Report Card](https://goreportcard.com/badge/github.com/smallnest/ringbuffer)](https://goreportcard.com/report/github.com/smallnest/ringbuffer) [![coveralls](https://coveralls.io/repos/smallnest/ringbuffer/badge.svg?branch=master&service=github)](https://coveralls.io/github/smallnest/ringbuffer?branch=master) + +A circular buffer (ring buffer) in Go, implemented io.ReaderWriter interface + +[![wikipedia](Circular_Buffer_Animation.gif)](https://github.com/smallnest/ringbuffer) + +# Usage + +```go +package main + +import ( + "fmt" + + "github.com/smallnest/ringbuffer" +) + +func main() { + rb := ringbuffer.New(1024) + + // write + rb.Write([]byte("abcd")) + fmt.Println(rb.Length()) + fmt.Println(rb.Free()) + + // read + buf := make([]byte, 4) + rb.Read(buf) + fmt.Println(string(buf)) +} +``` + +It is possible to use an existing buffer with by replacing `New` with `NewBuffer`. + + +# Blocking vs Non-blocking + +The default behavior of the ring buffer is non-blocking, +meaning that reads and writes will return immediately with an error if the operation cannot be completed. +If you want to block when reading or writing, you must enable it: + +```go + rb := ringbuffer.New(1024).SetBlocking(true) +``` + +Enabling blocking will cause the ring buffer to behave like a buffered [io.Pipe](https://pkg.go.dev/io#Pipe). + +Regular Reads will block until data is available, but not wait for a full buffer. +Writes will block until there is space available and writes bigger than the buffer will wait for reads to make space. + +`TryRead` and `TryWrite` are still available for non-blocking reads and writes. + +To signify the end of the stream, close the ring buffer from the writer side with `rb.CloseWriter()` + +Either side can use `rb.CloseWithError(err error)` to signal an error and close the ring buffer. +Any reads or writes will return the error on next call. + +In blocking mode errors are stateful and the same error will be returned until `rb.Reset()` is called. \ No newline at end of file diff --git a/internal/ringbuffer/ring_buffer.go b/internal/ringbuffer/ring_buffer.go new file mode 100644 index 000000000..314a0d748 --- /dev/null +++ b/internal/ringbuffer/ring_buffer.go @@ -0,0 +1,618 @@ +// Copyright 2019 smallnest. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package ringbuffer + +import ( + "context" + "errors" + "io" + "sync" + "unsafe" +) + +var ( + // ErrTooMuchDataToWrite is returned when the data to write is more than the buffer size. + ErrTooMuchDataToWrite = errors.New("too much data to write") + + // ErrIsFull is returned when the buffer is full and not blocking. + ErrIsFull = errors.New("ringbuffer is full") + + // ErrIsEmpty is returned when the buffer is empty and not blocking. + ErrIsEmpty = errors.New("ringbuffer is empty") + + // ErrIsNotEmpty is returned when the buffer is not empty and not blocking. + ErrIsNotEmpty = errors.New("ringbuffer is not empty") + + // ErrAcquireLock is returned when the lock is not acquired on Try operations. + ErrAcquireLock = errors.New("unable to acquire lock") + + // ErrWriteOnClosed is returned when write on a closed ringbuffer. + ErrWriteOnClosed = errors.New("write on closed ringbuffer") +) + +// RingBuffer is a circular buffer that implement io.ReaderWriter interface. +// It operates like a buffered pipe, where data written to a RingBuffer +// and can be read back from another goroutine. +// It is safe to concurrently read and write RingBuffer. +type RingBuffer struct { + buf []byte + size int + r int // next position to read + w int // next position to write + isFull bool + err error + block bool + mu sync.Mutex + wg sync.WaitGroup + readCond *sync.Cond // Signaled when data has been read. + writeCond *sync.Cond // Signaled when data has been written. +} + +// New returns a new RingBuffer whose buffer has the given size. +func New(size int) *RingBuffer { + return &RingBuffer{ + buf: make([]byte, size), + size: size, + } +} + +// NewBuffer returns a new RingBuffer whose buffer is provided. +func NewBuffer(b []byte) *RingBuffer { + return &RingBuffer{ + buf: b, + size: len(b), + } +} + +// SetBlocking sets the blocking mode of the ring buffer. +// If block is true, Read and Write will block when there is no data to read or no space to write. +// If block is false, Read and Write will return ErrIsEmpty or ErrIsFull immediately. +// By default, the ring buffer is not blocking. +// This setting should be called before any Read or Write operation or after a Reset. +func (r *RingBuffer) SetBlocking(block bool) *RingBuffer { + r.block = block + if block { + r.readCond = sync.NewCond(&r.mu) + r.writeCond = sync.NewCond(&r.mu) + } + return r +} + +// WithCancel sets a context to cancel the ring buffer. +// When the context is canceled, the ring buffer will be closed with the context error. +// A goroutine will be started and run until the provided context is canceled. +func (r *RingBuffer) WithCancel(ctx context.Context) *RingBuffer { + go func() { + select { + case <-ctx.Done(): + r.CloseWithError(ctx.Err()) + } + }() + return r +} + +func (r *RingBuffer) setErr(err error, locked bool) error { + if !locked { + r.mu.Lock() + defer r.mu.Unlock() + } + if r.err != nil && r.err != io.EOF { + return r.err + } + + switch err { + // Internal errors are transient + case nil, ErrIsEmpty, ErrIsFull, ErrAcquireLock, ErrTooMuchDataToWrite, ErrIsNotEmpty: + return err + default: + r.err = err + if r.block { + r.readCond.Broadcast() + r.writeCond.Broadcast() + } + } + return err +} + +func (r *RingBuffer) readErr(locked bool) error { + if !locked { + r.mu.Lock() + defer r.mu.Unlock() + } + if r.err != nil { + if r.err == io.EOF { + if r.w == r.r && !r.isFull { + return io.EOF + } + return nil + } + return r.err + } + return nil +} + +// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered. +// Even if Read returns n < len(p), it may use all of p as scratch space during the call. +// If some data is available but not len(p) bytes, Read conventionally returns what is available instead of waiting for more. +// When Read encounters an error or end-of-file condition after successfully reading n > 0 bytes, it returns the number of bytes read. +// It may return the (non-nil) error from the same call or return the error (and n == 0) from a subsequent call. +// Callers should always process the n > 0 bytes returned before considering the error err. +// Doing so correctly handles I/O errors that happen after reading some bytes and also both of the allowed EOF behaviors. +func (r *RingBuffer) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, r.readErr(false) + } + + r.mu.Lock() + defer r.mu.Unlock() + if err := r.readErr(true); err != nil { + return 0, err + } + + r.wg.Add(1) + defer r.wg.Done() + n, err = r.read(p) + for err == ErrIsEmpty && r.block { + r.writeCond.Wait() + if err = r.readErr(true); err != nil { + break + } + n, err = r.read(p) + } + if r.block && n > 0 { + r.readCond.Broadcast() + } + return n, err +} + +// TryRead read up to len(p) bytes into p like Read but it is not blocking. +// If it has not succeeded to acquire the lock, it return 0 as n and ErrAcquireLock. +func (r *RingBuffer) TryRead(p []byte) (n int, err error) { + ok := r.mu.TryLock() + if !ok { + return 0, ErrAcquireLock + } + defer r.mu.Unlock() + if err := r.readErr(true); err != nil { + return 0, err + } + if len(p) == 0 { + return 0, r.readErr(true) + } + + n, err = r.read(p) + if r.block && n > 0 { + r.readCond.Broadcast() + } + return n, err +} + +func (r *RingBuffer) read(p []byte) (n int, err error) { + if r.w == r.r && !r.isFull { + return 0, ErrIsEmpty + } + + if r.w > r.r { + n = r.w - r.r + if n > len(p) { + n = len(p) + } + copy(p, r.buf[r.r:r.r+n]) + r.r = (r.r + n) % r.size + return + } + + n = r.size - r.r + r.w + if n > len(p) { + n = len(p) + } + + if r.r+n <= r.size { + copy(p, r.buf[r.r:r.r+n]) + } else { + c1 := r.size - r.r + copy(p, r.buf[r.r:r.size]) + c2 := n - c1 + copy(p[c1:], r.buf[0:c2]) + } + r.r = (r.r + n) % r.size + + r.isFull = false + + return n, r.readErr(true) +} + +// ReadByte reads and returns the next byte from the input or ErrIsEmpty. +func (r *RingBuffer) ReadByte() (b byte, err error) { + r.mu.Lock() + defer r.mu.Unlock() + if err = r.readErr(true); err != nil { + return 0, err + } + for r.w == r.r && !r.isFull { + if r.block { + r.writeCond.Wait() + err = r.readErr(true) + if err != nil { + return 0, err + } + continue + } + return 0, ErrIsEmpty + } + b = r.buf[r.r] + r.r++ + if r.r == r.size { + r.r = 0 + } + + r.isFull = false + return b, r.readErr(true) +} + +// Write writes len(p) bytes from p to the underlying buf. +// It returns the number of bytes written from p (0 <= n <= len(p)) +// and any error encountered that caused the write to stop early. +// If blocking n < len(p) will be returned only if an error occurred. +// Write returns a non-nil error if it returns n < len(p). +// Write will not modify the slice data, even temporarily. +func (r *RingBuffer) Write(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, r.setErr(nil, false) + } + r.mu.Lock() + defer r.mu.Unlock() + if err := r.err; err != nil { + if err == io.EOF { + err = ErrWriteOnClosed + } + return 0, err + } + wrote := 0 + for len(p) > 0 { + n, err = r.write(p) + wrote += n + if !r.block || err == nil { + break + } + err = r.setErr(err, true) + if r.block && (err == ErrIsFull || err == ErrTooMuchDataToWrite) { + r.writeCond.Broadcast() + r.readCond.Wait() + p = p[n:] + err = nil + continue + } + break + } + if r.block && wrote > 0 { + r.writeCond.Broadcast() + } + + return wrote, r.setErr(err, true) +} + +// TryWrite writes len(p) bytes from p to the underlying buf like Write, but it is not blocking. +// If it has not succeeded to acquire the lock, it return 0 as n and ErrAcquireLock. +func (r *RingBuffer) TryWrite(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, r.setErr(nil, false) + } + ok := r.mu.TryLock() + if !ok { + return 0, ErrAcquireLock + } + defer r.mu.Unlock() + if err := r.err; err != nil { + if err == io.EOF { + err = ErrWriteOnClosed + } + return 0, err + } + + n, err = r.write(p) + if r.block && n > 0 { + r.writeCond.Broadcast() + } + return n, r.setErr(err, true) +} + +func (r *RingBuffer) write(p []byte) (n int, err error) { + if r.isFull { + return 0, ErrIsFull + } + + var avail int + if r.w >= r.r { + avail = r.size - r.w + r.r + } else { + avail = r.r - r.w + } + + if len(p) > avail { + err = ErrTooMuchDataToWrite + p = p[:avail] + } + n = len(p) + + if r.w >= r.r { + c1 := r.size - r.w + if c1 >= n { + copy(r.buf[r.w:], p) + r.w += n + } else { + copy(r.buf[r.w:], p[:c1]) + c2 := n - c1 + copy(r.buf[0:], p[c1:]) + r.w = c2 + } + } else { + copy(r.buf[r.w:], p) + r.w += n + } + + if r.w == r.size { + r.w = 0 + } + if r.w == r.r { + r.isFull = true + } + + return n, err +} + +// WriteByte writes one byte into buffer, and returns ErrIsFull if buffer is full. +func (r *RingBuffer) WriteByte(c byte) error { + r.mu.Lock() + defer r.mu.Unlock() + if err := r.err; err != nil { + if err == io.EOF { + err = ErrWriteOnClosed + } + return err + } + err := r.writeByte(c) + for err == ErrIsFull && r.block { + r.readCond.Wait() + err = r.setErr(r.writeByte(c), true) + } + if r.block && err == nil { + r.writeCond.Broadcast() + } + return err +} + +// TryWriteByte writes one byte into buffer without blocking. +// If it has not succeeded to acquire the lock, it return ErrAcquireLock. +func (r *RingBuffer) TryWriteByte(c byte) error { + ok := r.mu.TryLock() + if !ok { + return ErrAcquireLock + } + defer r.mu.Unlock() + if err := r.err; err != nil { + if err == io.EOF { + err = ErrWriteOnClosed + } + return err + } + + err := r.writeByte(c) + if err == nil && r.block { + r.writeCond.Broadcast() + } + return err +} + +func (r *RingBuffer) writeByte(c byte) error { + if r.w == r.r && r.isFull { + return ErrIsFull + } + r.buf[r.w] = c + r.w++ + + if r.w == r.size { + r.w = 0 + } + if r.w == r.r { + r.isFull = true + } + + return nil +} + +// Length return the length of available read bytes. +func (r *RingBuffer) Length() int { + r.mu.Lock() + defer r.mu.Unlock() + + if r.w == r.r { + if r.isFull { + return r.size + } + return 0 + } + + if r.w > r.r { + return r.w - r.r + } + + return r.size - r.r + r.w +} + +// Capacity returns the size of the underlying buffer. +func (r *RingBuffer) Capacity() int { + return r.size +} + +// Free returns the length of available bytes to write. +func (r *RingBuffer) Free() int { + r.mu.Lock() + defer r.mu.Unlock() + + if r.w == r.r { + if r.isFull { + return 0 + } + return r.size + } + + if r.w < r.r { + return r.r - r.w + } + + return r.size - r.w + r.r +} + +// WriteString writes the contents of the string s to buffer, which accepts a slice of bytes. +func (r *RingBuffer) WriteString(s string) (n int, err error) { + x := (*[2]uintptr)(unsafe.Pointer(&s)) + h := [3]uintptr{x[0], x[1], x[1]} + buf := *(*[]byte)(unsafe.Pointer(&h)) + return r.Write(buf) +} + +// Bytes returns all available read bytes. +// It does not move the read pointer and only copy the available data. +// If the dst is big enough it will be used as destination, +// otherwise a new buffer will be allocated. +func (r *RingBuffer) Bytes(dst []byte) []byte { + r.mu.Lock() + defer r.mu.Unlock() + getDst := func(n int) []byte { + if cap(dst) < n { + return make([]byte, n) + } + return dst[:n] + } + + if r.w == r.r { + if r.isFull { + buf := getDst(r.size) + copy(buf, r.buf[r.r:]) + copy(buf[r.size-r.r:], r.buf[:r.w]) + return buf + } + return nil + } + + if r.w > r.r { + buf := getDst(r.w - r.r) + copy(buf, r.buf[r.r:r.w]) + return buf + } + + n := r.size - r.r + r.w + buf := getDst(n) + + if r.r+n < r.size { + copy(buf, r.buf[r.r:r.r+n]) + } else { + c1 := r.size - r.r + copy(buf, r.buf[r.r:r.size]) + c2 := n - c1 + copy(buf[c1:], r.buf[0:c2]) + } + + return buf +} + +// IsFull returns this ringbuffer is full. +func (r *RingBuffer) IsFull() bool { + r.mu.Lock() + defer r.mu.Unlock() + + return r.isFull +} + +// IsEmpty returns this ringbuffer is empty. +func (r *RingBuffer) IsEmpty() bool { + r.mu.Lock() + defer r.mu.Unlock() + + return !r.isFull && r.w == r.r +} + +// CloseWithError closes the writer; reads will return +// no bytes and the error err, or EOF if err is nil. +// +// CloseWithError never overwrites the previous error if it exists +// and always returns nil. +func (r *RingBuffer) CloseWithError(err error) { + if err == nil { + err = io.EOF + } + r.setErr(err, false) +} + +// CloseWriter closes the writer. +// Reads will return any remaining bytes and io.EOF. +func (r *RingBuffer) CloseWriter() { + r.setErr(io.EOF, false) +} + +// Flush waits for the buffer to be empty and fully read. +// If not blocking ErrIsNotEmpty will be returned if the buffer still contains data. +func (r *RingBuffer) Flush() error { + r.mu.Lock() + defer r.mu.Unlock() + for r.w != r.r || r.isFull { + err := r.readErr(true) + if err != nil { + if err == io.EOF { + err = nil + } + return err + } + if !r.block { + return ErrIsNotEmpty + } + r.readCond.Wait() + } + + err := r.readErr(true) + if err == io.EOF { + return nil + } + return err +} + +// Reset the read pointer and writer pointer to zero. +func (r *RingBuffer) Reset() { + r.mu.Lock() + defer r.mu.Unlock() + + // Set error so any readers/writers will return immediately. + r.setErr(errors.New("reset called"), true) + if r.block { + r.readCond.Broadcast() + r.writeCond.Broadcast() + } + + // Unlock the mutex so readers/writers can finish. + r.mu.Unlock() + r.wg.Wait() + r.mu.Lock() + r.r = 0 + r.w = 0 + r.err = nil + r.isFull = false +} + +// WriteCloser returns a WriteCloser that writes to the ring buffer. +// When the returned WriteCloser is closed, it will wait for all data to be read before returning. +func (r *RingBuffer) WriteCloser() io.WriteCloser { + return &writeCloser{RingBuffer: r} +} + +type writeCloser struct { + *RingBuffer +} + +// Close provides a close method for the WriteCloser. +func (wc *writeCloser) Close() error { + wc.CloseWriter() + return wc.Flush() +} diff --git a/internal/ringbuffer/ring_buffer_benchmark_test.go b/internal/ringbuffer/ring_buffer_benchmark_test.go new file mode 100644 index 000000000..65f34571b --- /dev/null +++ b/internal/ringbuffer/ring_buffer_benchmark_test.go @@ -0,0 +1,111 @@ +package ringbuffer + +import ( + "io" + "strings" + "testing" +) + +func BenchmarkRingBuffer_Sync(b *testing.B) { + rb := New(1024) + data := []byte(strings.Repeat("a", 512)) + buf := make([]byte, 512) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rb.Write(data) + rb.Read(buf) + } +} + +func BenchmarkRingBuffer_AsyncRead(b *testing.B) { + // Pretty useless benchmark, but it's here for completeness. + rb := New(1024) + data := []byte(strings.Repeat("a", 512)) + buf := make([]byte, 512) + + go func() { + for { + rb.Read(buf) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rb.Write(data) + } +} + +func BenchmarkRingBuffer_AsyncReadBlocking(b *testing.B) { + const sz = 512 + const buffers = 10 + rb := New(sz * buffers) + rb.SetBlocking(true) + data := []byte(strings.Repeat("a", sz)) + buf := make([]byte, sz) + + go func() { + for { + rb.Read(buf) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rb.Write(data) + } +} + +func BenchmarkRingBuffer_AsyncWrite(b *testing.B) { + rb := New(1024) + data := []byte(strings.Repeat("a", 512)) + buf := make([]byte, 512) + + go func() { + for { + rb.Write(data) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rb.Read(buf) + } +} + +func BenchmarkRingBuffer_AsyncWriteBlocking(b *testing.B) { + const sz = 512 + const buffers = 10 + rb := New(sz * buffers) + rb.SetBlocking(true) + data := []byte(strings.Repeat("a", sz)) + buf := make([]byte, sz) + + go func() { + for { + rb.Write(data) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rb.Read(buf) + } +} + +func BenchmarkIoPipeReader(b *testing.B) { + pr, pw := io.Pipe() + data := []byte(strings.Repeat("a", 512)) + buf := make([]byte, 512) + + go func() { + for { + pw.Write(data) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + pr.Read(buf) + } +} diff --git a/internal/ringbuffer/ring_buffer_test.go b/internal/ringbuffer/ring_buffer_test.go new file mode 100644 index 000000000..a7f7c219f --- /dev/null +++ b/internal/ringbuffer/ring_buffer_test.go @@ -0,0 +1,1049 @@ +package ringbuffer + +import ( + "bytes" + "errors" + "fmt" + "hash/crc32" + "io" + "math/rand" + "os" + "runtime" + "strings" + "sync" + "testing" + "time" +) + +func TestRingBuffer_interface(t *testing.T) { + rb := New(1) + var _ io.Writer = rb + var _ io.Reader = rb + // var _ io.StringWriter = rb + var _ io.ByteReader = rb + var _ io.ByteWriter = rb +} + +func TestRingBuffer_Write(t *testing.T) { + rb := New(64) + + // check empty or full + if !rb.IsEmpty() { + t.Fatalf("expect IsEmpty is true but got false") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } + if rb.Length() != 0 { + t.Fatalf("expect len 0 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 64 { + t.Fatalf("expect free 64 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + + // write 4 * 4 = 16 bytes + n, err := rb.Write([]byte(strings.Repeat("abcd", 4))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 16 { + t.Fatalf("expect write 16 bytes but got %d", n) + } + if rb.Length() != 16 { + t.Fatalf("expect len 16 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 48 { + t.Fatalf("expect free 48 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if !bytes.Equal(rb.Bytes(nil), []byte(strings.Repeat("abcd", 4))) { + t.Fatalf("expect 4 abcd but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } + + // write 48 bytes, should full + n, err = rb.Write([]byte(strings.Repeat("abcd", 12))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 48 { + t.Fatalf("expect write 48 bytes but got %d", n) + } + if rb.Length() != 64 { + t.Fatalf("expect len 64 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if rb.w != 0 { + t.Fatalf("expect r.w=0 but got %d. r.r=%d", rb.w, rb.r) + } + if !bytes.Equal(rb.Bytes(nil), []byte(strings.Repeat("abcd", 16))) { + t.Fatalf("expect 16 abcd but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if !rb.IsFull() { + t.Fatalf("expect IsFull is true but got false") + } + + // write more 4 bytes, should reject + n, err = rb.Write([]byte(strings.Repeat("abcd", 1))) + if err == nil { + t.Fatalf("expect an error but got nil. n=%d, r.w=%d, r.r=%d", n, rb.w, rb.r) + } + if err != ErrIsFull { + t.Fatalf("expect ErrIsFull but got nil") + } + if n != 0 { + t.Fatalf("expect write 0 bytes but got %d", n) + } + if rb.Length() != 64 { + t.Fatalf("expect len 64 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if !rb.IsFull() { + t.Fatalf("expect IsFull is true but got false") + } + + // reset this ringbuffer and set a long slice + rb.Reset() + n, err = rb.Write([]byte(strings.Repeat("abcd", 20))) + if err == nil { + t.Fatalf("expect ErrTooManyDataToWrite but got nil") + } + if n != 64 { + t.Fatalf("expect write 64 bytes but got %d", n) + } + if rb.Length() != 64 { + t.Fatalf("expect len 64 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if rb.w != 0 { + t.Fatalf("expect r.w=0 but got %d. r.r=%d", rb.w, rb.r) + } + + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if !rb.IsFull() { + t.Fatalf("expect IsFull is true but got false") + } + + if !bytes.Equal(rb.Bytes(nil), []byte(strings.Repeat("abcd", 16))) { + t.Fatalf("expect 16 abcd but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + + rb.Reset() + // write 4 * 2 = 8 bytes + n, err = rb.Write([]byte(strings.Repeat("abcd", 2))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 8 { + t.Fatalf("expect write 16 bytes but got %d", n) + } + if rb.Length() != 8 { + t.Fatalf("expect len 16 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 56 { + t.Fatalf("expect free 48 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + buf := make([]byte, 5) + rb.Read(buf) + if rb.Length() != 3 { + t.Fatalf("expect len 3 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + rb.Write([]byte(strings.Repeat("abcd", 15))) + + if !bytes.Equal(rb.Bytes(nil), []byte("bcd"+strings.Repeat("abcd", 15))) { + t.Fatalf("expect 63 ... but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + + rb.Reset() + n, err = rb.Write([]byte(strings.Repeat("abcd", 16))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 64 { + t.Fatalf("expect write 64 bytes but got %d", n) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + buf = make([]byte, 16) + rb.Read(buf) + n, err = rb.Write([]byte(strings.Repeat("1234", 4))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 16 { + t.Fatalf("expect write 16 bytes but got %d", n) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if !bytes.Equal(append(buf, rb.Bytes(nil)...), []byte(strings.Repeat("abcd", 16)+strings.Repeat("1234", 4))) { + t.Fatalf("expect 16 abcd and 4 1234 but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } +} + +func TestRingBuffer_WriteBlocking(t *testing.T) { + rb := New(64).SetBlocking(true) + + // check empty or full + if !rb.IsEmpty() { + t.Fatalf("expect IsEmpty is true but got false") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } + if rb.Length() != 0 { + t.Fatalf("expect len 0 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 64 { + t.Fatalf("expect free 64 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + + // write 4 * 4 = 16 bytes + n, err := rb.Write([]byte(strings.Repeat("abcd", 4))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 16 { + t.Fatalf("expect write 16 bytes but got %d", n) + } + if rb.Length() != 16 { + t.Fatalf("expect len 16 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 48 { + t.Fatalf("expect free 48 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if !bytes.Equal(rb.Bytes(nil), []byte(strings.Repeat("abcd", 4))) { + t.Fatalf("expect 4 abcd but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } + + // write 48 bytes, should full + n, err = rb.Write([]byte(strings.Repeat("abcd", 12))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 48 { + t.Fatalf("expect write 48 bytes but got %d", n) + } + if rb.Length() != 64 { + t.Fatalf("expect len 64 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if rb.w != 0 { + t.Fatalf("expect r.w=0 but got %d. r.r=%d", rb.w, rb.r) + } + if !bytes.Equal(rb.Bytes(nil), []byte(strings.Repeat("abcd", 16))) { + t.Fatalf("expect 16 abcd but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if !rb.IsFull() { + t.Fatalf("expect IsFull is true but got false") + } + + rb.Reset() + // write 4 * 2 = 8 bytes + n, err = rb.Write([]byte(strings.Repeat("abcd", 2))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 8 { + t.Fatalf("expect write 16 bytes but got %d", n) + } + if rb.Length() != 8 { + t.Fatalf("expect len 16 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 56 { + t.Fatalf("expect free 48 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + buf := make([]byte, 5) + rb.Read(buf) + if rb.Length() != 3 { + t.Fatalf("expect len 3 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + rb.Write([]byte(strings.Repeat("abcd", 15))) + + if !bytes.Equal(rb.Bytes(nil), []byte("bcd"+strings.Repeat("abcd", 15))) { + t.Fatalf("expect 63 ... but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + + rb.Reset() + n, err = rb.Write([]byte(strings.Repeat("abcd", 16))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 64 { + t.Fatalf("expect write 64 bytes but got %d", n) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + buf = make([]byte, 16) + rb.Read(buf) + n, err = rb.Write([]byte(strings.Repeat("1234", 4))) + if err != nil { + t.Fatalf("write failed: %v", err) + } + if n != 16 { + t.Fatalf("expect write 16 bytes but got %d", n) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if !bytes.Equal(append(buf, rb.Bytes(nil)...), []byte(strings.Repeat("abcd", 16)+strings.Repeat("1234", 4))) { + t.Fatalf("expect 16 abcd and 4 1234 but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } +} + +func TestRingBuffer_Read(t *testing.T) { + defer timeout(5 * time.Second)() + rb := New(64) + + // check empty or full + if !rb.IsEmpty() { + t.Fatalf("expect IsEmpty is true but got false") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } + if rb.Length() != 0 { + t.Fatalf("expect len 0 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 64 { + t.Fatalf("expect free 64 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + + // read empty + buf := make([]byte, 1024) + n, err := rb.Read(buf) + if err == nil { + t.Fatalf("expect an error but got nil") + } + if err != ErrIsEmpty { + t.Fatalf("expect ErrIsEmpty but got nil") + } + if n != 0 { + t.Fatalf("expect read 0 bytes but got %d", n) + } + if rb.Length() != 0 { + t.Fatalf("expect len 0 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 64 { + t.Fatalf("expect free 64 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if rb.r != 0 { + t.Fatalf("expect r.r=0 but got %d. r.w=%d", rb.r, rb.w) + } + + // write 16 bytes to read + rb.Write([]byte(strings.Repeat("abcd", 4))) + n, err = rb.Read(buf) + if err != nil { + t.Fatalf("read failed: %v", err) + } + if n != 16 { + t.Fatalf("expect read 16 bytes but got %d", n) + } + if rb.Length() != 0 { + t.Fatalf("expect len 0 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 64 { + t.Fatalf("expect free 64 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if rb.r != 16 { + t.Fatalf("expect r.r=16 but got %d. r.w=%d", rb.r, rb.w) + } + + // write long slice to read + rb.Write([]byte(strings.Repeat("abcd", 20))) + n, err = rb.Read(buf) + if err != nil { + t.Fatalf("read failed: %v", err) + } + if n != 64 { + t.Fatalf("expect read 64 bytes but got %d", n) + } + if rb.Length() != 0 { + t.Fatalf("expect len 0 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 64 { + t.Fatalf("expect free 64 bytes but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if rb.r != 16 { + t.Fatalf("expect r.r=16 but got %d. r.w=%d", rb.r, rb.w) + } +} + +func TestRingBuffer_Blocking(t *testing.T) { + // Typical runtime is ~5-10s. + defer timeout(60 * time.Second)() + const debug = false + + var readBytes int + var wroteBytes int + var readBuf bytes.Buffer + var wroteBuf bytes.Buffer + readHash := crc32.NewIEEE() + wroteHash := crc32.NewIEEE() + read := io.Writer(readHash) + wrote := io.Writer(wroteHash) + if debug { + read = io.MultiWriter(read, &readBuf) + wrote = io.MultiWriter(wrote, &wroteBuf) + } + debugln := func(args ...interface{}) { + if debug { + fmt.Println(args...) + } + } + // Inject random reader/writer sleeps. + const maxSleep = int(1 * time.Millisecond) + doSleep := !testing.Short() + rb := New(4 << 10).SetBlocking(true) + + // Reader + var readErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + readRng := rand.New(rand.NewSource(1)) + defer wg.Done() + defer rb.CloseWithError(readErr) + buf := make([]byte, 1024) + for { + // Read + n, err := rb.Read(buf[:readRng.Intn(len(buf))]) + readBytes += n + read.Write(buf[:n]) + debugln("READ 1\t", n, readBytes) + if err != nil { + readErr = err + break + } + + // ReadByte + b, err := rb.ReadByte() + if err != nil { + readErr = err + break + } + readBytes++ + read.Write([]byte{b}) + debugln("READ 2\t", 1, readBytes) + + // TryRead + n, err = rb.TryRead(buf[:readRng.Intn(len(buf))]) + readBytes += n + read.Write(buf[:n]) + debugln("READ 3\t", n, readBytes) + if err != nil && err != ErrAcquireLock && err != ErrIsEmpty { + readErr = err + break + } + if doSleep && readRng.Intn(20) == 0 { + time.Sleep(time.Duration(readRng.Intn(maxSleep))) + } + } + }() + + // Writer + { + buf := make([]byte, 1024) + writeRng := rand.New(rand.NewSource(2)) + for i := 0; i < 2500; i++ { + writeRng.Read(buf) + // Write + n, err := rb.Write(buf[:writeRng.Intn(len(buf))]) + if err != nil { + t.Fatalf("write failed: %v", err) + } + wroteBytes += n + wrote.Write(buf[:n]) + debugln("WRITE 1\t", n, wroteBytes) + + // WriteString + n, err = rb.WriteString(string(buf[:writeRng.Intn(len(buf))])) + if err != nil { + t.Fatalf("write failed: %v", err) + } + wroteBytes += n + wrote.Write(buf[:n]) + debugln("WRITE 2\t", writeRng.Intn(len(buf)), wroteBytes) + + // WriteByte + err = rb.WriteByte(buf[0]) + if err != nil { + t.Fatalf("write failed: %v", err) + } + wroteBytes++ + wrote.Write(buf[:1]) + debugln("WRITE 3\t", 1, wroteBytes) + + // TryWrite + n, err = rb.TryWrite(buf[:writeRng.Intn(len(buf))]) + if err != nil && err != ErrAcquireLock && err != ErrTooMuchDataToWrite && err != ErrIsFull { + t.Fatalf("write failed: %v", err) + } + wroteBytes += n + wrote.Write(buf[:n]) + debugln("WRITE 4\t", n, wroteBytes) + + // TryWriteByte + err = rb.TryWriteByte(buf[0]) + if err != nil && err != ErrAcquireLock && err != ErrTooMuchDataToWrite && err != ErrIsFull { + t.Fatalf("write failed: %v", err) + } + if err == nil { + wroteBytes++ + wrote.Write(buf[:1]) + debugln("WRITE 5\t", 1, wroteBytes) + } + if doSleep && writeRng.Intn(10) == 0 { + time.Sleep(time.Duration(writeRng.Intn(maxSleep))) + } + } + if err := rb.Flush(); err != nil { + t.Fatalf("flush failed: %v", err) + } + rb.CloseWriter() + } + wg.Wait() + if !errors.Is(readErr, io.EOF) { + t.Fatalf("expect io.EOF but got %v", readErr) + } + if readBytes != wroteBytes { + a, b := readBuf.Bytes(), wroteBuf.Bytes() + if debug && !bytes.Equal(a, b) { + common := len(a) + for i := range a { + if a[i] != b[i] { + common = i + break + } + } + a, b = a[common:], b[common:] + if len(a) > 64 { + a = a[:64] + } + if len(b) > 64 { + b = b[:64] + } + t.Errorf("after %d common bytes, difference \nread: %x\nwrote:%x", common, a, b) + } + t.Fatalf("expect read %d bytes but got %d", wroteBytes, readBytes) + } + if readHash.Sum32() != wroteHash.Sum32() { + t.Fatalf("expect read hash 0x%08x but got 0x%08x", readHash.Sum32(), wroteHash.Sum32()) + } +} + +func TestRingBuffer_BlockingBig(t *testing.T) { + // Typical runtime is ~5-10s. + defer timeout(60 * time.Second)() + const debug = false + + var readBytes int + var wroteBytes int + readHash := crc32.NewIEEE() + wroteHash := crc32.NewIEEE() + var readBuf bytes.Buffer + var wroteBuf bytes.Buffer + read := io.Writer(readHash) + wrote := io.Writer(wroteHash) + if debug { + read = io.MultiWriter(read, &readBuf) + wrote = io.MultiWriter(wrote, &wroteBuf) + } + debugln := func(args ...interface{}) { + if debug { + fmt.Println(args...) + } + } + // Inject random reader/writer sleeps. + const maxSleep = int(1 * time.Millisecond) + doSleep := !testing.Short() + rb := New(4 << 10).SetBlocking(true) + + // Reader + var readErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + defer rb.CloseWithError(readErr) + readRng := rand.New(rand.NewSource(1)) + buf := make([]byte, 64<<10) + for { + // Read + n, err := rb.Read(buf[:readRng.Intn(len(buf))]) + readBytes += n + read.Write(buf[:n]) + if err != nil { + readErr = err + break + } + debugln("READ 1\t", n, readBytes) + + // ReadByte + b, err := rb.ReadByte() + if err != nil { + readErr = err + break + } + readBytes++ + read.Write([]byte{b}) + debugln("READ 2\t", 1, readBytes) + + // TryRead + n, err = rb.TryRead(buf[:readRng.Intn(len(buf))]) + readBytes += n + read.Write(buf[:n]) + if err != nil && err != ErrAcquireLock && err != ErrIsEmpty { + readErr = err + break + } + debugln("READ 3\t", n, readBytes) + if doSleep && readRng.Intn(20) == 0 { + time.Sleep(time.Duration(readRng.Intn(maxSleep))) + } + } + }() + + // Writer + { + writeRng := rand.New(rand.NewSource(2)) + buf := make([]byte, 64<<10) + for i := 0; i < 500; i++ { + writeRng.Read(buf) + // Write + n, err := rb.Write(buf[:writeRng.Intn(len(buf))]) + if err != nil { + t.Fatalf("write failed: %v", err) + } + wroteBytes += n + wrote.Write(buf[:n]) + debugln("WRITE 1\t", n, wroteBytes) + + // WriteString + n, err = rb.WriteString(string(buf[:writeRng.Intn(len(buf))])) + if err != nil { + t.Fatalf("write failed: %v", err) + } + wroteBytes += n + wrote.Write(buf[:n]) + debugln("WRITE 2\t", writeRng.Intn(len(buf)), wroteBytes) + + // WriteByte + err = rb.WriteByte(buf[0]) + if err != nil { + t.Fatalf("write failed: %v", err) + } + wroteBytes++ + wrote.Write(buf[:1]) + debugln("WRITE 3\t", 1, wroteBytes) + + // TryWrite + n, err = rb.TryWrite(buf[:writeRng.Intn(len(buf))]) + if err != nil && err != ErrAcquireLock && err != ErrTooMuchDataToWrite && err != ErrIsFull { + t.Fatalf("write failed: %v", err) + } + wroteBytes += n + wrote.Write(buf[:n]) + debugln("WRITE 4\t", n, wroteBytes) + + // TryWriteByte + err = rb.TryWriteByte(buf[0]) + if err != nil && err != ErrAcquireLock && err != ErrTooMuchDataToWrite && err != ErrIsFull { + t.Fatalf("write failed: %v", err) + } + if err == nil { + wroteBytes++ + wrote.Write(buf[:1]) + debugln("WRITE 5\t", 1, wroteBytes) + } + if doSleep && writeRng.Intn(10) == 0 { + time.Sleep(time.Duration(writeRng.Intn(maxSleep))) + } + } + if err := rb.Flush(); err != nil { + t.Fatalf("flush failed: %v", err) + } + rb.CloseWriter() + } + wg.Wait() + if !errors.Is(readErr, io.EOF) { + t.Fatalf("expect io.EOF but got %v", readErr) + } + if readBytes != wroteBytes { + a, b := readBuf.Bytes(), wroteBuf.Bytes() + if debug && !bytes.Equal(a, b) { + common := len(a) + for i := range a { + if a[i] != b[i] { + t.Errorf("%x != %x", a[i], b[i]) + common = i + break + } + } + a, b = a[common:], b[common:] + if len(a) > 64 { + a = a[:64] + } + if len(b) > 64 { + b = b[:64] + } + t.Errorf("after %d common bytes, difference \nread: %x\nwrote:%x", common, a, b) + } + t.Fatalf("expect read %d bytes but got %d", wroteBytes, readBytes) + } + if readHash.Sum32() != wroteHash.Sum32() { + t.Fatalf("expect read hash 0x%08x but got 0x%08x", readHash.Sum32(), wroteHash.Sum32()) + } +} + +func TestRingBuffer_ByteInterface(t *testing.T) { + defer timeout(5 * time.Second)() + rb := New(2) + + // write one + err := rb.WriteByte('a') + if err != nil { + t.Fatalf("WriteByte failed: %v", err) + } + if rb.Length() != 1 { + t.Fatalf("expect len 1 byte but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 1 { + t.Fatalf("expect free 1 byte but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if !bytes.Equal(rb.Bytes(nil), []byte{'a'}) { + t.Fatalf("expect a but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } + + // write to, isFull + err = rb.WriteByte('b') + if err != nil { + t.Fatalf("WriteByte failed: %v", err) + } + if rb.Length() != 2 { + t.Fatalf("expect len 2 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 byte but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if !bytes.Equal(rb.Bytes(nil), []byte{'a', 'b'}) { + t.Fatalf("expect a but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if !rb.IsFull() { + t.Fatalf("expect IsFull is true but got false") + } + + // write + err = rb.WriteByte('c') + if err == nil { + t.Fatalf("expect ErrIsFull but got nil") + } + if rb.Length() != 2 { + t.Fatalf("expect len 2 bytes but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 0 { + t.Fatalf("expect free 0 byte but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if !bytes.Equal(rb.Bytes(nil), []byte{'a', 'b'}) { + t.Fatalf("expect a but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if !rb.IsFull() { + t.Fatalf("expect IsFull is true but got false") + } + + // read one + b, err := rb.ReadByte() + if err != nil { + t.Fatalf("ReadByte failed: %v", err) + } + if b != 'a' { + t.Fatalf("expect a but got %c. r.w=%d, r.r=%d", b, rb.w, rb.r) + } + if rb.Length() != 1 { + t.Fatalf("expect len 1 byte but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 1 { + t.Fatalf("expect free 1 byte but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + if !bytes.Equal(rb.Bytes(nil), []byte{'b'}) { + t.Fatalf("expect a but got %s. r.w=%d, r.r=%d", rb.Bytes(nil), rb.w, rb.r) + } + // check empty or full + if rb.IsEmpty() { + t.Fatalf("expect IsEmpty is false but got true") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } + + // read two, empty + b, err = rb.ReadByte() + if err != nil { + t.Fatalf("ReadByte failed: %v", err) + } + if b != 'b' { + t.Fatalf("expect b but got %c. r.w=%d, r.r=%d", b, rb.w, rb.r) + } + if rb.Length() != 0 { + t.Fatalf("expect len 0 byte but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 2 { + t.Fatalf("expect free 2 byte but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + // check empty or full + if !rb.IsEmpty() { + t.Fatalf("expect IsEmpty is true but got false") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } + + // read three, error + _, err = rb.ReadByte() + if err == nil { + t.Fatalf("expect ErrIsEmpty but got nil") + } + if rb.Length() != 0 { + t.Fatalf("expect len 0 byte but got %d. r.w=%d, r.r=%d", rb.Length(), rb.w, rb.r) + } + if rb.Free() != 2 { + t.Fatalf("expect free 2 byte but got %d. r.w=%d, r.r=%d", rb.Free(), rb.w, rb.r) + } + // check empty or full + if !rb.IsEmpty() { + t.Fatalf("expect IsEmpty is true but got false") + } + if rb.IsFull() { + t.Fatalf("expect IsFull is false but got true") + } +} + +func TestRingBufferCloseError(t *testing.T) { + type testError1 struct{ error } + type testError2 struct{ error } + + rb := New(100) + rb.CloseWithError(testError1{}) + if _, err := rb.Write(nil); err != (testError1{}) { + t.Errorf("Write error: got %T, want testError1", err) + } + if _, err := rb.Write([]byte{1}); err != (testError1{}) { + t.Errorf("Write error: got %T, want testError1", err) + } + if err := rb.WriteByte(0); err != (testError1{}) { + t.Errorf("Write error: got %T, want testError1", err) + } + if _, err := rb.TryWrite(nil); err != (testError1{}) { + t.Errorf("Write error: got %T, want testError1", err) + } + if _, err := rb.TryWrite([]byte{1}); err != (testError1{}) { + t.Errorf("Write error: got %T, want testError1", err) + } + if err := rb.TryWriteByte(0); err != (testError1{}) { + t.Errorf("Write error: got %T, want testError1", err) + } + if err := rb.Flush(); err != (testError1{}) { + t.Errorf("Write error: got %T, want testError1", err) + } + + rb.CloseWithError(testError2{}) + if _, err := rb.Write(nil); err != (testError1{}) { + t.Errorf("Write error: got %T, want testError1", err) + } + + rb.Reset() + rb.CloseWithError(testError1{}) + if _, err := rb.Read(nil); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + if _, err := rb.Read([]byte{0}); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + if _, err := rb.ReadByte(); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + if _, err := rb.TryRead(nil); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + if _, err := rb.TryRead([]byte{0}); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + rb.CloseWithError(testError2{}) + if _, err := rb.Read(nil); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + if _, err := rb.Read([]byte{0}); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + if _, err := rb.ReadByte(); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + if _, err := rb.TryRead(nil); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } + if _, err := rb.TryRead([]byte{0}); err != (testError1{}) { + t.Errorf("Read error: got %T, want testError1", err) + } +} + +func TestRingBufferCloseErrorUnblocks(t *testing.T) { + const sz = 100 + rb := New(sz).SetBlocking(true) + + testCancel := func(fn func()) { + t.Helper() + defer timeout(5 * time.Second)() + rb.Reset() + done := make(chan struct{}) + go func() { + defer close(done) + time.Sleep(10 * time.Millisecond) + fn() + }() + rb.CloseWithError(errors.New("test error")) + <-done + + rb.Reset() + done = make(chan struct{}) + go func() { + defer close(done) + fn() + }() + time.Sleep(10 * time.Millisecond) + rb.CloseWithError(errors.New("test error")) + <-done + } + testCancel(func() { + rb.Write([]byte{sz + 5: 1}) + }) + testCancel(func() { + rb.Write(make([]byte, sz)) + rb.WriteByte(0) + }) + testCancel(func() { + rb.Read([]byte{10: 1}) + }) + testCancel(func() { + rb.ReadByte() + }) + testCancel(func() { + rb.Write(make([]byte, sz)) + rb.Flush() + }) +} + +func TestWriteAfterWriterClose(t *testing.T) { + rb := New(100).SetBlocking(true) + + done := make(chan error) + go func() { + defer close(done) + _, err := rb.Write([]byte("hello")) + if err != nil { + t.Errorf("got error: %q; expected none", err) + } + rb.CloseWriter() + _, err = rb.Write([]byte("world")) + done <- err + err = rb.WriteByte(0) + done <- err + _, err = rb.TryWrite([]byte("world")) + done <- err + err = rb.TryWriteByte(0) + done <- err + }() + + buf := make([]byte, 100) + n, err := io.ReadFull(rb, buf) + if err != nil && err != io.ErrUnexpectedEOF { + t.Fatalf("got: %q; want: %q", err, io.ErrUnexpectedEOF) + } + for writeErr := range done { + if writeErr != ErrWriteOnClosed { + t.Errorf("got: %q; want: %q", writeErr, ErrWriteOnClosed) + } else { + t.Log("ok") + } + } + result := string(buf[0:n]) + if result != "hello" { + t.Errorf("got: %q; want: %q", result, "hello") + } +} + +func timeout(after time.Duration) (cancel func()) { + c := time.After(after) + cc := make(chan struct{}) + go func() { + select { + case <-cc: + return + case <-c: + buf := make([]byte, 1<<20) + stacklen := runtime.Stack(buf, true) + fmt.Printf("=== Timeout, assuming deadlock ===\n*** goroutine dump...\n%s\n*** end\n", string(buf[:stacklen])) + os.Exit(2) + } + }() + return func() { + close(cc) + } +}