diff --git a/src/proto_rhttp.c b/src/proto_rhttp.c index 52d45441f..9acc41143 100644 --- a/src/proto_rhttp.c +++ b/src/proto_rhttp.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -129,9 +130,6 @@ static struct connection *new_reverse_conn(struct listener *l, struct server *sr */ void rhttp_notify_preconn_err(struct listener *l) { - /* For the moment reverse connection are bound only on first thread. */ - BUG_ON(tid != 0); - /* Receiver must reference a reverse connection as pending. */ BUG_ON(!l->rx.rhttp.pend_conn); @@ -150,6 +148,52 @@ void rhttp_notify_preconn_err(struct listener *l) task_queue(l->rx.rhttp.task); } +/* Lookup over listener threads for their current count of active reverse + * HTTP connections. Returns the less loaded thread ID. + */ +static unsigned int select_thread(struct listener *l) +{ + unsigned long mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled); + unsigned int load_min = HA_ATOMIC_LOAD(&th_ctx->nb_rhttp_conns); + unsigned int load_thr; + unsigned int ret = tid; + int i; + + /* Returns current tid if listener runs on one thread only. */ + if (!atleast2(mask)) + goto end; + + /* Loop over all threads and return the less loaded one. This needs to + * be just an approximation so it's not important if the selected + * thread load has varied since its selection. + */ + + for (i = tg->base; mask; mask >>= 1, i++) { + if (!(mask & 0x1)) + continue; + + load_thr = HA_ATOMIC_LOAD(&ha_thread_ctx[i].nb_rhttp_conns); + if (load_min > load_thr) { + ret = i; + load_min = load_thr; + } + } + + end: + return ret; +} + +/* Detach from its thread and assign it to thread. The task is + * queued to be woken up on the new thread. + */ +static void task_migrate(struct task *task, uint new_tid) +{ + task_unlink_wq(task); + task->expire = TICK_ETERNITY; + task_set_thread(task, new_tid); + task_wakeup(task, TASK_WOKEN_MSG); +} + struct task *rhttp_process(struct task *task, void *ctx, unsigned int state) { struct listener *l = ctx; @@ -179,6 +223,8 @@ struct task *rhttp_process(struct task *task, void *ctx, unsigned int state) /* conn_free() must report preconnect failure using rhttp_notify_preconn_err(). */ BUG_ON(l->rx.rhttp.pend_conn); + + l->rx.rhttp.task->expire = TICKS_TO_MS(now_ms); } else { /* Spurious receiver task woken up despite pend_conn not ready/on error. */ @@ -192,6 +238,14 @@ struct task *rhttp_process(struct task *task, void *ctx, unsigned int state) else { struct server *srv = l->rx.rhttp.srv; + if ((state & TASK_WOKEN_ANY) != TASK_WOKEN_MSG) { + unsigned int new_tid = select_thread(l); + if (new_tid != tid) { + task_migrate(l->rx.rhttp.task, new_tid); + return task; + } + } + /* No pending reverse connection, prepare a new one. Store it in the * listener and return NULL. Connection will be returned later after * reversal is completed. @@ -222,8 +276,16 @@ int rhttp_bind_listener(struct listener *listener, char *errmsg, int errlen) struct ist be_name, sv_name; char *name = NULL; - /* TODO for the moment reverse conn creation is pinned to the first thread only. */ - if (!(task = task_new_here())) { + unsigned long mask; + uint task_tid; + + if (listener->state != LI_ASSIGNED) + return ERR_NONE; /* already bound */ + + /* Retrieve the first thread usable for this listener. */ + mask = listener->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled); + task_tid = my_ffsl(mask) + ha_tgroup_info[listener->rx.bind_tgroup].base; + if (!(task = task_new_on(task_tid))) { snprintf(errmsg, errlen, "Out of memory."); goto err; } @@ -336,6 +398,15 @@ struct connection *rhttp_accept_conn(struct listener *l, int *status) /* Instantiate a new conn if maxconn not yet exceeded. */ if (l->nbconn <= l->bind_conf->maxconn) { + /* Try first if a new thread should be used for the new connection. */ + unsigned int new_tid = select_thread(l); + if (new_tid != tid) { + task_migrate(l->rx.rhttp.task, new_tid); + *status = CO_AC_DONE; + return NULL; + } + + /* No need to use a new thread, use the opportunity to alloc the connection right now. */ l->rx.rhttp.pend_conn = new_reverse_conn(l, l->rx.rhttp.srv); if (!l->rx.rhttp.pend_conn) { *status = CO_AC_PAUSE; @@ -366,8 +437,10 @@ void rhttp_unbind_receiver(struct listener *l) int rhttp_set_affinity(struct connection *conn, int new_tid) { - /* TODO reversal conn rebinding after is disabled for the moment as we - * did not test possible race conditions. + /* Explicitely disable connection thread migration on accept. Indeed, + * it's unsafe to move a connection with its FD to another thread. Note + * that active reverse task thread migration should be sufficient to + * ensure repartition of reversed connections accross listener threads. */ return -1; }