tailscale/util/execqueue/execqueue.go
Brad Fitzpatrick 8af7778ce0 util/execqueue: don't hold mutex in RunSync
We don't hold q.mu while running normal ExecQueue.Add funcs, so we
shouldn't in RunSync either. Otherwise code it calls can't shut down
the queue, as seen in #18502.

Updates #18052

Co-authored-by: Nick Khyl <nickk@tailscale.com>
Change-Id: Ic5e53440411eca5e9fabac7f4a68a9f6ef026de1
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2025-11-26 10:09:23 -08:00

128 lines
2.5 KiB
Go

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package execqueue implements an ordered asynchronous queue for executing functions.
package execqueue
import (
"context"
"errors"
"tailscale.com/syncs"
)
// ExecQueue is an ordered queue of functions. Functions passed to Add are run
// one at a time, in the order they were added, on a background goroutine that
// exists only while there is work to do.
//
// The context fields are created lazily by initCtxLocked rather than by a
// constructor.
type ExecQueue struct {
mu syncs.Mutex // guards all of the following fields
ctx context.Context // derived from context.Background; canceled by Shutdown if it has been created
cancel context.CancelFunc // closes ctx
closed bool // set by Shutdown; once true, Add drops funcs and run stops dequeuing
inFlight bool // whether a goroutine is running q.run
doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
queue []func() // funcs waiting behind the one currently being run
}
// Add schedules f to run after every func already in the queue.
//
// Add never blocks on f: if no runner goroutine is active it starts one with f
// as its first func, otherwise f is appended to the pending list. If the queue
// has been shut down, f is silently dropped.
func (q *ExecQueue) Add(f func()) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.closed {
		return
	}
	q.initCtxLocked()

	if !q.inFlight {
		// Nothing is running: hand f straight to a new runner goroutine.
		q.inFlight = true
		go q.run(f)
		return
	}
	// A runner is active; it will pick f up once it gets to it.
	q.queue = append(q.queue, f)
}
// RunSync enqueues f behind everything already in the queue and blocks until
// f has finished running.
//
// f runs on the queue's goroutine, not the caller's, and q.mu is not held
// while it runs, so f may call back into q (including Shutdown).
//
// It returns nil once f has run and the queue has advanced past it. It returns
// ctx.Err() if ctx expires first; in that case f stays queued and may still
// run later. It returns errExecQueueShutdown if the queue is already shut down
// or is shut down before completion is observed; if f itself shuts the queue
// down, RunSync reports errExecQueueShutdown even though f ran.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
	q.mu.Lock()
	if q.closed {
		// Shutdown may have run before q.ctx existed, in which case the
		// context created below would never be canceled and, because Add
		// drops funcs on a closed queue, the select would block until the
		// caller's ctx expired. Fail fast instead.
		q.mu.Unlock()
		return errExecQueueShutdown
	}
	q.initCtxLocked()
	shutdownCtx := q.ctx
	q.mu.Unlock()

	// The completion signal is queued as its own func right after f, so it
	// fires only once the runner has moved past f.
	ch := make(chan struct{})
	q.Add(f)
	q.Add(func() { close(ch) })

	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-shutdownCtx.Done():
		return errExecQueueShutdown
	}
}
// run is the body of the runner goroutine started by Add. It executes f, then
// keeps draining q.queue in order until the queue is empty or closed.
//
// q.mu is released around every func call so that the funcs themselves can
// call Add, Wait or Shutdown. On exit it clears inFlight, discards anything
// still pending, and wakes a waiter registered through doneWaiter.
func (q *ExecQueue) run(f func()) {
	f()

	q.mu.Lock()
	for {
		if q.closed || len(q.queue) == 0 {
			break
		}
		next := q.queue[0]
		q.queue[0] = nil // drop the reference so the func can be collected
		q.queue = q.queue[1:]

		q.mu.Unlock()
		next()
		q.mu.Lock()
	}

	q.inFlight = false
	q.queue = nil
	if waiter := q.doneWaiter; waiter != nil {
		q.doneWaiter = nil
		close(waiter)
	}
	q.mu.Unlock()
}
// Shutdown asynchronously signals the queue to stop.
//
// It marks the queue closed, so later Add calls are dropped and the runner
// stops dequeuing, and cancels the queue's context to wake blocked RunSync and
// Wait callers. It does not wait for a func that is already running.
func (q *ExecQueue) Shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	// Create the context if nothing has needed it yet, so the cancellation is
	// always recorded. Otherwise a context created lazily after Shutdown
	// would never be canceled, and anything selecting on it would hang.
	q.initCtxLocked()
	q.cancel()
}
// initCtxLocked creates q.ctx and q.cancel the first time it is called and is
// a no-op afterwards. The caller must hold q.mu.
func (q *ExecQueue) initCtxLocked() {
	if q.ctx != nil {
		return
	}
	q.ctx, q.cancel = context.WithCancel(context.Background())
}
// errExecQueueShutdown is the error RunSync and Wait return when the queue has
// been shut down.
var errExecQueueShutdown = errors.New("execqueue shut down")
// Wait blocks until the runner goroutine has finished, the queue is shut down,
// or ctx is done, whichever comes first.
//
// It returns nil if no runner is active when called or once the runner exits,
// errExecQueueShutdown if the queue is or becomes shut down, and ctx.Err() if
// ctx expires first.
func (q *ExecQueue) Wait(ctx context.Context) error {
	q.mu.Lock()
	q.initCtxLocked()
	if q.inFlight && q.doneWaiter == nil {
		// First waiter for this run: install the channel that run closes
		// when it exits. Later waiters share it.
		q.doneWaiter = make(chan struct{})
	}
	drained, stopped, isClosed := q.doneWaiter, q.ctx, q.closed
	q.mu.Unlock()

	switch {
	case isClosed:
		return errExecQueueShutdown
	case drained == nil:
		// No runner is active, so there is nothing to wait for.
		return nil
	}

	select {
	case <-drained:
		return nil
	case <-stopped.Done():
		return errExecQueueShutdown
	case <-ctx.Done():
		return ctx.Err()
	}
}