MEDIUM: fd/threads: Make sure we don't miss a fd cache entry.

An fd cache entry might be removed and re-added at the end of the list while
another thread is parsing it; if that happens, we may miss fd cache entries.
To avoid this, add a new field to struct fdtab, "added_mask", which contains
a mask of the potentially affected threads. When a thread finds its bit set
there, it sets its own bit in fd_cache_mask, so that it does not wait in
poll while it may still have work to do.
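
A rough sketch of the mechanism described above (the added_mask changes
themselves are not in the hunks shown below, so the field placement and both
call sites here are assumptions based on the message, using existing helpers
such as HA_ATOMIC_OR):

	/* Writer side (sketch): after re-adding <fd> at the tail of the cache,
	 * mark every thread allowed to process this fd as potentially affected.
	 */
	HA_ATOMIC_OR(&fdtab[fd].added_mask, fdtab[fd].thread_mask);

	/* Reader side (sketch): before sleeping in poll, a flagged thread clears
	 * its bit and marks the fd cache as non-empty for itself, so that it
	 * rescans the cache instead of sleeping while entries may be pending.
	 */
	if (fdtab[fd].added_mask & tid_bit) {
		HA_ATOMIC_AND(&fdtab[fd].added_mask, ~tid_bit);
		HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
	}
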
Olivier Houchard, 2018-01-31 18:07:29 +01:00; committed by Willy Tarreau
parent 4815c8cbfe
commit 1256836ebf
5 changed files with 49 additions and 53 deletions


@@ -105,10 +105,6 @@ static inline void updt_fd_polling(const int fd)
 	fd_updt[fd_nbupdt++] = fd;
 }
-#define _GET_NEXT(fd) fdtab[fd].fdcache_entry.next
-#define _GET_PREV(fd) fdtab[fd].fdcache_entry.prev
 static inline void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
 {
 	int next;
@@ -117,16 +113,11 @@ static inline void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
 	int last;
 redo_next:
-	next = _GET_NEXT(fd);
-	/*
-	 * Check that we're not already in the cache, and if not, lock us.
-	 * <= -3 means not in the cache, -2 means locked, -1 means we're
-	 * in the cache, and the last element, >= 0 gives the FD of the next
-	 * in the cache.
-	 */
+	next = fdtab[fd].cache.next;
+	/* Check that we're not already in the cache, and if not, lock us. */
 	if (next >= -2)
 		goto done;
-	if (!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2))
+	if (!HA_ATOMIC_CAS(&fdtab[fd].cache.next, &next, -2))
 		goto redo_next;
 	__ha_barrier_store();
 redo_last:
@@ -137,7 +128,7 @@ redo_last:
 	if (unlikely(last == -1)) {
 		/* list is empty, try to add ourselves alone so that list->last=fd */
-		_GET_PREV(fd) = last;
+		fdtab[fd].cache.prev = last;
 		/* Make sure the "prev" store is visible before we update the last entry */
 		__ha_barrier_store();
@@ -150,14 +141,14 @@ redo_last:
 		/* since we're alone at the end of the list and still locked(-2),
 		 * we know noone tried to add past us. Mark the end of list.
 		 */
-		_GET_NEXT(fd) = -1;
+		fdtab[fd].cache.next = -1;
 		goto done; /* We're done ! */
 	} else {
 		/* non-empty list, add past the tail */
 		do {
 			new = fd;
 			old = -1;
-			_GET_PREV(fd) = last;
+			fdtab[fd].cache.prev = last;
 			__ha_barrier_store();
@@ -165,7 +156,7 @@ redo_last:
 			 * The CAS will only succeed if its next is -1,
 			 * which means it's in the cache, and the last element.
 			 */
-			if (likely(HA_ATOMIC_CAS(&_GET_NEXT(last), &old, new)))
+			if (likely(HA_ATOMIC_CAS(&fdtab[last].cache.next, &old, new)))
 				break;
 			goto redo_last;
 		} while (1);
@@ -178,7 +169,7 @@ redo_fd_cache:
 	if (unlikely(!HA_ATOMIC_CAS(&list->last, &last, fd)))
 		goto redo_fd_cache;
 	__ha_barrier_store();
-	_GET_NEXT(fd) = -1;
+	fdtab[fd].cache.next = -1;
 	__ha_barrier_store();
 done:
 	return;
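
For readers following the protocol: fd_add_to_fd_list() "locks" an entry by
compare-and-swapping its next field to -2, and only then touches the list
links. A condensed sketch of that locking step, equivalent to the redo_next
logic above:

	/* Sketch: try to lock fd's cache entry before linking it in.
	 * next >= -2 means the entry is already in the cache (or locked
	 * by another thread), so there is nothing to add.
	 */
	int next;
	do {
		next = fdtab[fd].cache.next;
		if (next >= -2)
			return; /* already present or being handled */
	} while (!HA_ATOMIC_CAS(&fdtab[fd].cache.next, &next, -2));
	__ha_barrier_store(); /* make the lock visible before editing links */
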
@@ -202,15 +193,14 @@ static inline void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
 #endif
 	int old;
 	int new = -2;
-	volatile int prev;
-	volatile int next;
+	int prev;
+	int next;
 	int last;
 lock_self:
 #if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
 	next_list.next = next_list.prev = -2;
-	cur_list.prev = _GET_PREV(fd);
-	cur_list.next = _GET_NEXT(fd);
+	cur_list = fdtab[fd].cache;
 	/* First, attempt to lock our own entries */
 	do {
 		/* The FD is not in the FD cache, give up */
@@ -220,9 +210,9 @@ lock_self:
 		goto lock_self;
 	} while (
 #ifdef HA_CAS_IS_8B
-	    unlikely(!HA_ATOMIC_CAS(((void **)(void *)&_GET_NEXT(fd)), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
+	    unlikely(!HA_ATOMIC_CAS(((void **)(void *)&fdtab[fd].cache.next), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
 #else
-	    unlikely(!__ha_cas_dw((void *)&_GET_NEXT(fd), (void *)&cur_list, (void *)&next_list)))
+	    unlikely(!__ha_cas_dw((void *)&fdtab[fd].cache.next, (void *)&cur_list, (void *)&next_list)))
 #endif
 	    ;
 	next = cur_list.next;
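
On platforms with an 8-byte CAS, fd_rm_from_fd_list() locks next and prev in
a single operation by swapping the whole 8-aligned pair at once, which is why
struct fdlist_entry is declared aligned(8) (see the header change below). A
simplified sketch of the idea; the uint64_t view here is only for
readability, the patch itself goes through void-pointer casts and
__ha_cas_dw():

	/* Sketch: lock both links with one 8-byte CAS over the aligned pair. */
	struct fdlist_entry cur, locked = { .next = -2, .prev = -2 };
	while (1) {
		cur = fdtab[fd].cache;
		if (cur.next <= -3)
			return; /* not in the cache: nothing to remove */
		if (cur.next == -2 || cur.prev == -2)
			continue; /* locked by another thread: retry */
		if (HA_ATOMIC_CAS((uint64_t *)&fdtab[fd].cache,
		                  (uint64_t *)&cur, *(uint64_t *)&locked))
			break; /* both links now read -2: we own the entry */
	}
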
@@ -230,18 +220,18 @@
 #else
 lock_self_next:
-	next = _GET_NEXT(fd);
+	next = fdtab[fd].cache.next;
 	if (next == -2)
 		goto lock_self_next;
 	if (next <= -3)
 		goto done;
-	if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2)))
+	if (unlikely(!HA_ATOMIC_CAS(&fdtab[fd].cache.next, &next, -2)))
 		goto lock_self_next;
 lock_self_prev:
-	prev = _GET_PREV(fd);
+	prev = fdtab[fd].cache.prev;
 	if (prev == -2)
 		goto lock_self_prev;
-	if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(fd), &prev, -2)))
+	if (unlikely(!HA_ATOMIC_CAS(&fdtab[fd].cache.prev, &prev, -2)))
 		goto lock_self_prev;
 #endif
 	__ha_barrier_store();
@@ -251,14 +241,14 @@ lock_self_prev:
 redo_prev:
 		old = fd;
-		if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(prev), &old, new))) {
+		if (unlikely(!HA_ATOMIC_CAS(&fdtab[prev].cache.next, &old, new))) {
 			if (unlikely(old == -2)) {
 				/* Neighbour already locked, give up and
 				 * retry again once he's done
 				 */
-				_GET_PREV(fd) = prev;
+				fdtab[fd].cache.prev = prev;
 				__ha_barrier_store();
-				_GET_NEXT(fd) = next;
+				fdtab[fd].cache.next = next;
 				__ha_barrier_store();
 				goto lock_self;
 			}
@@ -268,18 +258,18 @@ redo_prev:
 	if (likely(next != -1)) {
 redo_next:
 		old = fd;
-		if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(next), &old, new))) {
+		if (unlikely(!HA_ATOMIC_CAS(&fdtab[next].cache.prev, &old, new))) {
 			if (unlikely(old == -2)) {
 				/* Neighbour already locked, give up and
 				 * retry again once he's done
 				 */
 				if (prev != -1) {
-					_GET_NEXT(prev) = fd;
+					fdtab[prev].cache.next = fd;
 					__ha_barrier_store();
 				}
-				_GET_PREV(fd) = prev;
+				fdtab[fd].cache.prev = prev;
 				__ha_barrier_store();
-				_GET_NEXT(fd) = next;
+				fdtab[fd].cache.next = next;
 				__ha_barrier_store();
 				goto lock_self;
 			}
@@ -297,22 +287,18 @@ redo_next:
 	 */
 	__ha_barrier_store();
 	if (likely(prev != -1))
-		_GET_NEXT(prev) = next;
+		fdtab[prev].cache.next = next;
 	__ha_barrier_store();
 	if (likely(next != -1))
-		_GET_PREV(next) = prev;
+		fdtab[next].cache.prev = prev;
 	__ha_barrier_store();
 	/* Ok, now we're out of the fd cache */
-	_GET_NEXT(fd) = -(next + 4);
+	fdtab[fd].cache.next = -(next + 4);
 	__ha_barrier_store();
 done:
 	return;
 }
-#undef _GET_NEXT
-#undef _GET_PREV
 /* Removes entry used by fd <fd> from the FD cache and replaces it with the
  * last one.
  * If the fd has no entry assigned, return immediately.
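
The final store above is the "unlock and leave" step: writing -(next + 4),
i.e. -4 - next, keeps every out-of-cache value at <= -3 while still
remembering the old successor (the "-4-fd" rule documented in the header hunk
below). The encoding round-trips exactly:

	/* Sketch: the "out of cache" encoding used by fd_rm_from_fd_list(). */
	int encoded = -(next + 4);    /* next == 7 -> -11 ; next == -1 (tail) -> -3 */
	int decoded = -(encoded + 4); /* -11 -> 7 ; -3 -> -1 : lossless round trip */
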


@@ -90,14 +90,24 @@ enum fd_states {
  */
 #define DEAD_FD_MAGIC 0xFDDEADFD
+/* fdlist_entry: entry used by the fd cache.
+ * >= 0 means we're in the cache and gives the FD of the next in the cache,
+ * -1 means we're in the cache and the last element,
+ * -2 means the entry is locked,
+ * <= -3 means not in the cache, and next element is -4-fd
+ *
+ * It must remain 8-aligned so that aligned CAS operations may be done on both
+ * entries at once.
+ */
 struct fdlist_entry {
-	volatile int next;
-	volatile int prev;
+	int next;
+	int prev;
 } __attribute__ ((aligned(8)));
 /* head of the fd cache */
 struct fdlist {
-	volatile int first;
-	volatile int last;
+	int first;
+	int last;
 } __attribute__ ((aligned(8)));
 /* info about one given fd */
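
With this encoding, membership tests reduce to plain comparisons on next. The
helpers below are hypothetical (the patch open-codes the tests), but they
show how the later hunks read the field:

	/* Hypothetical helpers built on the encoding documented above. */
	static inline int fd_is_in_cache(int fd)
	{
		/* -2 (locked) still counts as present: this is the ">= -2"
		 * test used by the CLI and stats dumps below. */
		return fdtab[fd].cache.next >= -2;
	}

	static inline int fd_is_last_in_cache(int fd)
	{
		return fdtab[fd].cache.next == -1;
	}
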
@@ -106,7 +116,7 @@ struct fdtab {
 	unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
 	unsigned long polled_mask; /* mask of thread IDs currently polling this fd */
 	unsigned long update_mask; /* mask of thread IDs having an update for fd */
-	struct fdlist_entry fdcache_entry; /* Entry in the fdcache */
+	struct fdlist_entry cache; /* Entry in the fdcache */
 	void (*iocb)(int fd); /* I/O handler */
 	void *owner; /* the connection or listener associated with this fd, NULL if closed */
 	unsigned char state; /* FD state for read and write directions (2*3 bits) */


@@ -811,7 +811,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
 	             (fdt.ev & FD_POLL_IN) ? 'I' : 'i',
 	             fdt.linger_risk ? 'L' : 'l',
 	             fdt.cloned ? 'C' : 'c',
-	             fdt.fdcache_entry.next >= -2 ? 1 : 0,
+	             fdt.cache.next >= -2 ? 1 : 0,
 	             fdt.owner,
 	             fdt.iocb,
 	             (fdt.iocb == conn_fd_handler) ? "conn_fd_handler" :


@@ -224,7 +224,7 @@ static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 {
 	int fd, old_fd, e;
-	for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].fdcache_entry.next) {
+	for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
 		if (fd == -2) {
 			fd = old_fd;
 			continue;
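
The fd == -2 test is what makes this walk safe against concurrent removals:
reading -2 from a next link means the walker caught an entry while it was
locked, so it backs up to old_fd, the last entry it saw in a stable state,
and re-reads the link from there. The same loop, annotated:

	/* Sketch: cache walk that tolerates entries being locked mid-traversal. */
	for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
		if (fd == -2) {         /* the link we followed was locked */
			fd = old_fd;    /* back up to the last stable entry */
			continue;       /* and re-read its next link */
		}
		old_fd = fd;            /* remember the last stable position */
		/* ... process fd ... */
	}
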
@@ -235,7 +235,7 @@ static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 		old_fd = fd;
 		if (!(fdtab[fd].thread_mask & tid_bit))
 			continue;
-		if (fdtab[fd].fdcache_entry.next < -3)
+		if (fdtab[fd].cache.next < -3)
 			continue;
 		HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
@@ -323,8 +323,8 @@ int init_pollers()
 	for (p = 0; p < global.maxsock; p++) {
 		HA_SPIN_INIT(&fdtab[p].lock);
 		/* Mark the fd as out of the fd cache */
-		fdtab[p].fdcache_entry.next = -3;
-		fdtab[p].fdcache_entry.next = -3;
+		fdtab[p].cache.next = -3;
+		fdtab[p].cache.next = -3;
 	}
 	for (p = 0; p < global.nbthread; p++)
 		fd_cache_local[p].first = fd_cache_local[p].last = -1;


@@ -2905,7 +2905,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
 	              conn->flags,
 	              conn->handle.fd,
 	              conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
-	              conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
+	              conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache.next >= -2 : 0,
 	              conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
 	              conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
 }
@@ -2938,7 +2938,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
 	              conn->flags,
 	              conn->handle.fd,
 	              conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
-	              conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
+	              conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache.next >= -2 : 0,
 	              conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
 	              conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
 }