[release 2.53] Revert 13583 to stop dropping samples in remote-write catch-up (#14446)

* Revert "fix bug that would cause us to endlessly fall behind (#13583)"
This reverts commit 0c71230784.
(leaving the new test in place)

* TSDB: enhance TestRun_AvoidNotifyWhenBehind
With code suggested by @cstyan in #14439.

* WAL watcher: add back log line showing current segment

---------

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
Author: Bryan Boreham, 2024-07-10 10:00:31 +01:00, committed by GitHub
Commit: 7083ae8267 (parent 4c35b9250a)
2 changed files with 65 additions and 66 deletions

tsdb/wlog/watcher.go

@@ -262,6 +262,11 @@ func (w *Watcher) loop() {
 // Run the watcher, which will tail the WAL until the quit channel is closed
 // or an error case is hit.
 func (w *Watcher) Run() error {
+	_, lastSegment, err := w.firstAndLast()
+	if err != nil {
+		return fmt.Errorf("wal.Segments: %w", err)
+	}
+
 	// We want to ensure this is false across iterations since
 	// Run will be called again if there was a failure to read the WAL.
 	w.sendSamples = false
@@ -286,20 +291,14 @@ func (w *Watcher) Run() error {
 		return err
 	}
 
-	level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment)
+	level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
 
 	for !isClosed(w.quit) {
 		w.currentSegmentMetric.Set(float64(currentSegment))
 
-		// Re-check on each iteration in case a new segment was added,
-		// because watch() will wait for notifications on the last segment.
-		_, lastSegment, err := w.firstAndLast()
-		if err != nil {
-			return fmt.Errorf("wal.Segments: %w", err)
-		}
-		tail := currentSegment >= lastSegment
-
-		level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment, "lastSegment", lastSegment)
-		if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
+		// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
+		// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
+		level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
+		if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
 			return err
 		}
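
To make the restored control flow easier to follow, here is a condensed sketch of how Run() reads after this revert. It is not a verbatim copy of the file: checkpoint handling and the MaxSegment test hook are elided, and the loop increment is shown explicitly.

	func (w *Watcher) Run() error {
		// lastSegment is determined once, before the loop, rather than re-checked per iteration.
		_, lastSegment, err := w.firstAndLast()
		if err != nil {
			return fmt.Errorf("wal.Segments: %w", err)
		}
		w.sendSamples = false
		// ... read the last checkpoint and pick the starting currentSegment ...
		for !isClosed(w.quit) {
			w.currentSegmentMetric.Set(float64(currentSegment))
			level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
			// Only the segment that was last when Run() started (or a later one) is tailed;
			// earlier segments are read to completion and the loop moves on.
			if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
				return err
			}
			currentSegment++
		}
		return nil
	}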

tsdb/wlog/watcher_test.go

@@ -17,6 +17,7 @@ import (
"math/rand" "math/rand"
"os" "os"
"path" "path"
"runtime"
"sync" "sync"
"testing" "testing"
"time" "time"
@@ -698,11 +699,46 @@ func TestRun_StartupTime(t *testing.T) {
 	}
 }
 
+func generateWALRecords(w *WL, segment, seriesCount, samplesCount int) error {
+	enc := record.Encoder{}
+	for j := 0; j < seriesCount; j++ {
+		ref := j + (segment * 100)
+		series := enc.Series([]record.RefSeries{
+			{
+				Ref:    chunks.HeadSeriesRef(ref),
+				Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", segment)),
+			},
+		}, nil)
+		if err := w.Log(series); err != nil {
+			return err
+		}
+		for k := 0; k < samplesCount; k++ {
+			inner := rand.Intn(ref + 1)
+			sample := enc.Samples([]record.RefSample{
+				{
+					Ref: chunks.HeadSeriesRef(inner),
+					T:   int64(segment),
+					V:   float64(segment),
+				},
+			}, nil)
+			if err := w.Log(sample); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
-	const pageSize = 32 * 1024
-	const segments = 10
-	const seriesCount = 20
-	const samplesCount = 300
+	if runtime.GOOS == "windows" { // Takes a really long time, perhaps because min sleep time is 15ms.
+		t.SkipNow()
+	}
+	const segmentSize = pageSize // Smallest allowed segment size.
+	const segmentsToWrite = 5
+	const segmentsToRead = segmentsToWrite - 1
+	const seriesCount = 10
+	const samplesCount = 50
 
 	// This test can take longer than intended to finish in cloud CI.
 	readTimeout := 10 * time.Second
@@ -715,73 +751,37 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
 			err := os.Mkdir(wdir, 0o777)
 			require.NoError(t, err)
 
-			enc := record.Encoder{}
-			w, err := NewSize(nil, nil, wdir, pageSize, compress)
+			w, err := NewSize(nil, nil, wdir, segmentSize, compress)
 			require.NoError(t, err)
 
 			var wg sync.WaitGroup
-			// add one segment initially to ensure there's a value > 0 for the last segment id
-			for i := 0; i < 1; i++ {
-				for j := 0; j < seriesCount; j++ {
-					ref := j + (i * 100)
-					series := enc.Series([]record.RefSeries{
-						{
-							Ref:    chunks.HeadSeriesRef(ref),
-							Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
-						},
-					}, nil)
-					require.NoError(t, w.Log(series))
-					for k := 0; k < samplesCount; k++ {
-						inner := rand.Intn(ref + 1)
-						sample := enc.Samples([]record.RefSample{
-							{
-								Ref: chunks.HeadSeriesRef(inner),
-								T:   int64(i),
-								V:   float64(i),
-							},
-						}, nil)
-						require.NoError(t, w.Log(sample))
-					}
-				}
-			}
+			// Generate one segment initially to ensure that watcher.Run() finds at least one segment on disk.
+			require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
+			w.NextSegment() // Force creation of the next segment
 
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				for i := 1; i < segments; i++ {
-					for j := 0; j < seriesCount; j++ {
-						ref := j + (i * 100)
-						series := enc.Series([]record.RefSeries{
-							{
-								Ref:    chunks.HeadSeriesRef(ref),
-								Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
-							},
-						}, nil)
-						require.NoError(t, w.Log(series))
-						for k := 0; k < samplesCount; k++ {
-							inner := rand.Intn(ref + 1)
-							sample := enc.Samples([]record.RefSample{
-								{
-									Ref: chunks.HeadSeriesRef(inner),
-									T:   int64(i),
-									V:   float64(i),
-								},
-							}, nil)
-							require.NoError(t, w.Log(sample))
-						}
-					}
+				for i := 1; i < segmentsToWrite; i++ {
+					require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount))
+					w.NextSegment()
 				}
 			}()
 
 			wt := newWriteToMock(time.Millisecond)
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
-			watcher.MaxSegment = segments
+			watcher.MaxSegment = segmentsToRead
 			watcher.setMetrics()
 			startTime := time.Now()
 			err = watcher.Run()
 			wg.Wait()
 			require.Less(t, time.Since(startTime), readTimeout)
 
+			// But samples records shouldn't get dropped
+			retry(t, defaultRetryInterval, defaultRetries, func() bool {
+				return wt.checkNumSeries() > 0
+			})
+			require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended)
+
 			require.NoError(t, err)
 			require.NoError(t, w.Close())
 		})
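
For reference, the new strict assertion works out as follows, assuming (as the diff shows) that generateWALRecords logs exactly one sample record per inner-loop iteration:

	// segmentsToRead       = segmentsToWrite - 1        = 4
	// samples per segment  = seriesCount * samplesCount = 10 * 50 = 500
	// expected wt.samplesAppended = segmentsToRead * 500 = 2000

So any sample record dropped during catch-up now fails the test directly, whereas the earlier version of the test only bounded the elapsed run time.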