diff --git a/doc/configuration.txt b/doc/configuration.txt index 28ca4c6bc..78216b7e2 100644 --- a/doc/configuration.txt +++ b/doc/configuration.txt @@ -1192,6 +1192,7 @@ The following keywords are supported in the "global" section : - tune.lua.service-timeout - tune.lua.session-timeout - tune.lua.task-timeout + - tune.max-checks-per-thread - tune.maxaccept - tune.maxpollevents - tune.maxrewrite @@ -3280,6 +3281,17 @@ tune.lua.task-timeout remain alive during of the lifetime of HAProxy. For example, a task used to check servers. +tune.max-checks-per-thread + Sets the number of active checks per thread above which a thread will + actively try to find a less loaded thread to run the health check. The + default value is zero, meaning no such limit is set. It may be needed in + certain environments running an extremely large number of expensive checks + with many threads when the load appears unequal and may cause health checks + to randomly time out on startup, typically when using OpenSSL 3.0 which is + about 20 times more CPU-intensive on health checks than older versions. The + effect is to try to level the health check work across all threads. The + vast majority of configurations do not need to touch this parameter. + tune.maxaccept Sets the maximum number of consecutive connections a process may accept in a row before switching to other work. 
In single process mode, higher numbers diff --git a/include/haproxy/global-t.h b/include/haproxy/global-t.h index 3523f631f..3f025c59c 100644 --- a/include/haproxy/global-t.h +++ b/include/haproxy/global-t.h @@ -169,6 +169,7 @@ struct global { unsigned short idle_timer; /* how long before an empty buffer is considered idle (ms) */ int nb_stk_ctr; /* number of stick counters, defaults to MAX_SESS_STKCTR */ int default_shards; /* default shards for listeners, or -1 (by-thread) or -2 (by-group) */ + uint max_checks_per_thread; /* if >0, no more than this concurrent checks per thread */ #ifdef USE_QUIC unsigned int quic_backend_max_idle_timeout; unsigned int quic_frontend_max_idle_timeout; diff --git a/src/check.c b/src/check.c index d19ec26d3..290d432c9 100644 --- a/src/check.c +++ b/src/check.c @@ -1138,6 +1138,24 @@ static inline int check_thread_cmp_load(int thr1, int thr2) return 0; } +/* returns >0, 0, <0 if check thread 1's active checks count is respectively + * higher than, equal, or lower than thread 2's. This is made to decide on + * forced migrations upon overload, so only a very little margin is applied + * here (~1%). For ease of remembering the direction, consider this returns + * active1 - active2. Note that two idle threads (0 vs 0) compare as >0. + */ +static inline int check_thread_cmp_active(int thr1, int thr2) +{ + uint t1_act = _HA_ATOMIC_LOAD(&ha_thread_ctx[thr1].active_checks); + uint t2_act = _HA_ATOMIC_LOAD(&ha_thread_ctx[thr2].active_checks); + + if (t1_act * 128 >= t2_act * 129) + return 1; + if (t2_act * 128 >= t1_act * 129) + return -1; + return 0; +} + /* manages a server health-check that uses a connection. Returns * the time the task accepts to wait, or TIME_ETERNITY for infinity. @@ -1168,35 +1186,50 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state) * the task again because we're setting CHK_ST_READY indicating * a migration. 
*/ + uint run_checks = _HA_ATOMIC_LOAD(&th_ctx->running_checks); uint my_load = HA_ATOMIC_LOAD(&th_ctx->rq_total); + uint attempts = MIN(global.nbthread, 3); if (check->state & CHK_ST_READY) { /* check was migrated, active already counted */ activity[tid].check_adopted++; } - else if (my_load >= 3 && th_ctx->active_checks >= 3) { - uint new_tid = statistical_prng_range(global.nbthread); - - if (check_thread_cmp_load(tid, new_tid) > 0) { - /* Found one. Let's migrate the task over there. We have to - * remove it from the WQ first and kill its expire time - * otherwise the scheduler will reinsert it and trigger a - * BUG_ON() as we're not allowed to call task_queue() for a - * foreign thread. The recipient will restore the expiration. - */ - check->state |= CHK_ST_READY; - HA_ATOMIC_INC(&ha_thread_ctx[new_tid].active_checks); - task_unlink_wq(t); - t->expire = TICK_ETERNITY; - task_set_thread(t, new_tid); - task_wakeup(t, TASK_WOKEN_MSG); - TRACE_LEAVE(CHK_EV_TASK_WAKE, check); - return t; - } - /* check just woke up, count it as active */ - _HA_ATOMIC_INC(&th_ctx->active_checks); - } else { + /* first wakeup, let's check if another thread is less loaded + * than this one in order to smooth the load. In all cases the + * target must have fewer active checks. While this thread is + * below tune.max-checks-per-thread, the migration is only + * opportunistic and also needs check_thread_cmp_load() > 0. + * Once the limit is reached, the migration is forced. We make + * at most 3 random draws, and drawing ourselves wastes one. + * NOTE(review): a limit of 0 makes the >= test always true. + */ + while (attempts-- > 0 && + my_load >= 3 && _HA_ATOMIC_LOAD(&th_ctx->active_checks) >= 3) { + uint new_tid = statistical_prng_range(global.nbthread); + + if (new_tid == tid) + continue; + + if (check_thread_cmp_active(tid, new_tid) > 0 && + (run_checks >= global.tune.max_checks_per_thread || + check_thread_cmp_load(tid, new_tid) > 0)) { + /* Found one. Let's migrate the task over there. 
We have to + * remove it from the WQ first and kill its expire time + * otherwise the scheduler will reinsert it and trigger a + * BUG_ON() as we're not allowed to call task_queue() for a + * foreign thread. The recipient will restore the expiration. + */ + check->state |= CHK_ST_READY; + HA_ATOMIC_INC(&ha_thread_ctx[new_tid].active_checks); + task_unlink_wq(t); + t->expire = TICK_ETERNITY; + task_set_thread(t, new_tid); + task_wakeup(t, TASK_WOKEN_MSG); + TRACE_LEAVE(CHK_EV_TASK_WAKE, check); + return t; + } + } /* check just woke up, count it as active */ _HA_ATOMIC_INC(&th_ctx->active_checks); } @@ -2501,6 +2534,26 @@ static int srv_parse_check_port(char **args, int *cur_arg, struct proxy *curpx, goto out; } +/* config parser for global "tune.max-checks-per-thread". Returns 0, or -1 when too_many_args() reports extra arguments. NOTE(review): the atoi() result is stored into a uint without validation. */ +static int check_parse_global_max_checks(char **args, int section_type, struct proxy *curpx, + const struct proxy *defpx, const char *file, int line, + char **err) +{ + if (too_many_args(1, args, err, NULL)) + return -1; + global.tune.max_checks_per_thread = atoi(args[1]); + return 0; +} + +/* register "global" section keywords */ +static struct cfg_kw_list chk_cfg_kws = {ILH, { + { CFG_GLOBAL, "tune.max-checks-per-thread", check_parse_global_max_checks }, + { 0, NULL, NULL } +}}; + +INITCALL1(STG_REGISTER, cfg_register_keywords, &chk_cfg_kws); + +/* register "server" line keywords */ static struct srv_kw_list srv_kws = { "CHK", { }, { { "addr", srv_parse_addr, 1, 1, 1 }, /* IP address to send health to or to probe from agent-check */ { "agent-addr", srv_parse_agent_addr, 1, 1, 1 }, /* Enable an auxiliary agent check */