MAJOR: fd/threads: Make the fdcache mostly lockless.

Create a local, per-thread fdcache for file descriptors that belong to only
one thread, and make the global fd cache mostly lockless, since we can get
a lot of contention on the fd cache lock.
Author: Olivier Houchard, 2018-01-24 18:17:56 +01:00, committed by Willy Tarreau
Parent: cf975d46bc
Commit: 4815c8cbfe
6 changed files with 262 additions and 79 deletions
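Editorial note: the lockless scheme below encodes an FD's cache state directly in the next/prev fields of its fdcache_entry, so a single CAS on next can both test membership and lock the entry. The helper below is only our summary of the conventions documented in the diff comments; the name is ours, not HAProxy's.

/* Editorial sketch only. The commit encodes an FD's cache state in
 * fdtab[fd].fdcache_entry.next:
 *   >=  0  in the cache, value = next FD in the list
 *   == -1  in the cache, last element of the list
 *   == -2  entry currently locked by an add/remove in progress
 *   <= -3  not in the cache; a removed entry stores -(old_next + 4)
 */
static inline int fd_is_cached_sketch(int next)
{
    return next >= -2;  /* matches the ">= -2" tests used in cli.c/stream.c below */
}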


@@ -259,7 +259,6 @@ int thread_need_sync(void);
 /* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
 enum lock_label {
     THREAD_SYNC_LOCK = 0,
-    FDCACHE_LOCK,
     FD_LOCK,
     TASK_RQ_LOCK,
     TASK_WQ_LOCK,
@@ -376,7 +375,6 @@ static inline const char *lock_label(enum lock_label label)
 {
     switch (label) {
     case THREAD_SYNC_LOCK: return "THREAD_SYNC";
-    case FDCACHE_LOCK: return "FDCACHE";
     case FD_LOCK: return "FD";
     case TASK_RQ_LOCK: return "TASK_RQ";
     case TASK_WQ_LOCK: return "TASK_WQ";


@@ -33,8 +33,9 @@
 /* public variables */
-extern unsigned int *fd_cache;      // FD events cache
-extern int fd_cache_num;            // number of events in the cache
+extern volatile struct fdlist fd_cache;
+extern volatile struct fdlist fd_cache_local[MAX_THREADS];
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 extern THREAD_LOCAL int *fd_updt;   // FD updates list
@@ -105,44 +106,223 @@ static inline void updt_fd_polling(const int fd)
 }
#define _GET_NEXT(fd) fdtab[fd].fdcache_entry.next
#define _GET_PREV(fd) fdtab[fd].fdcache_entry.prev
static inline void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
{
int next;
int new;
int old;
int last;
redo_next:
next = _GET_NEXT(fd);
/*
* Check that we're not already in the cache, and if not, lock us.
* <= -3 means not in the cache, -2 means locked, -1 means we're
* in the cache, and the last element, >= 0 gives the FD of the next
* in the cache.
*/
if (next >= -2)
goto done;
if (!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2))
goto redo_next;
__ha_barrier_store();
redo_last:
/* First, insert in the linked list */
last = list->last;
old = -1;
new = fd;
if (unlikely(last == -1)) {
/* list is empty, try to add ourselves alone so that list->last=fd */
_GET_PREV(fd) = last;
/* Make sure the "prev" store is visible before we update the last entry */
__ha_barrier_store();
if (unlikely(!HA_ATOMIC_CAS(&list->last, &old, new)))
goto redo_last;
/* list->first was necessarily -1, we're guaranteed to be alone here */
list->first = fd;
/* since we're alone at the end of the list and still locked(-2),
* we know no one tried to add past us. Mark the end of the list.
*/
_GET_NEXT(fd) = -1;
goto done; /* We're done ! */
} else {
/* non-empty list, add past the tail */
do {
new = fd;
old = -1;
_GET_PREV(fd) = last;
__ha_barrier_store();
/* adding ourselves past the last element
* The CAS will only succeed if its next is -1,
* which means it's in the cache, and the last element.
*/
if (likely(HA_ATOMIC_CAS(&_GET_NEXT(last), &old, new)))
break;
goto redo_last;
} while (1);
}
/* Then, update the last entry */
redo_fd_cache:
last = list->last;
__ha_barrier_load();
if (unlikely(!HA_ATOMIC_CAS(&list->last, &last, fd)))
goto redo_fd_cache;
__ha_barrier_store();
_GET_NEXT(fd) = -1;
__ha_barrier_store();
done:
return;
}
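fd_add_to_fd_list() above is built from HA_ATOMIC_CAS retry loops plus store barriers. As a minimal, self-contained illustration of that retry pattern only (using C11 stdatomic rather than HAProxy's macros, with a hypothetical tail index), consider this sketch:

#include <stdatomic.h>
#include <stdio.h>

/* Hypothetical stand-in for list->last: -1 means "list is empty". */
static _Atomic int tail = -1;

/* Append "fd" by swinging the shared tail, retrying on contention,
 * in the spirit of the redo_last/redo_fd_cache loops above.
 */
static void append_sketch(int fd)
{
    int expected = atomic_load(&tail);

    /* atomic_compare_exchange_weak reloads "expected" on failure,
     * so the loop naturally retries with the freshly observed tail.
     */
    while (!atomic_compare_exchange_weak(&tail, &expected, fd))
        ;  /* somebody else updated the tail first: retry */
}

int main(void)
{
    append_sketch(42);
    printf("tail is now %d\n", atomic_load(&tail));
    return 0;
}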
 /* Allocates a cache entry for a file descriptor if it does not yet have one.
  * This can be done at any time.
  */
 static inline void fd_alloc_cache_entry(const int fd)
 {
-    HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
-    if (fdtab[fd].cache)
-        goto end;
-    fd_cache_num++;
-    fd_cache_mask |= fdtab[fd].thread_mask;
-    fdtab[fd].cache = fd_cache_num;
-    fd_cache[fd_cache_num-1] = fd;
-  end:
-    HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+    if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
+        fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+    else
+        fd_add_to_fd_list(&fd_cache, fd);
 }
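The dispatch above uses the classic (x & (x - 1)) trick: a thread_mask with a single bit set means the FD has exactly one owning thread, so it can go to that thread's private list; otherwise it goes to the shared lockless list. A small standalone check of that arithmetic, substituting GCC/Clang's __builtin_ffsl for HAProxy's my_ffsl (an assumption on our part):

#include <assert.h>
#include <stdio.h>

/* Returns 1 if exactly one bit is set (i.e. the FD has a single owner). */
static int has_single_owner(unsigned long thread_mask)
{
    return thread_mask && !(thread_mask & (thread_mask - 1));
}

int main(void)
{
    unsigned long one_owner  = 1UL << 5;             /* only thread 5 */
    unsigned long two_owners = (1UL << 2) | (1UL << 7);

    assert(has_single_owner(one_owner));
    assert(!has_single_owner(two_owners));

    /* __builtin_ffsl() is 1-based, like my_ffsl() appears to be used above,
     * hence the "- 1" to get the thread index.
     */
    printf("owning thread index: %d\n", __builtin_ffsl(one_owner) - 1);
    return 0;
}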
static inline void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
{
#if defined(HA_HAVE_CAS_DW) || defined(HA_CAS_IS_8B)
volatile struct fdlist_entry cur_list, next_list;
#endif
int old;
int new = -2;
volatile int prev;
volatile int next;
int last;
lock_self:
#if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
next_list.next = next_list.prev = -2;
cur_list.prev = _GET_PREV(fd);
cur_list.next = _GET_NEXT(fd);
/* First, attempt to lock our own entries */
do {
/* The FD is not in the FD cache, give up */
if (unlikely(cur_list.next <= -3))
return;
if (unlikely(cur_list.prev == -2 || cur_list.next == -2))
goto lock_self;
} while (
#ifdef HA_CAS_IS_8B
unlikely(!HA_ATOMIC_CAS(((void **)(void *)&_GET_NEXT(fd)), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
#else
unlikely(!__ha_cas_dw((void *)&_GET_NEXT(fd), (void *)&cur_list, (void *)&next_list)))
#endif
;
next = cur_list.next;
prev = cur_list.prev;
#else
lock_self_next:
next = _GET_NEXT(fd);
if (next == -2)
goto lock_self_next;
if (next <= -3)
goto done;
if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2)))
goto lock_self_next;
lock_self_prev:
prev = _GET_PREV(fd);
if (prev == -2)
goto lock_self_prev;
if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(fd), &prev, -2)))
goto lock_self_prev;
#endif
__ha_barrier_store();
/* Now, lock the entries of our neighbours */
if (likely(prev != -1)) {
redo_prev:
old = fd;
if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(prev), &old, new))) {
if (unlikely(old == -2)) {
/* Neighbour already locked, give up and
* retry again once he's done
*/
_GET_PREV(fd) = prev;
__ha_barrier_store();
_GET_NEXT(fd) = next;
__ha_barrier_store();
goto lock_self;
}
goto redo_prev;
}
}
if (likely(next != -1)) {
redo_next:
old = fd;
if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(next), &old, new))) {
if (unlikely(old == -2)) {
/* Neighbour already locked, give up and
* retry again once he's done
*/
if (prev != -1) {
_GET_NEXT(prev) = fd;
__ha_barrier_store();
}
_GET_PREV(fd) = prev;
__ha_barrier_store();
_GET_NEXT(fd) = next;
__ha_barrier_store();
goto lock_self;
}
goto redo_next;
}
}
if (list->first == fd)
list->first = next;
__ha_barrier_store();
last = list->last;
while (unlikely(last == fd && (!HA_ATOMIC_CAS(&list->last, &last, prev))))
__ha_compiler_barrier();
/* Make sure we let other threads know we're no longer in cache,
* before releasing our neighbours.
*/
__ha_barrier_store();
if (likely(prev != -1))
_GET_NEXT(prev) = next;
__ha_barrier_store();
if (likely(next != -1))
_GET_PREV(next) = prev;
__ha_barrier_store();
/* Ok, now we're out of the fd cache */
_GET_NEXT(fd) = -(next + 4);
__ha_barrier_store();
done:
return;
}
#undef _GET_NEXT
#undef _GET_PREV
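fd_rm_from_fd_list() prefers an 8-byte CAS (HA_CAS_IS_8B / HA_HAVE_CAS_DW) so it can lock its own prev and next fields in one atomic step instead of two. A rough, self-contained C11 sketch of that idea, assuming a GCC/Clang toolchain and a two-int, 8-byte-aligned entry like the struct fdlist_entry declared later in types/fd.h (the names here are ours):

#include <stdatomic.h>
#include <stdio.h>

/* Mirrors the shape of struct fdlist_entry: two ints in one 8-byte word. */
struct pair {
    int next;
    int prev;
} __attribute__ ((aligned(8)));

int main(void)
{
    _Atomic struct pair entry;
    struct pair expected, locked = { .next = -2, .prev = -2 };

    atomic_init(&entry, (struct pair){ .next = 7, .prev = 3 });

    /* Take a snapshot, then try to replace both fields at once with the
     * "locked" marker (-2): a single 8-byte CAS, so there is no window
     * where next is locked but prev is not.
     */
    expected = atomic_load(&entry);
    if (atomic_compare_exchange_strong(&entry, &expected, locked))
        printf("locked prev and next in one CAS (was next=%d prev=%d)\n",
               expected.next, expected.prev);
    return 0;
}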
 /* Removes entry used by fd <fd> from the FD cache and replaces it with the
- * last one. The fdtab.cache is adjusted to match the back reference if needed.
+ * last one.
  * If the fd has no entry assigned, return immediately.
  */
 static inline void fd_release_cache_entry(int fd)
 {
-    unsigned int pos;
-
-    HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
-    pos = fdtab[fd].cache;
-    if (!pos)
-        goto end;
-    fdtab[fd].cache = 0;
-    fd_cache_num--;
-    if (likely(pos <= fd_cache_num)) {
-        /* was not the last entry */
-        fd = fd_cache[fd_cache_num];
-        fd_cache[pos - 1] = fd;
-        fdtab[fd].cache = pos;
-    }
-  end:
-    HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+    if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
+        fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+    else
+        fd_rm_from_fd_list(&fd_cache, fd);
 }

 /* Computes the new polled status based on the active and ready statuses, for
@@ -402,7 +582,6 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
     fdtab[fd].update_mask &= ~tid_bit;
     fdtab[fd].linger_risk = 0;
     fdtab[fd].cloned = 0;
-    fdtab[fd].cache = 0;
     fdtab[fd].thread_mask = thread_mask;
     /* note: do not reset polled_mask here as it indicates which poller
      * still knows this FD from a possible previous round.


@@ -90,15 +90,25 @@ enum fd_states {
  */
 #define DEAD_FD_MAGIC 0xFDDEADFD

+struct fdlist_entry {
+    volatile int next;
+    volatile int prev;
+} __attribute__ ((aligned(8)));
+
+struct fdlist {
+    volatile int first;
+    volatile int last;
+} __attribute__ ((aligned(8)));
+
 /* info about one given fd */
 struct fdtab {
     __decl_hathreads(HA_SPINLOCK_T lock);
     unsigned long thread_mask;  /* mask of thread IDs authorized to process the task */
     unsigned long polled_mask;  /* mask of thread IDs currently polling this fd */
     unsigned long update_mask;  /* mask of thread IDs having an update for fd */
+    struct fdlist_entry fdcache_entry;  /* Entry in the fdcache */
     void (*iocb)(int fd);       /* I/O handler */
     void *owner;                /* the connection or listener associated with this fd, NULL if closed */
-    unsigned int cache;         /* position+1 in the FD cache. 0=not in cache. */
     unsigned char state;        /* FD state for read and write directions (2*3 bits) */
     unsigned char ev;           /* event seen in return of poll() : FD_POLL_* */
     unsigned char linger_risk:1; /* 1 if we must kill lingering before closing */
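The aligned(8) attribute is what makes the single 8-byte CAS path legal: the prev/next pair must occupy one naturally aligned 8-byte word. A compile-time check along these lines (our own addition, assuming 32-bit int and a GCC/Clang toolchain) would express that requirement:

#include <stdalign.h>
#include <assert.h>

/* Same shape as the fdlist_entry declared above. */
struct fdlist_entry_sketch {
    volatile int next;
    volatile int prev;
} __attribute__ ((aligned(8)));

/* Both halves of the entry must fit in, and be aligned to, one 8-byte word
 * for the double-word CAS path to operate on them atomically.
 */
static_assert(sizeof(struct fdlist_entry_sketch) == 8,
              "prev/next must pack into a single 8-byte word");
static_assert(alignof(struct fdlist_entry_sketch) == 8,
              "entry must be 8-byte aligned for an 8-byte CAS");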


@@ -811,7 +811,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
              (fdt.ev & FD_POLL_IN)  ? 'I' : 'i',
              fdt.linger_risk ? 'L' : 'l',
              fdt.cloned ? 'C' : 'c',
-             fdt.cache,
+             fdt.fdcache_entry.next >= -2 ? 1 : 0,
              fdt.owner,
              fdt.iocb,
              (fdt.iocb == conn_fd_handler) ? "conn_fd_handler" :


@@ -167,15 +167,14 @@ struct poller pollers[MAX_POLLERS];
 struct poller cur_poller;
 int nbpollers = 0;

-unsigned int *fd_cache = NULL;   // FD events cache
-int fd_cache_num = 0;            // number of events in the cache
+volatile struct fdlist fd_cache;                    // FD events cache
+volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
 unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
 THREAD_LOCAL int *fd_updt = NULL; // FD updates list
 THREAD_LOCAL int fd_nbupdt = 0;   // number of updates in the list

-__decl_hathreads(HA_RWLOCK_T fdcache_lock); /* global lock to protect fd_cache array */
-
 /* Deletes an FD from the fdsets.
  * The file descriptor is also closed.
  */
@@ -221,33 +220,30 @@ void fd_remove(int fd)
     fd_dodelete(fd, 0);
 }

-/* Scan and process the cached events. This should be called right after
- * the poller. The loop may cause new entries to be created, for example
- * if a listener causes an accept() to initiate a new incoming connection
- * wanting to attempt an recv().
- */
-void fd_process_cached_events()
+static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 {
-    int fd, entry, e;
-
-    HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
-    fd_cache_mask &= ~tid_bit;
-    for (entry = 0; entry < fd_cache_num; ) {
-        fd = fd_cache[entry];
-
-        if (!(fdtab[fd].thread_mask & tid_bit)) {
-            activity[tid].fd_skip++;
-            goto next;
-        }
-
-        fd_cache_mask |= tid_bit;
-
+    int fd, old_fd, e;
+
+    for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].fdcache_entry.next) {
+        if (fd == -2) {
+            fd = old_fd;
+            continue;
+        } else if (fd <= -3)
+            fd = -fd - 4;
+        if (fd == -1)
+            break;
+        old_fd = fd;
+        if (!(fdtab[fd].thread_mask & tid_bit))
+            continue;
+        if (fdtab[fd].fdcache_entry.next < -3)
+            continue;
+
+        HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
         if (HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
             activity[tid].fd_lock++;
-            goto next;
+            continue;
         }
-
-        HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);

         e = fdtab[fd].state;
         fdtab[fd].ev &= FD_POLL_STICKY;
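Note how removal and traversal cooperate: fd_rm_from_fd_list() stores -(next + 4) in a removed entry, which keeps the value at or below -3 ("not in the cache") while still letting the walker above recover the old successor with fd = -fd - 4. A tiny round-trip check of that encoding (our own helpers, not HAProxy code):

#include <assert.h>
#include <stdio.h>

/* Encode as done by fd_rm_from_fd_list(), decode as done by the walker. */
static int encode_removed(int next)   { return -(next + 4); }
static int decode_removed(int stored) { return -stored - 4; }

int main(void)
{
    assert(encode_removed(5)  == -9 && decode_removed(-9) == 5);
    assert(encode_removed(0)  == -4 && decode_removed(-4) == 0);
    assert(encode_removed(-1) == -3 && decode_removed(-3) == -1); /* "last element" round-trips too */
    printf("removed-entry encoding round-trips\n");
    return 0;
}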
@@ -265,19 +261,19 @@ void fd_process_cached_events()
             fd_release_cache_entry(fd);
             HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
         }
-
-        HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
-
-        /* If the fd was removed from the cache, it has been
-         * replaced by the next one that we don't want to skip !
-         */
-        if (entry < fd_cache_num && fd_cache[entry] != fd) {
-            activity[tid].fd_del++;
-            continue;
-        }
-      next:
-        entry++;
-    }
-    HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+    }
+}
+
+/* Scan and process the cached events. This should be called right after
+ * the poller. The loop may cause new entries to be created, for example
+ * if a listener causes an accept() to initiate a new incoming connection
+ * wanting to attempt an recv().
+ */
+void fd_process_cached_events()
+{
+    HA_ATOMIC_AND(&fd_cache_mask, ~tid_bit);
+    fdlist_process_cached_events(&fd_cache_local[tid]);
+    fdlist_process_cached_events(&fd_cache);
 }

 /* disable the specified poller */
@@ -320,16 +316,19 @@ int init_pollers()
     if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
         goto fail_info;

-    if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
-        goto fail_cache;
+    fd_cache.first = fd_cache.last = -1;

     hap_register_per_thread_init(init_pollers_per_thread);
     hap_register_per_thread_deinit(deinit_pollers_per_thread);

-    for (p = 0; p < global.maxsock; p++)
+    for (p = 0; p < global.maxsock; p++) {
         HA_SPIN_INIT(&fdtab[p].lock);
+        /* Mark the fd as out of the fd cache */
+        fdtab[p].fdcache_entry.next = -3;
+        fdtab[p].fdcache_entry.next = -3;
+    }
+    for (p = 0; p < global.nbthread; p++)
+        fd_cache_local[p].first = fd_cache_local[p].last = -1;

-    HA_RWLOCK_INIT(&fdcache_lock);
     do {
         bp = NULL;
         for (p = 0; p < nbpollers; p++)
@@ -372,11 +371,8 @@ void deinit_pollers() {
             bp->term(bp);
     }

-    free(fd_cache); fd_cache = NULL;
     free(fdinfo);   fdinfo = NULL;
     free(fdtab);    fdtab  = NULL;
-
-    HA_RWLOCK_DESTROY(&fdcache_lock);
 }
/* /*


@@ -2905,7 +2905,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
                  conn->flags,
                  conn->handle.fd,
                  conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
-                 conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
+                 conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
                  conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
                  conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
         }
@@ -2938,7 +2938,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
                  conn->flags,
                  conn->handle.fd,
                  conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
-                 conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
+                 conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
                  conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
                  conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
         }