mirror of
https://github.com/tailscale/tailscale.git
synced 2026-02-10 10:12:27 +01:00
In the event of multiple Filch intances being backed by the same file, it is possible that concurrent rotateLocked calls occur. One operation might clear the file, resulting in another skipping the call to resetReadBuffer, resulting in a later panic because the read index is invalid. To at least avoid the panic, always call resetReadBuffer. Note that the behavior of Filch is undefined when using the same file. While this avoids the panic, we may still experience data corruption or less. Fixes #18552 Signed-off-by: Joe Tsai <joetsai@digital-static.net>
407 lines
11 KiB
Go
407 lines
11 KiB
Go
// Copyright (c) Tailscale Inc & contributors
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package filch
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand/v2"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
jsonv2 "github.com/go-json-experiment/json"
|
|
"tailscale.com/tstest"
|
|
"tailscale.com/util/must"
|
|
)
|
|
|
|
func init() { alwaysStatForTests = true }
|
|
|
|
type filchTest struct {
|
|
*Filch
|
|
|
|
filePrefix string
|
|
}
|
|
|
|
func newForTest(t *testing.T, filePrefix string, opts Options) *filchTest {
|
|
t.Helper()
|
|
if filePrefix == "" {
|
|
filePrefix = filepath.Join(t.TempDir(), "testlog")
|
|
}
|
|
f, err := New(filePrefix, opts)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if err := f.Close(); err != nil {
|
|
t.Errorf("Close error: %v", err)
|
|
}
|
|
})
|
|
return &filchTest{Filch: f, filePrefix: filePrefix}
|
|
}
|
|
|
|
func (f *filchTest) read(t *testing.T, want []byte) {
|
|
t.Helper()
|
|
if got, err := f.TryReadLine(); err != nil {
|
|
t.Fatalf("TryReadLine error: %v", err)
|
|
} else if string(got) != string(want) {
|
|
t.Errorf("TryReadLine = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestNew(t *testing.T) {
|
|
const want1 = "Lorem\nipsum\ndolor\nsit\namet,\nconsectetur\nadipiscing\nelit,\nsed\n"
|
|
const want2 = "do\neiusmod\ntempor\nincididunt\nut\nlabore\net\ndolore\nmagna\naliqua.\n"
|
|
filePrefix := filepath.Join(t.TempDir(), "testlog")
|
|
checkLinesAndCleanup := func() {
|
|
t.Helper()
|
|
defer os.Remove(filepath.Join(filePrefix + ".log1.txt"))
|
|
defer os.Remove(filepath.Join(filePrefix + ".log2.txt"))
|
|
f := newForTest(t, filePrefix, Options{})
|
|
var got []byte
|
|
for {
|
|
b := must.Get(f.TryReadLine())
|
|
if b == nil {
|
|
break
|
|
}
|
|
got = append(got, b...)
|
|
}
|
|
if string(got) != want1+want2 {
|
|
t.Errorf("got %q\nwant %q", got, want1+want2)
|
|
}
|
|
}
|
|
now := time.Now()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1+want2), 0600))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1+want2), 0600))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1), 0600))
|
|
os.Chtimes(filePrefix+".log1.txt", now.Add(-time.Minute), now.Add(-time.Minute))
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want2), 0600))
|
|
os.Chtimes(filePrefix+".log2.txt", now.Add(+time.Minute), now.Add(+time.Minute))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want2), 0600))
|
|
os.Chtimes(filePrefix+".log1.txt", now.Add(+time.Minute), now.Add(+time.Minute))
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1), 0600))
|
|
os.Chtimes(filePrefix+".log2.txt", now.Add(-time.Minute), now.Add(-time.Minute))
|
|
checkLinesAndCleanup()
|
|
}
|
|
|
|
func setupStderr(t *testing.T) {
|
|
t.Helper()
|
|
pipeR, pipeW, err := os.Pipe()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() { pipeR.Close() })
|
|
t.Cleanup(func() {
|
|
switch b, err := io.ReadAll(pipeR); {
|
|
case err != nil:
|
|
t.Fatalf("ReadAll error: %v", err)
|
|
case len(b) > 0:
|
|
t.Errorf("unexpected write to fake stderr: %s", b)
|
|
}
|
|
})
|
|
t.Cleanup(func() { pipeW.Close() })
|
|
tstest.Replace(t, &stderrFD, int(pipeW.Fd()))
|
|
tstest.Replace(t, &os.Stderr, pipeW)
|
|
}
|
|
|
|
func TestConcurrentWriteAndRead(t *testing.T) {
|
|
if replaceStderrSupportedForTest {
|
|
setupStderr(t)
|
|
}
|
|
|
|
const numWriters = 10
|
|
const linesPerWriter = 1000
|
|
opts := Options{ReplaceStderr: replaceStderrSupportedForTest, MaxFileSize: math.MaxInt32}
|
|
f := newForTest(t, "", opts)
|
|
|
|
// Concurrently write many lines.
|
|
var draining sync.RWMutex
|
|
var group sync.WaitGroup
|
|
defer group.Wait()
|
|
data := bytes.Repeat([]byte("X"), 1000)
|
|
var runningWriters atomic.Int64
|
|
for i := range numWriters {
|
|
runningWriters.Add(+1)
|
|
group.Go(func() {
|
|
defer runningWriters.Add(-1)
|
|
var b []byte
|
|
for j := range linesPerWriter {
|
|
b = fmt.Appendf(b[:0], `{"Index":%d,"Count":%d,"Data":"%s"}`+"\n", i+1, j+1, data[:rand.IntN(len(data))])
|
|
draining.RLock()
|
|
if i%2 == 0 && opts.ReplaceStderr {
|
|
stderrWriteForTest(b)
|
|
} else {
|
|
must.Get(f.Write(b))
|
|
}
|
|
draining.RUnlock()
|
|
runtime.Gosched()
|
|
}
|
|
})
|
|
}
|
|
|
|
// Verify that we can read back the lines in an ordered manner.
|
|
var lines int
|
|
var entry struct{ Index, Count int }
|
|
state := make(map[int]int)
|
|
checkLine := func() (ok bool) {
|
|
b := must.Get(f.TryReadLine())
|
|
if len(b) == 0 {
|
|
return false
|
|
}
|
|
entry.Index, entry.Count = 0, 0
|
|
if err := jsonv2.Unmarshal(b, &entry); err != nil {
|
|
t.Fatalf("json.Unmarshal error: %v", err)
|
|
}
|
|
if wantCount := state[entry.Index] + 1; entry.Count != wantCount {
|
|
t.Fatalf("Index:%d, Count = %d, want %d", entry.Index, entry.Count, wantCount)
|
|
}
|
|
state[entry.Index] = entry.Count
|
|
lines++
|
|
return true
|
|
}
|
|
for lines < numWriters*linesPerWriter {
|
|
writersDone := runningWriters.Load() == 0
|
|
for range rand.IntN(100) {
|
|
runtime.Gosched() // bias towards more writer operations
|
|
}
|
|
|
|
if rand.IntN(100) == 0 {
|
|
// Asynchronous read of a single line.
|
|
if !checkLine() && writersDone {
|
|
t.Fatal("failed to read all lines after all writers done")
|
|
}
|
|
} else {
|
|
// Synchronous reading of all lines.
|
|
draining.Lock()
|
|
for checkLine() {
|
|
}
|
|
draining.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test that the
|
|
func TestBufferCapacity(t *testing.T) {
|
|
f := newForTest(t, "", Options{})
|
|
b := bytes.Repeat([]byte("X"), 1000)
|
|
for range 1000 {
|
|
must.Get(f.Write(b[:rand.IntN(len(b))]))
|
|
}
|
|
for must.Get(f.TryReadLine()) != nil {
|
|
}
|
|
if !(10*len(b) < cap(f.rdBuf) && cap(f.rdBuf) < 20*len(b)) {
|
|
t.Errorf("cap(rdBuf) = %v, want within [%v:%v]", cap(f.rdBuf), 10*len(b), 20*len(b))
|
|
}
|
|
|
|
must.Get(f.Write(bytes.Repeat([]byte("X"), DefaultMaxLineSize-1)))
|
|
must.Get(f.TryReadLine())
|
|
wrCap, rdCap := cap(f.wrBuf), cap(f.rdBuf)
|
|
|
|
// Force another rotation. Buffers should not be GC'd yet.
|
|
must.Get(f.TryReadLine())
|
|
if cap(f.wrBuf) != wrCap {
|
|
t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), wrCap)
|
|
}
|
|
if cap(f.rdBuf) != rdCap {
|
|
t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), rdCap)
|
|
}
|
|
|
|
// Force many rotations. Buffers should be GC'd.
|
|
for range 64 {
|
|
t.Logf("cap(f.wrBuf), cap(f.rdBuf) = %d, %d", cap(f.wrBuf), cap(f.rdBuf))
|
|
must.Get(f.TryReadLine())
|
|
}
|
|
if cap(f.wrBuf) != 0 {
|
|
t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), 0)
|
|
}
|
|
if cap(f.rdBuf) != 0 {
|
|
t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), 0)
|
|
}
|
|
}
|
|
|
|
func TestMaxLineSize(t *testing.T) {
|
|
const maxLineSize = 1000
|
|
f := newForTest(t, "", Options{MaxLineSize: maxLineSize})
|
|
|
|
// Test writing.
|
|
b0 := []byte(strings.Repeat("X", maxLineSize-len("\n")) + "\n")
|
|
must.Get(f.Write(b0))
|
|
b1 := []byte(strings.Repeat("X", maxLineSize))
|
|
if _, err := f.Write(b1); err != errTooLong {
|
|
t.Errorf("Write error = %v, want errTooLong", err)
|
|
}
|
|
b2 := bytes.Repeat(b0, 2)
|
|
must.Get(f.Write(b2))
|
|
if f.storedBytesForTest() != int64(len(b0)+len(b2)) {
|
|
t.Errorf("storedBytes = %v, want %v", f.storedBytesForTest(), int64(len(b0)+len(b2)))
|
|
}
|
|
|
|
// Test reading.
|
|
f.read(t, b0)
|
|
f.read(t, b0)
|
|
f.read(t, b0)
|
|
f.read(t, nil) // should trigger rotate
|
|
if f.storedBytesForTest() != 0 {
|
|
t.Errorf("storedBytes = %v, want 0", f.storedBytesForTest())
|
|
}
|
|
|
|
// Test writing
|
|
must.Get(f.Write([]byte("hello")))
|
|
must.Get(f.Write(b0))
|
|
must.Get(f.Write([]byte("goodbye")))
|
|
|
|
// Test reading.
|
|
f.Close()
|
|
f = newForTest(t, f.filePrefix, Options{MaxLineSize: 10})
|
|
f.read(t, []byte("hello\n"))
|
|
if _, err := f.TryReadLine(); err != errTooLong {
|
|
t.Errorf("Write error = %v, want errTooLong", err)
|
|
}
|
|
f.read(t, []byte("goodbye\n"))
|
|
|
|
// Check that the read buffer does not need to be as long
|
|
// as the overly long line to skip over it.
|
|
if cap(f.rdBuf) >= maxLineSize/2 {
|
|
t.Errorf("cap(rdBuf) = %v, want <%v", cap(f.rdBuf), maxLineSize/2)
|
|
}
|
|
}
|
|
|
|
func TestMaxFileSize(t *testing.T) {
|
|
if replaceStderrSupportedForTest {
|
|
t.Run("ReplaceStderr:true", func(t *testing.T) { testMaxFileSize(t, true) })
|
|
}
|
|
t.Run("ReplaceStderr:false", func(t *testing.T) { testMaxFileSize(t, false) })
|
|
}
|
|
|
|
func testMaxFileSize(t *testing.T, replaceStderr bool) {
|
|
if replaceStderr {
|
|
setupStderr(t)
|
|
}
|
|
|
|
opts := Options{ReplaceStderr: replaceStderr, MaxFileSize: 1000}
|
|
f := newForTest(t, "", opts)
|
|
|
|
// Write lots of data.
|
|
const calls = 1000
|
|
var group sync.WaitGroup
|
|
var filchedBytes, writeBytes int64
|
|
group.Go(func() {
|
|
if !opts.ReplaceStderr {
|
|
return
|
|
}
|
|
var b []byte
|
|
for i := range calls {
|
|
b = fmt.Appendf(b[:0], `{"FilchIndex":%d}`+"\n", i+1)
|
|
filchedBytes += int64(stderrWriteForTest(b))
|
|
}
|
|
})
|
|
group.Go(func() {
|
|
var b []byte
|
|
for i := range calls {
|
|
b = fmt.Appendf(b[:0], `{"WriteIndex":%d}`+"\n", i+1)
|
|
writeBytes += int64(must.Get(f.Write(b)))
|
|
}
|
|
})
|
|
group.Wait()
|
|
f.statAndUpdateBytes()
|
|
droppedBytes := filchedBytes + writeBytes - f.storedBytes.Value()
|
|
|
|
switch {
|
|
case f.writeCalls.Value() != calls:
|
|
t.Errorf("writeCalls = %v, want %d", f.writeCalls.Value(), calls)
|
|
case f.readCalls.Value() != 0:
|
|
t.Errorf("readCalls = %v, want 0", f.readCalls.Value())
|
|
case f.rotateCalls.Value() == 0:
|
|
t.Errorf("rotateCalls = 0, want >0")
|
|
case f.callErrors.Value() != 0:
|
|
t.Errorf("callErrors = %v, want 0", f.callErrors.Value())
|
|
case f.writeBytes.Value() != writeBytes+filchedBytes:
|
|
t.Errorf("writeBytes = %v, want %d", f.writeBytes.Value(), writeBytes+filchedBytes)
|
|
case f.readBytes.Value() != 0:
|
|
t.Errorf("readBytes = %v, want 0", f.readBytes.Value())
|
|
case f.filchedBytes.Value() != filchedBytes:
|
|
t.Errorf("filchedBytes = %v, want %d", f.filchedBytes.Value(), filchedBytes)
|
|
case f.droppedBytes.Value() != droppedBytes:
|
|
t.Errorf("droppedBytes = %v, want %d", f.droppedBytes.Value(), droppedBytes)
|
|
case f.droppedBytes.Value() == 0:
|
|
t.Errorf("droppedBytes = 0, want >0")
|
|
case f.storedBytes.Value() != f.storedBytesForTest():
|
|
t.Errorf("storedBytes = %v, want %d", f.storedBytes.Value(), f.storedBytesForTest())
|
|
case f.storedBytes.Value() > int64(opts.MaxFileSize) && !opts.ReplaceStderr:
|
|
// If ReplaceStderr, it is impossible for MaxFileSize to be
|
|
// strictly adhered to since asynchronous os.Stderr.Write calls
|
|
// do not trigger any checks to enforce maximum file size.
|
|
t.Errorf("storedBytes = %v, want <=%d", f.storedBytes.Value(), opts.MaxFileSize)
|
|
}
|
|
|
|
// Read back the data and verify that the entries are in order.
|
|
var readBytes, lastFilchIndex, lastWriteIndex int64
|
|
for {
|
|
b := must.Get(f.TryReadLine())
|
|
if len(b) == 0 {
|
|
break
|
|
}
|
|
var entry struct{ FilchIndex, WriteIndex int64 }
|
|
must.Do(json.Unmarshal(b, &entry))
|
|
if entry.FilchIndex == 0 && entry.WriteIndex == 0 {
|
|
t.Errorf("both indexes are zero")
|
|
}
|
|
if entry.FilchIndex > 0 {
|
|
if entry.FilchIndex <= lastFilchIndex {
|
|
t.Errorf("FilchIndex = %d, want >%d", entry.FilchIndex, lastFilchIndex)
|
|
}
|
|
lastFilchIndex = entry.FilchIndex
|
|
}
|
|
if entry.WriteIndex > 0 {
|
|
if entry.WriteIndex <= lastWriteIndex {
|
|
t.Errorf("WriteIndex = %d, want >%d", entry.WriteIndex, lastWriteIndex)
|
|
}
|
|
lastWriteIndex = entry.WriteIndex
|
|
}
|
|
readBytes += int64(len(b))
|
|
}
|
|
|
|
if f.readBytes.Value() != readBytes {
|
|
t.Errorf("readBytes = %v, want %v", f.readBytes.Value(), readBytes)
|
|
}
|
|
}
|
|
|
|
// TestConcurrentSameFile tests that concurrent Filch operations on the same
|
|
// set of log files does not result in a panic.
|
|
// The exact behavior is undefined, but we should at least avoid a panic.
|
|
func TestConcurrentSameFile(t *testing.T) {
|
|
filePrefix := filepath.Join(t.TempDir(), "testlog")
|
|
f1 := must.Get(New(filePrefix, Options{MaxFileSize: 1000}))
|
|
f2 := must.Get(New(filePrefix, Options{MaxFileSize: 1000}))
|
|
var group sync.WaitGroup
|
|
for _, f := range []*Filch{f1, f2} {
|
|
group.Go(func() {
|
|
for range 1000 {
|
|
for range rand.IntN(10) {
|
|
f.Write([]byte("hello, world"))
|
|
}
|
|
for range rand.IntN(10) {
|
|
f.TryReadLine()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
group.Wait()
|
|
}
|