diff --git a/src/listener.c b/src/listener.c index 8b167bd72..40a774ed5 100644 --- a/src/listener.c +++ b/src/listener.c @@ -49,6 +49,12 @@ static struct bind_kw_list bind_keywords = { struct xfer_sock_list *xfer_sock_list = NULL; +/* there is one listener queue per thread so that a thread unblocking the + * global queue can wake up listeners bound only to foreing threads by + * moving them to the remote queues and waking up the associated task. + */ +static struct work_list *local_listener_queue; + #if defined(USE_THREAD) struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { }; @@ -328,6 +334,12 @@ int resume_listener(struct listener *l) HA_SPIN_LOCK(LISTENER_LOCK, &l->lock); + /* check that another thread didn't to the job in parallel (e.g. at the + * end of listen_accept() while we'd come from dequeue_all_listeners(). + */ + if (LIST_ADDED(&l->wait_queue)) + goto end; + if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) && !(proc_mask(l->bind_conf->bind_proc) & pid_bit)) goto end; @@ -370,6 +382,15 @@ int resume_listener(struct listener *l) goto end; } + if (!(thread_mask(l->bind_conf->bind_thread) & tid_bit)) { + /* we're not allowed to touch this listener's FD, let's requeue + * the listener into one of its owning thread's queue instead. + */ + int first_thread = my_flsl(thread_mask(l->bind_conf->bind_thread)) - 1; + work_list_add(&local_listener_queue[first_thread], &l->wait_queue); + goto end; + } + fd_want_recv(l->fd); l->state = LI_READY; end: @@ -1063,6 +1084,38 @@ void listener_release(struct listener *l) dequeue_all_listeners(&fe->listener_queue); } +/* resume listeners waiting in the local listener queue. They are still in LI_LIMITED state */ +static struct task *listener_queue_process(struct task *t, void *context, unsigned short state) +{ + struct work_list *wl = context; + struct listener *l; + + while ((l = LIST_POP_LOCKED(&wl->head, struct listener *, wait_queue))) { + /* The listeners are still in the LI_LIMITED state */ + resume_listener(l); + } + return t; +} + +/* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */ +static int listener_queue_init() +{ + local_listener_queue = work_list_create(global.nbthread, listener_queue_process, NULL); + if (!local_listener_queue) { + ha_alert("Out of memory while initializing listener queues.\n"); + return ERR_FATAL|ERR_ABORT; + } + return 0; +} + +static void listener_queue_deinit() +{ + work_list_destroy(local_listener_queue, global.nbthread); +} + +REGISTER_CONFIG_POSTPARSER("multi-threaded listener queue", listener_queue_init); +REGISTER_POST_DEINIT(listener_queue_deinit); + /* * Registers the bind keyword list as a list of valid keywords for next * parsing sessions.