diff --git a/logtail/iopipe/ephemeral.go b/logtail/iopipe/ephemeral.go new file mode 100644 index 000000000..f87bd60ae --- /dev/null +++ b/logtail/iopipe/ephemeral.go @@ -0,0 +1,115 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package iopipe + +import ( + "cmp" + "io" + "sync" +) + +// EphemeralBuffer is an in-memory implementation of [Buffer]. +// The zero value is an empty buffer ready for use. +type EphemeralBuffer struct { + mu sync.Mutex + buf []byte // unread data is in buf[idx:] + idx int + waiter chan struct{} +} + +// Len reports the size of the buffer, +// which is the number of written, but unread bytes. +func (b *EphemeralBuffer) Len() int64 { + b.mu.Lock() + defer b.mu.Unlock() + return int64(len(b.buf[b.idx:])) +} + +// Write writes data to the end of the buffer, +// incrementing Len by the amount of bytes written. +func (b *EphemeralBuffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + b.buf = append(b.buf, p...) + + // Check if there are any waiters to wake up. + if len(p) > 0 && b.waiter != nil { + close(b.waiter) + b.waiter = nil + } + return len(p), nil +} + +// Read reads data from the front of the buffer, +// decrementing Len by the amount of bytes read. +// When the buffer is empty, it returns [io.EOF]. +func (b *EphemeralBuffer) Read(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + p2, peekErr := b.peekLocked(len(p)) + n, discErr := b.discardLocked(copy(p, p2)) + return n, cmp.Or(discErr, peekErr) +} + +// Peek peeks n bytes from the front of the buffer. +// The buffer is only valid until the next Read, Peek, or Discard call. +// It reports an error if the buffer length is less than n. +func (b *EphemeralBuffer) Peek(n int) ([]byte, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.peekLocked(n) +} + +// Discard discards n bytes from the front of the buffer, +// decrementing Len by the amount of bytes discarded. 
+// It reports an error if the number of discarded bytes is less than n. +func (b *EphemeralBuffer) Discard(n int) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.discardLocked(n) +} + +// peekLocked implements Peek while mu is already held. +func (b *EphemeralBuffer) peekLocked(n int) ([]byte, error) { + switch data := b.buf[b.idx:]; { + case n < 0: + return nil, wrapError("peek", errNegative) + case n <= len(data): + return data[:n], nil + default: + return data, io.EOF + } +} + +// discardLocked implements Discard while mu is already held. +func (b *EphemeralBuffer) discardLocked(n int) (int, error) { + // Use peek to determine the available bytes to discard + // and discard by incrementing idx. + p, err := b.peekLocked(n) + err = wrapError("discard", err) // remains nil if already nil + b.idx += len(p) + + // If enough of the buffer has already been read, + // then move the data to the front. + if b.idx > len(b.buf)/2 { // more than half the buffer is already read + // TODO: Allow shrinking the buffer if unused enough? + m := copy(b.buf[:cap(b.buf)], b.buf[b.idx:]) // copy data to the front + b.buf = b.buf[:m] + b.idx = 0 + } + + return len(p), err +} + +// Wait returns a channel that is closed when the buffer is non-empty. +func (b *EphemeralBuffer) Wait() <-chan struct{} { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.buf[b.idx:]) > 0 { + return alreadyClosed // data is available + } else if b.waiter == nil { + b.waiter = make(chan struct{}) + } + return b.waiter +} diff --git a/logtail/iopipe/iopipe.go b/logtail/iopipe/iopipe.go new file mode 100644 index 000000000..e4573400e --- /dev/null +++ b/logtail/iopipe/iopipe.go @@ -0,0 +1,98 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// Package iopipe provides a ring buffer for writing and reading bytes. +package iopipe + +import ( + "bytes" + "errors" + "fmt" + "io" +) + +// Buffer is a ring buffer semantically similar to a [bytes.Buffer]. 
+// It is an infinitely sized buffer, so it is the application's +// responsibility to drain and/or avoid writing if it is too full. +// It does not provide any form of message framing, +// which is the responsibility of the application logic. +// All methods must be safe for concurrent use. +type Buffer interface { + // Len reports the size of the buffer, + // which is the number of written, but unread bytes. + Len() int64 + + // Write writes data to the end of the buffer, + // incrementing Len by the amount of bytes written. + // Concurrent Write calls are atomically performed. + // Write does not block. + Write([]byte) (int, error) + + // Read reads data from the front of the buffer, + // decrementing Len by the amount of bytes read. + // It cannot read partially written data for a concurrent Write call. + // Rather than blocking, it returns [io.EOF] when the buffer is empty. + Read([]byte) (int, error) + + // Peek peeks n bytes from the front of the buffer + // without affecting the read offset or changing the Len. + // It cannot peek partially written data for a concurrent Write call. + // The buffer is only valid until the next Read, Peek, or Discard call. + // It reports an error if the buffer length is less than n. + // If n is greater than Len, then the error is usually [io.EOF]. + Peek(n int) ([]byte, error) + + // Discard discards n bytes from the front of the buffer, + // decrementing Len by the amount of bytes discarded. + // It reports an error if the number of discarded bytes is less than n. + Discard(n int) (int, error) + + // Wait returns a channel that is closed when the buffer is non-empty. + Wait() <-chan struct{} +} + +var alreadyClosed = func() chan struct{} { + c := make(chan struct{}) + close(c) + return c +}() + +var ( + _ bytes.Buffer // for godoc hot-linking + + // Statically verify concrete implementations against interface. 
+ _ Buffer = (*PersistentBuffer)(nil) + _ Buffer = (*EphemeralBuffer)(nil) +) + +var ( + errClosed = errors.New("closed buffer") + errNegative = errors.New("negative count") +) + +type iopipeError struct { + op string + err error +} + +func wrapError(op string, err error) error { + if err == nil || err == io.EOF { + return err + } + if e, ok := err.(*iopipeError); ok { + err = e.err // avoid double wrapping + } + return &iopipeError{op: op, err: err} +} + +func (e *iopipeError) Error() string { + if e.op == "" { + return fmt.Sprintf("iopipe: %v", e.err) + } else { + return fmt.Sprintf("iopipe %s: %v", e.op, e.err) + } +} + +func (e *iopipeError) Unwrap() error { + return e.err +} diff --git a/logtail/iopipe/iopipe_test.go b/logtail/iopipe/iopipe_test.go new file mode 100644 index 000000000..9b864c317 --- /dev/null +++ b/logtail/iopipe/iopipe_test.go @@ -0,0 +1,240 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package iopipe + +import ( + "bytes" + "encoding/binary" + "flag" + "io" + "io/fs" + "math/rand/v2" + "os" + "path/filepath" + "runtime" + "slices" + "sync" + "testing" + + "github.com/google/go-cmp/cmp" + "tailscale.com/types/bools" + "tailscale.com/util/must" +) + +// testFile implements [file], but allows override methods of [osFile]. 
+type testFile struct { + file + + stat func() (fs.FileInfo, error) + writeAt func([]byte, int64) (int, error) + readAt func([]byte, int64) (int, error) + truncate func(int64) error + close func() error +} + +func (f testFile) Stat() (fs.FileInfo, error) { + return bools.IfElse(f.stat != nil, f.stat, f.file.Stat)() +} +func (f testFile) WriteAt(b []byte, p int64) (int, error) { + return bools.IfElse(f.writeAt != nil, f.writeAt, f.file.WriteAt)(b, p) +} +func (f testFile) ReadAt(b []byte, p int64) (int, error) { + return bools.IfElse(f.readAt != nil, f.readAt, f.file.ReadAt)(b, p) +} +func (f testFile) Truncate(len int64) error { + return bools.IfElse(f.truncate != nil, f.truncate, f.file.Truncate)(len) +} +func (f testFile) Close() error { + return bools.IfElse(f.close != nil, f.close, f.file.Close)() +} + +func mustOpenPersistent(t *testing.T) *PersistentBuffer { + fp := filepath.Join(t.TempDir(), "file") + var f file = must.Get(os.OpenFile(fp, os.O_RDWR|os.O_CREATE, 0600)) + if testing.Verbose() { + f0 := f + f = testFile{file: f0, + writeAt: func(b []byte, p int64) (int, error) { + n, err := f0.WriteAt(b, p) + if n != len(b) || err != nil { + t.Logf("WriteAt(pos:%d, len:%d) = (%v, %v)", p, len(b), n, err) + } else if uint64(len(b)) != offsetsSize || p != 0 { + t.Logf("WriteAt(pos:%d, len:%d)", p, len(b)) + } else { + t.Logf("WriteOffsets(rd:%d, wr:%d)", int64(binary.LittleEndian.Uint64(b[:8])), int64(binary.LittleEndian.Uint64(b[8:]))) + } + return n, err + }, + readAt: func(b []byte, p int64) (int, error) { + n, err := f0.ReadAt(b, p) + if n != len(b) || err != nil { + t.Logf("ReadAt(pos:%d, len:%d) = (%v, %v)", p, len(b), n, err) + } else if uint64(len(b)) != offsetsSize || p != 0 { + t.Logf("ReadAt(pos:%d, len:%d)", p, len(b)) + } else { + t.Logf("ReadOffsets() = (rd:%d, wr:%d)", int64(binary.LittleEndian.Uint64(b[:8])), int64(binary.LittleEndian.Uint64(b[8:]))) + } + return n, err + }, + truncate: func(p int64) error { + err := f0.Truncate(p) + if err == nil 
{ + t.Logf("Truncate(pos:%d)", p) + } else { + t.Logf("Truncate(pos:%d) = (%v)", p, err) + } + return err + }, + } + } + b := must.Get(newPersistent(f)) + t.Cleanup(func() { b.Close() }) + return b +} + +func testAll(t *testing.T, f func(t *testing.T, b Buffer)) { + t.Run("Ephemeral", func(t *testing.T) { f(t, new(EphemeralBuffer)) }) + t.Run("Persistent", func(t *testing.T) { f(t, mustOpenPersistent(t)) }) +} + +var streamTestLength = flag.Int64("buffer-stream-size", 1<<20, "number of bytes to stream") + +func TestBufferStream(t *testing.T) { + testAll(t, func(t *testing.T, b Buffer) { + maxSize := *streamTestLength + var group sync.WaitGroup + defer group.Wait() + group.Go(func() { + var written int64 + var data []byte + stream := rand.NewChaCha8([32]byte{}) + for written < maxSize { + n := rand.IntN(1 << 16) + data = slices.Grow(data[:0], n)[:n] + must.Get(stream.Read(data)) + m := must.Get(b.Write(data)) + if n != m { + t.Fatalf("Write = %v, want %v", m, n) + } + written += int64(n) + runtime.Gosched() + } + }) + group.Go(func() { + var read, maxLen int64 + var got, want []byte + stream := rand.NewChaCha8([32]byte{}) + for read < maxSize { + blen := b.Len() + maxLen = max(maxLen, blen) + nn := rand.IntN(1 + int(min(3*blen/2, 1<<20))) + noEOF := rand.IntN(2) == 0 + if noEOF && int64(nn) > blen { + nn = int(blen) // reading up to Buffer.Len should never report EOF + } + want = slices.Grow(want[:0], nn)[:nn] + switch rand.IntN(3) { + case 0: // Read + got = slices.Grow(got[:0], nn)[:nn] + n, err := b.Read(got) + if err != nil && (noEOF || err != io.EOF) { + t.Fatalf("Read error: %v", err) + } else if err == nil && n != nn { + t.Fatalf("Read = %d, want %d", n, nn) + } + must.Get(stream.Read(want[:n])) + if !bytes.Equal(got[:n], want[:n]) { + t.Fatalf("data mismatch:\n%s", cmp.Diff(got[:n], want[:n])) + } + read += int64(n) + case 1: // Peek+Discard + data, err := b.Peek(nn) + got = append(got[:0], data...) 
+ if err != nil && (noEOF || err != io.EOF) { + t.Fatalf("Peek error: %v", err) + } else if err == nil && len(got) != nn { + t.Fatalf("Peek = %d, want %d", len(got), nn) + } + n, err := b.Discard(len(got)) + if err != nil { + t.Fatalf("Discard error: %v", err) + } else if n != len(got) { + t.Fatalf("Discard = %d, want %d", n, len(got)) + } + must.Get(stream.Read(want[:n])) + if !bytes.Equal(got[:n], want[:n]) { + t.Fatalf("data mismatch:\n%s", cmp.Diff(got[:n], want[:n])) + } + read += int64(n) + case 2: // Discard only + n, err := b.Discard(nn) + if err != nil && (noEOF || err != io.EOF) { + t.Fatalf("Discard error: %v", err) + } else if err == nil && n != nn { + t.Fatalf("Discard = %d, want %d", n, nn) + } + must.Get(stream.Read(want[:n])) + read += int64(n) + } + } + t.Logf("peak Buffer.Len: %d", maxLen) + }) + }) +} + +func TestPersistentRestart(t *testing.T) { + fp := filepath.Join(t.TempDir(), "file") + b := must.Get(OpenPersistent(fp)) + must.Get(b.Write(make([]byte, 100))) + want := "Hello, world!" 
+ must.Get(b.Write([]byte(want))) + must.Get(b.Discard(100)) + must.Do(b.Close()) + b = must.Get(OpenPersistent(fp)) + got := string(must.Get(b.Peek(int(b.Len())))) + if got != want { + t.Errorf("Peek = %s, want %s", got, want) + } + must.Do(b.Close()) +} + +func TestBufferWait(t *testing.T) { + testAll(t, func(t *testing.T, b Buffer) { + var want [8]byte + for i := range 1000 { + binary.LittleEndian.PutUint64(want[:], uint64(i)) + go must.Get(b.Write(want[:])) + if i%2 == 0 { + runtime.Gosched() // increase probability of a race + } + select { + case <-b.Wait(): + got := must.Get(b.Peek(len(want))) + if !bytes.Equal(got, want[:]) { + t.Errorf("Peek = %x, want %x", got, want) + } + must.Get(b.Discard(len(want))) + case <-t.Context().Done(): + t.Fatalf("test timeout: %v", t.Context().Err()) + } + } + }) +} + +func TestBufferNoWait(t *testing.T) { + testAll(t, func(t *testing.T, b Buffer) { + done := make(chan struct{}) + go func() { + for range 1000 { + runtime.Gosched() + } + close(done) + }() + select { + case <-b.Wait(): + t.Fatalf("Wait unexpectedly closed early") + case <-done: + } + }) +} diff --git a/logtail/iopipe/persistent.go b/logtail/iopipe/persistent.go new file mode 100644 index 000000000..dbaa8909a --- /dev/null +++ b/logtail/iopipe/persistent.go @@ -0,0 +1,520 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package iopipe + +import ( + "cmp" + "encoding/binary" + "errors" + "io" + "io/fs" + "math" + "os" + "slices" + "sync" + "sync/atomic" + + "tailscale.com/types/bools" +) + +// file is general-purpose interface for a file. 
+type file interface { + Stat() (fs.FileInfo, error) + io.WriterAt + io.ReaderAt + Truncate(int64) error // used for compaction + io.Closer +} + +const offsetsSize = uint64(len(offsets{})) + +type offsets [16]byte // tuple of ReadOffset and WriteOffset + +func (o *offsets) ReadOffset() uint64 { return binary.LittleEndian.Uint64(o[0:]) } +func (o *offsets) WriteOffset() uint64 { return binary.LittleEndian.Uint64(o[8:]) } +func (o *offsets) PutReadOffset(n uint64) { binary.LittleEndian.PutUint64(o[0:], n) } +func (o *offsets) PutWriteOffset(n uint64) { binary.LittleEndian.PutUint64(o[8:], n) } + +// PersistentBuffer is an on-disk implementation of [Buffer]. +type PersistentBuffer struct { + // The on-disk format of the buffer is sequentially organized as: + // + // - ReadOffset: 64-bit little endian unsigned integer that + // contains the offset to the start of DataBuffer (inclusive). + // The offset should always be ≥ [offsetsSize] and ≤ WriteOffset. + // + // - WriteOffset: 64-bit little endian unsigned integer that + // contains the offset to the end of DataBuffer (exclusive). + // The offset should always be ≥ ReadOffset and ≤ the file size. + // As a special case, if this value is 0 or [math.MaxUint64], + // then it is implicitly the current file size. + // + // - FreeBuffer: A variable-length buffer that occupies space + // after the WriteOffset field until the offset in ReadOffset. + // The FreeBuffer contains already consumed data, + // where the actual content is not meaningful. + // As an optimization, the file may be sparse where FreeBuffer + // is mostly unallocated disk blocks. + // + // - DataBuffer: A variable-length buffer starting at the offset + // in ReadOffset and contains written, but unread data. + // Reads start at the beginning of buffer and + // ReadOffset is incremented by the amount of bytes read. + // Writes are appended to the end of the buffer starting at the + // offset in WriteOffset, which usually grows the size of the file. 
+ // + // A naive implementation of file buffer can grow indefinitely + // due to the ever increasing size of FreeBuffer. + // Compaction is needed to reduce the file size: + // + // - In the simple case where ReadOffset equals WriteOffset, + // the ReadOffset and WriteOffset can both be set to [offsetsSize], + // and the file be truncated to [offsetsSize]. + // If successfully truncated, the WriteOffset may be set to [math.MaxUint64]. + // + // - If the underlying filesystem supports sparse files, + // a hole can be punched that covers the FreeBuffer range. + // With sparse files, it is technically fine if the file size grows + // indefinitely since the on-disk size is mainly the DataBuffer. + // However, a corrupted ReadOffset could end up causing the buffer + // to mistakenly report a massive number of zero bytes, + // so there is still wisdom in compacting eventually. + // + // - If size of DataBuffer is smaller than the FreeBuffer, + // then the content of DataBuffer can be copied to the start + // of FreeBuffer, ReadOffset set to [offsetsSize], and the file size + // truncated to the number of copied bytes plus [offsetsSize]. + // + // - The WriteOffset field is not strictly needed, + // but is useful for data resilience. + // Under normal operation, it will be set to [math.MaxUint64] + // and simply rely on the file size to determine the WriteOffset. + // However, compaction requires two non-atomic operations + // (updating the offset fields and file truncation). + // If the offsets are updated, but file truncation failed, + // then prior data may accidentally be "added" to the DataBuffer. + // Since it is highly likely that two adjacent offsets + // can be written atomically to disk, + // we can update both ReadOffset and WriteOffset together + // and use that to help protect against failed truncation. + + file file + closed atomic.Bool // set to true while holding both rdMu and wrMu + + // rdMu is held by Read, Peek, Discard, Wait, and Close. 
+ rdMu sync.Mutex // may acquire wrMu while holding rdMu + rdPos atomic.Uint64 // may only decrement while holding both rdMu and wrMu + peekPos uint64 // offset into peekBuf + peekBuf []byte // contains file data at rdPos-peekPos + offsets offsets // offsets in the file + lastCompactPos uint64 // rdPos of when a compaction was last attempted + blockSize int64 // block size used by the file (best-effort) + + // wrMu is held by Len, Write, Discard, Wait, and Close. + wrMu sync.Mutex // must never acquire rdMu while holding wrMu + wrPos atomic.Uint64 // may only decrement while holding both rdMu and wrMu + waiter chan struct{} // closed by Write if non-nil + + // While more complicated, there are two different mutexes + // to minimize how often Read and Write may block each other. + // Some operations need to hold both mutexes. To avoid a deadlock, + // the wrMu must always be acquired after the rdMu. +} + +// OpenPersistent opens or creates a persistent [Buffer] +// backed on disk by a file located at path. +// The buffer must be closed to release resources. +func OpenPersistent(path string) (*PersistentBuffer, error) { + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return nil, wrapError("open", err) + } + b, err := newPersistent(f) + if err != nil { + return nil, err + } + return b, nil +} + +// newPersistent constructs a new PersistentBuffer from the file. +// It takes ownership of closing the file. +func newPersistent(f file) (*PersistentBuffer, error) { + // Load the ReadOffset, WriteOffsets, fileSize, and blockSize. + b := &PersistentBuffer{file: f} + if _, err := readFullAt(b.file, b.offsets[:], 0); err != nil && err != io.ErrUnexpectedEOF { + f.Close() + return nil, wrapError("open", err) + } + fi, err := b.file.Stat() + if err != nil { + f.Close() + return nil, wrapError("open", err) + } + // TODO: Populate blockSize. 
+ + // Enforce the following invariant: + // offsetsSize ≤ ReadOffset ≤ WriteOffset ≤ fileSize + fileSize := uint64(max(int64(offsetsSize), fi.Size())) // enforce fileSize against offsetSize, which is a constant + readOffset := clamp(offsetsSize, b.offsets.ReadOffset(), fileSize) + writeOffset := clamp(offsetsSize, cmp.Or(b.offsets.WriteOffset(), fileSize), fileSize) + readOffset = min(readOffset, writeOffset) + + // Always update the offsets (even if unchanged). + // This helps detect read-only files before they become a problem. + if err := b.truncateLocked(readOffset, writeOffset); err != nil { + f.Close() + return nil, wrapError("open", err) + } + return b, nil +} + +// Len reports the size of the buffer, +// which is the number of written, but unread bytes. +// It reports zero if the buffer is closed. +func (b *PersistentBuffer) Len() int64 { + b.wrMu.Lock() // generally faster to acquire + defer b.wrMu.Unlock() + return int64(b.wrPos.Load() - b.rdPos.Load()) // rdPos may increase asynchronously +} + +// Write writes data to the end of the buffer, +// incrementing Len by the amount of bytes written. +func (b *PersistentBuffer) Write(p []byte) (int, error) { + b.wrMu.Lock() + defer b.wrMu.Unlock() + if b.closed.Load() { + return 0, wrapError("write", errClosed) + } + n, err := b.file.WriteAt(p, int64(b.wrPos.Load())) // wrPos is stable + b.wrPos.Add(uint64(n)) + + // Check if there are any waiters to wake up. + if n > 0 && b.waiter != nil { + close(b.waiter) + b.waiter = nil + } + + return n, wrapError("write", err) // err remains nil if already nil +} + +// Read reads data from the front of the buffer, +// decrementing Len by the amount of bytes read. +// When the buffer is empty, it returns [io.EOF]. 
+func (b *PersistentBuffer) Read(p []byte) (int, error) { + b.rdMu.Lock() + defer b.rdMu.Unlock() + p2, peekErr := b.peekReadLocked(len(p)) + n, discErr := b.discardReadLocked(copy(p, p2)) + return n, cmp.Or(discErr, peekErr) +} + +// Peek peeks n bytes from the front of the buffer. +// The buffer is only valid until the next Read, Peek, or Discard call. +// It reports an error if the buffer length is less than n. +func (b *PersistentBuffer) Peek(n int) ([]byte, error) { + b.rdMu.Lock() + defer b.rdMu.Unlock() + return b.peekReadLocked(n) +} + +// Discard discards n bytes from the front of the buffer, +// decrementing Len by the amount of bytes discarded. +// It reports an error if the number of discarded bytes is less than n. +func (b *PersistentBuffer) Discard(n int) (int, error) { + b.rdMu.Lock() + defer b.rdMu.Unlock() + return b.discardReadLocked(n) +} + +// peekReadLocked implements Peek while rdMu is already held. +func (b *PersistentBuffer) peekReadLocked(n int) ([]byte, error) { + switch { + case b.closed.Load(): + return nil, wrapError("peek", errClosed) + case n < 0: + return nil, wrapError("peek", errNegative) + } + + // Fill the peek buffer if necessary. + var rdErr error + peekBuf := b.peekBuf[min(b.peekPos, uint64(len(b.peekBuf))):] + if n > len(peekBuf) { + // Move data in peek buffer to the front. + m := copy(b.peekBuf[:cap(b.peekBuf)], peekBuf) + b.peekPos, b.peekBuf = 0, b.peekBuf[:m] + + // Read data into the peek buffer. 
+ availData := max(0, int64(b.wrPos.Load()-b.rdPos.Load())-int64(len(b.peekBuf))) + b.peekBuf = slices.Grow(b.peekBuf, int(min(int64(n-len(peekBuf)), availData))) + m = int(min(int64(cap(b.peekBuf)-len(b.peekBuf)), availData)) + m, rdErr = readFullAt(b.file, b.peekBuf[len(b.peekBuf):cap(b.peekBuf)][:m], int64(b.rdPos.Load())+int64(len(b.peekBuf))) + rdErr = wrapError("peek", rdErr) // remains nil if already nil + b.peekBuf = b.peekBuf[:len(b.peekBuf)+m] + peekBuf = b.peekBuf + } + + // Return the available data in the peek buffer. + if n > len(peekBuf) { + return peekBuf, cmp.Or(rdErr, io.EOF) + } + return peekBuf[:n], nil +} + +// discardReadLocked implements Discard while rdMu is already held. +func (b *PersistentBuffer) discardReadLocked(n int) (m int, err error) { + switch { + case b.closed.Load(): + return 0, wrapError("discard", errClosed) + case n < 0: + return 0, wrapError("discard", errNegative) + } + + avail := max(0, int64(b.wrPos.Load()-b.rdPos.Load())) // wrPos may increase asynchronously + if int64(n) > avail { + n, err = int(avail), io.EOF + } + if n > 0 { + if err := b.updateOffsetsReadLocked(n); err != nil { + return 0, wrapError("discard", err) + } + if err := b.mayCompactReadLocked(); err != nil { + return n, wrapError("compact", err) + } + } + return n, err // either nil or [io.EOF] +} + +// errMoreData reports that the DataBuffer is non-empty. +var errMoreData = errors.New("more data available") + +// updateOffsetsReadLocked updates the offsets. +// The rdMu must already be held. +func (b *PersistentBuffer) updateOffsetsReadLocked(n int) error { + readOffset := b.rdPos.Load() + uint64(n) // rdPos is stable + + // Check if the file would be empty, in which case, just truncate. 
+ if readOffset == b.wrPos.Load() { // wrPos may increase asynchronously + if err := func() error { + b.wrMu.Lock() // properly acquired after rdMu + defer b.wrMu.Unlock() + if readOffset == b.wrPos.Load() { // wrPos is stable + if err := b.truncateLocked(readOffset, b.wrPos.Load()); err != nil { + return err + } + b.peekPos, b.peekBuf = 0, b.peekBuf[:0] // invalidate peek buffer + return nil + } + return errMoreData + }(); (err != nil && err != errMoreData) || err == nil { + return err + } + } + + // Otherwise, we need to write the offsets. + offsetsOld := b.offsets + b.offsets.PutReadOffset(readOffset) + if b.offsets.WriteOffset() < math.MaxUint64 { + b.offsets.PutWriteOffset(b.wrPos.Load()) // wrPos may increase asynchronously + } + if _, err := b.file.WriteAt(b.offsets[:], 0); err != nil { + b.offsets = offsetsOld + return err + } + + // Update the offsets. + b.rdPos.Add(uint64(n)) + b.peekPos += uint64(n) // invalidate leading bytes of peekBuf + return nil +} + +// mayCompactReadLocked optionally compacts the file. +// The rdMu must already be held. +func (b *PersistentBuffer) mayCompactReadLocked() error { + // Always trying to compact for every read could be expensive. + // Similar to GOGC, only attempt compaction when the FreeBuffer + // grows by some fraction (chosen default is 25%). + // + // Also, skip compaction if the entire file fits in a single block, + // since it will generally occupy the same amount of disk space. + singleBlock := b.wrPos.Load() <= clamp(1<<12, uint64(b.blockSize), 1<<20) + compactedRecently := b.rdPos.Load() < 5*b.lastCompactPos/4 // rdPos is stable + if singleBlock || compactedRecently { + return nil + } + + freeLen := max(0, int64(b.rdPos.Load()-offsetsSize)) // rdPos is stable + dataLen := max(0, int64(b.wrPos.Load()-b.rdPos.Load())) // wrPos may increase asynchronously + + // Rely on hole-punching to reclaim disk space. 
+ // If the file supports sparse holes, then we can tolerate a higher + // logical file size since the physical size on disk is smaller. + if freeLen < 16*dataLen && int64(b.rdPos.Load()) > 2*b.blockSize && b.blockSize > 0 { + // TODO: Implement support for punching holes. + } + + // Move the data to the front of the file. + // Ensure there is notably more free space than data to reduce + // probability that data grows beyond free space while copying. + if freeLen > 3*dataLen/2 { + if err := b.copyingCompactReadLocked(); err != nil { + return err + } + } + + return nil +} + +// errNoSpace reports that the DataBuffer is larger than the FreeBuffer. +// This an internal error and should not be exposed to the external API. +var errNoSpace = errors.New("insufficient free space") + +// copyingCompactReadLocked copies the DataBuffer into the FreeBuffer +// and updates the ReadOffset and WriteOffset. +func (b *PersistentBuffer) copyingCompactReadLocked() error { + // Copy DataBuffer to FreeBuffer on a block-by-block basis. + var blockBuffer [1 << 12]byte // TODO: Pool this? + dstPos := uint64(offsetsSize) + srcPos := b.rdPos.Load() + for { + if err := func() (err error) { + // If this seems like the last block, acquire wrMu beforehand + // to ensure that copying does not race with concurrent Writes. + // Thus, we can know for certain that this is truly the last block. + availData := int64(b.wrPos.Load() - srcPos) // wrPos may increase asynchronously + if availData <= int64(len(blockBuffer)) { + b.wrMu.Lock() // properly acquired after rdMu + defer b.wrMu.Unlock() + + // After copying the last block, update the offsets. + defer func() { + availData = int64(b.wrPos.Load() - srcPos) // wrPos is stable + if err != nil || availData != 0 { + return // still more data to copy + } + dataLen := b.wrPos.Load() - b.rdPos.Load() + err = cmp.Or(b.truncateLocked(dstPos-dataLen, dstPos), io.EOF) + }() + } + + // Read a block from the DataBuffer. 
+ availData = int64(b.wrPos.Load() - srcPos) // wrPos may increase asynchronously unless wrMu is held + n := int(min(int64(len(blockBuffer)), availData)) + if _, err := readFullAt(b.file, blockBuffer[:n], int64(srcPos)); err != nil { + return err + } + srcPos += uint64(n) // should never run past b.wrPos + + // Write a block into the FreeBuffer. + availFree := int64(b.rdPos.Load() - dstPos) // rdPos may increase asynchronously unless rdMu is held + if availData > availFree { + return errNoSpace + } + if _, err := b.file.WriteAt(blockBuffer[:n], int64(dstPos)); err != nil { + return err + } + dstPos += uint64(n) // should never run past b.rdPos + + return nil + }(); err != nil { + return bools.IfElse(err != errNoSpace && err != io.EOF, err, nil) + } + } +} + +// truncateLocked truncates the file according the specified offsets. +// Both rdMu and wrMu must be held. +func (b *PersistentBuffer) truncateLocked(readOffset, writeOffset uint64) error { + // Special-case: If all data is read, then just truncate the file. + // This reduces IO operations from 3 down to 1. + if readOffset == writeOffset { + if err := b.file.Truncate(0); err != nil { + return err + } + b.offsets.PutReadOffset(offsetsSize) + b.offsets.PutWriteOffset(math.MaxUint64) + b.rdPos.Store(offsetsSize) + b.wrPos.Store(offsetsSize) + b.lastCompactPos = offsetsSize + return nil + } + + // Step 1: Update both offsets. + // A modern disk should be able to update both offsets atomically. + offsetsOld := b.offsets + b.offsets.PutReadOffset(readOffset) + b.offsets.PutWriteOffset(writeOffset) + if _, err := b.file.WriteAt(b.offsets[:], 0); err != nil { + b.offsets = offsetsOld + return err + } + b.rdPos.Store(readOffset) // only time rdPos is possibly decremented + b.wrPos.Store(writeOffset) // only time wrPos is possibly decremented + b.lastCompactPos = readOffset + + // Step 2: Truncate the file. 
+ // If this fails, then WriteOffset holds the real file size, + // allowing OpenPersistent to reliably resume the file. + if err := b.file.Truncate(int64(b.wrPos.Load())); err != nil { + return err + } + + // Step 3: Update WriteOffset to use the file size. + // Since the file was successfully truncated, + // we can rely on the file size to implicitly be the WriteOffset. + offsetsOld = b.offsets + b.offsets.PutWriteOffset(math.MaxUint64) // use file size as WriteOffset + if _, err := b.file.WriteAt(b.offsets[:], 0); err != nil { + b.offsets = offsetsOld + return err + } + + return nil +} + +// Wait returns a channel that is closed when the buffer is non-empty +// or when the buffer itself is closed. +func (b *PersistentBuffer) Wait() <-chan struct{} { + b.rdMu.Lock() + defer b.rdMu.Unlock() + b.wrMu.Lock() // properly acquired after rdMu + defer b.wrMu.Unlock() + if b.closed.Load() || b.wrPos.Load() > b.rdPos.Load() { // both wrPos and rdPos are stable + return alreadyClosed // already closed or data is available + } else if b.waiter == nil { + b.waiter = make(chan struct{}) + } + return b.waiter +} + +// Close closes the buffer. +func (b *PersistentBuffer) Close() error { + b.rdMu.Lock() + defer b.rdMu.Unlock() + b.wrMu.Lock() // properly acquired after rdMu + defer b.wrMu.Unlock() + if b.closed.Load() { + return wrapError("close", errors.New("buffer already closed")) + } + b.closed.Store(true) + if b.waiter != nil { + close(b.waiter) + b.waiter = nil + } + return wrapError("close", b.file.Close()) +} + +// readFullAt is like ReadAt except it +// converts [io.EOF] to [io.ErrUnexpectedEOF] unless all of b is read. +func readFullAt(r io.ReaderAt, b []byte, pos int64) (int, error) { + n, err := r.ReadAt(b, pos) + if err == io.EOF { + err = bools.IfElse(n < len(b), io.ErrUnexpectedEOF, nil) + } + return n, err +} + +// clamp clamps val to be within lo and hi, inclusive. +func clamp[T cmp.Ordered](lo, val, hi T) T { + return min(max(lo, val), hi) +}