From 9e43ad2e3710b8051c0c3b7bc70353cb65bd197f Mon Sep 17 00:00:00 2001 From: machine424 Date: Mon, 8 Jul 2024 12:15:37 +0200 Subject: [PATCH] chore(remote_write): clean up as watcher.go is part of wlog now Signed-off-by: machine424 --- tsdb/wlog/watcher.go | 92 +++++++++----------------------------------- 1 file changed, 18 insertions(+), 74 deletions(-) diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index bc7a144e66..9a02f3de49 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -20,7 +20,6 @@ import ( "math" "os" "path/filepath" - "slices" "strconv" "strings" "time" @@ -265,9 +264,9 @@ func (w *Watcher) loop() { // Run the watcher, which will tail the WAL until the quit channel is closed // or an error case is hit. func (w *Watcher) Run() error { - _, lastSegment, err := w.firstAndLast() + _, lastSegment, err := Segments(w.walDir) if err != nil { - return fmt.Errorf("wal.Segments: %w", err) + return fmt.Errorf("Segments: %w", err) } // We want to ensure this is false across iterations since @@ -318,57 +317,20 @@ func (w *Watcher) Run() error { // findSegmentForIndex finds the first segment greater than or equal to index. func (w *Watcher) findSegmentForIndex(index int) (int, error) { - refs, err := w.segments(w.walDir) + refs, err := listSegments(w.walDir) if err != nil { return -1, err } for _, r := range refs { - if r >= index { - return r, nil + if r.index >= index { + return r.index, nil } } return -1, errors.New("failed to find segment for index") } -func (w *Watcher) firstAndLast() (int, int, error) { - refs, err := w.segments(w.walDir) - if err != nil { - return -1, -1, err - } - - if len(refs) == 0 { - return -1, -1, nil - } - return refs[0], refs[len(refs)-1], nil -} - -// Copied from tsdb/wlog/wlog.go so we do not have to open a WAL. -// Plan is to move WAL watcher to TSDB and dedupe these implementations. -func (w *Watcher) segments(dir string) ([]int, error) { - files, err := os.ReadDir(dir) - if err != nil { - return nil, err - } - - var refs []int - for _, f := range files { - k, err := strconv.Atoi(f.Name()) - if err != nil { - continue - } - refs = append(refs, k) - } - slices.Sort(refs) - for i := 0; i < len(refs)-1; i++ { - if refs[i]+1 != refs[i+1] { - return nil, errors.New("segments are not sequential") - } - } - return refs, nil -} - func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error { err := w.readSegment(r, segmentNum, tail) @@ -447,35 +409,17 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { // Currently doing a garbage collect, try again later. } + // if a newer segment is produced, read the current one until the end and move on. case <-segmentTicker.C: - _, last, err := w.firstAndLast() + _, last, err := Segments(w.walDir) if err != nil { - return fmt.Errorf("segments: %w", err) + return fmt.Errorf("Segments: %w", err) } - // Check if new segments exists. - if last <= segmentNum { - continue + if last > segmentNum { + return w.readAndHandleError(reader, segmentNum, tail, size) } - err = w.readSegment(reader, segmentNum, tail) - - // Ignore errors reading to end of segment whilst replaying the WAL. - if !tail { - switch { - case err != nil && !errors.Is(err, io.EOF): - level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "err", err) - case reader.Offset() != size: - level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) - } - return nil - } - - // Otherwise, when we are tailing, non-EOFs are fatal. - if err != nil && !errors.Is(err, io.EOF) { - return err - } - - return nil + continue // we haven't read due to a notification in quite some time, try reading anyways case <-readTicker.C: @@ -484,7 +428,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { if err != nil { return err } - // still want to reset the ticker so we don't read too often + // reset the ticker so we don't read too often readTicker.Reset(readTimeout) case <-w.readNotify: @@ -492,7 +436,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { if err != nil { return err } - // still want to reset the ticker so we don't read too often + // reset the ticker so we don't read too often readTicker.Reset(readTimeout) } } @@ -731,17 +675,17 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err } // Ensure we read the whole contents of every segment in the checkpoint dir. - segs, err := w.segments(checkpointDir) + segs, err := listSegments(checkpointDir) if err != nil { return fmt.Errorf("Unable to get segments checkpoint dir: %w", err) } - for _, seg := range segs { - size, err := getSegmentSize(checkpointDir, seg) + for _, segRef := range segs { + size, err := getSegmentSize(checkpointDir, segRef.index) if err != nil { return fmt.Errorf("getSegmentSize: %w", err) } - sr, err := OpenReadSegment(SegmentName(checkpointDir, seg)) + sr, err := OpenReadSegment(SegmentName(checkpointDir, segRef.index)) if err != nil { return fmt.Errorf("unable to open segment: %w", err) } @@ -753,7 +697,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err } if r.Offset() != size { - return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, seg, size, r.Offset()) + return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, segRef.index, size, r.Offset()) } }