BUG/MAJOR: fd/threads, task/threads: ensure all spin locks are unlocked

Calculate whether the fd or task should be locked once, before locking, and
reuse that calculation when determining whether to unlock.

This fixes a race condition introduced in 87d54a9a for fds and in b20aa9ee
for tasks, both first released in 1.9-dev4. When one thread narrows
thread_mask to a single thread for a task or fd while a second thread holds,
or is waiting for, the lock on that task or fd, the second thread will never
unlock it. For FDs, this is observable when a listener is polled by multiple
threads and is closed while those threads have events pending. For tasks, the
same failure seems possible when task_set_affinity is called, but I did not
observe it.
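
To illustrate the failure mode, here is a minimal, self-contained sketch of
the broken and fixed locking patterns (hypothetical struct and helper names,
with pthread spinlocks standing in for the HA_SPIN_* macros; not HAProxy's
actual code):

#include <pthread.h>

/* Hypothetical stand-ins for HAProxy's fdtab entry and helpers. */
struct fdtab_entry {
	unsigned long thread_mask;      /* one bit per thread using this fd */
	pthread_spinlock_t lock;
};

/* nonzero iff at least two bits are set, i.e. the entry is shared */
static inline unsigned long atleast2(unsigned long m)
{
	return m & (m - 1);
}

/* Pre-fix pattern: the "is it shared?" test is evaluated twice. If another
 * thread narrows thread_mask to a single bit after the lock is taken, the
 * second test is false, the unlock is skipped, and the spinlock stays held. */
static void broken(struct fdtab_entry *e)
{
	if (atleast2(e->thread_mask))
		pthread_spin_lock(&e->lock);
	/* ... update shared state; thread_mask may shrink concurrently ... */
	if (atleast2(e->thread_mask))   /* may now be false: no unlock! */
		pthread_spin_unlock(&e->lock);
}

/* Fixed pattern (this commit): evaluate once, reuse for lock and unlock. */
static void fixed(struct fdtab_entry *e)
{
	unsigned long locked = atleast2(e->thread_mask);

	if (locked)
		pthread_spin_lock(&e->lock);
	/* ... update shared state ... */
	if (locked)
		pthread_spin_unlock(&e->lock);
}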

This must be backported to 1.9.
commit bc9d9844d5, parent b8e602cb1b
Authored by Richard Russo on 2019-02-20 12:43:45 -08:00; committed by Olivier Houchard
3 changed files with 60 additions and 29 deletions

@@ -282,6 +282,7 @@ static inline int fd_active(const int fd)
 static inline void fd_stop_recv(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -294,10 +295,11 @@ static inline void fd_stop_recv(int fd)
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -305,6 +307,7 @@ static inline void fd_stop_recv(int fd)
 static inline void fd_stop_send(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -317,10 +320,11 @@ static inline void fd_stop_send(int fd)
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -328,6 +332,7 @@ static inline void fd_stop_send(int fd)
 static inline void fd_stop_both(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -340,10 +345,11 @@ static inline void fd_stop_both(int fd)
 	if ((old ^ new) & FD_EV_POLLED_RW)
 		updt_fd_polling(fd);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -351,6 +357,7 @@ static inline void fd_stop_both(int fd)
 static inline void fd_cant_recv(const int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -364,23 +371,27 @@ static inline void fd_cant_recv(const int fd)
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> can receive anymore without polling. */
 static inline void fd_may_recv(const int fd)
 {
+	unsigned long locked;
+
 	/* marking ready never changes polled status */
 	HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_R);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -392,6 +403,7 @@ static inline void fd_may_recv(const int fd)
 static inline void fd_done_recv(const int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -405,10 +417,11 @@ static inline void fd_done_recv(const int fd)
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -416,6 +429,7 @@ static inline void fd_done_recv(const int fd)
 static inline void fd_cant_send(const int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -429,23 +443,27 @@ static inline void fd_cant_send(const int fd)
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
 static inline void fd_may_send(const int fd)
 {
+	unsigned long locked;
+
 	/* marking ready never changes polled status */
 	HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_W);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -453,6 +471,7 @@ static inline void fd_may_send(const int fd)
 static inline void fd_want_recv(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -466,10 +485,11 @@ static inline void fd_want_recv(int fd)
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -477,6 +497,7 @@ static inline void fd_want_recv(int fd)
 static inline void fd_want_send(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;
 
 	old = fdtab[fd].state;
 	do {
@@ -490,10 +511,11 @@ static inline void fd_want_send(int fd)
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);
 
-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -501,11 +523,13 @@ static inline void fd_want_send(int fd)
  * by the poller to set FD_POLL_* flags. */
 static inline void fd_update_events(int fd, int evts)
 {
-	if (atleast2(fdtab[fd].thread_mask))
+	unsigned long locked = atleast2(fdtab[fd].thread_mask);
+
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fdtab[fd].ev &= FD_POLL_STICKY;
 	fdtab[fd].ev |= evts;
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
 	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
@@ -518,7 +542,9 @@ static inline void fd_update_events(int fd, int evts)
 /* Prepares <fd> for being polled */
 static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
 {
-	if (atleast2(thread_mask))
+	unsigned long locked = atleast2(thread_mask);
+
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fdtab[fd].owner = owner;
 	fdtab[fd].iocb = iocb;
@@ -529,7 +555,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
 	/* note: do not reset polled_mask here as it indicates which poller
 	 * still knows this FD from a possible previous round.
 	 */
-	if (atleast2(thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

@@ -184,11 +184,14 @@ static inline struct task *__task_unlink_wq(struct task *t)
  */
 static inline struct task *task_unlink_wq(struct task *t)
 {
+	unsigned long locked;
+
 	if (likely(task_in_wq(t))) {
-		if (atleast2(t->thread_mask))
+		locked = atleast2(t->thread_mask);
+		if (locked)
 			HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 		__task_unlink_wq(t);
-		if (atleast2(t->thread_mask))
+		if (locked)
 			HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 	}
 	return t;

@@ -411,6 +411,7 @@ void fd_remove(int fd)
 static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 {
 	int fd, old_fd, e;
+	unsigned long locked;
 
 	for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
 		if (fd == -2) {
@@ -427,7 +428,8 @@ static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 			continue;
 
 		HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
-		if (atleast2(fdtab[fd].thread_mask) && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
+		locked = atleast2(fdtab[fd].thread_mask);
+		if (locked && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
 			activity[tid].fd_lock++;
 			continue;
 		}
@@ -442,13 +444,13 @@ static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 				fdtab[fd].ev |= FD_POLL_OUT;
 
 		if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
-			if (atleast2(fdtab[fd].thread_mask))
+			if (locked)
 				HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 			fdtab[fd].iocb(fd);
 		}
 		else {
 			fd_release_cache_entry(fd);
-			if (atleast2(fdtab[fd].thread_mask))
+			if (locked)
 				HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 		}
 	}
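
The cache-processing path above also has to remember the sharing test across
the try-lock. Here is a hedged sketch of that idiom, reusing the hypothetical
types from the earlier sketch and assuming the HA_SPIN_TRYLOCK-style call
returns nonzero when the lock is busy (as pthread_spin_trylock does):

/* Hypothetical sketch of the cache-loop idiom (not HAProxy's actual code).
 * pthread_spin_trylock() returns 0 on success and EBUSY when contended,
 * mirroring the "locked && HA_SPIN_TRYLOCK(...)" skip test above. */
static int process_entry(struct fdtab_entry *e)
{
	unsigned long locked = atleast2(e->thread_mask);

	if (locked && pthread_spin_trylock(&e->lock) != 0)
		return 0;       /* shared and contended: retry on a later pass */

	/* ... handle the pending events for this entry ... */

	if (locked)             /* reuse the remembered decision to unlock */
		pthread_spin_unlock(&e->lock);
	return 1;
}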