MEDIUM: stream: interrupt costly rulesets after too many evaluations

It is not rare to see configurations with a large number of "tcp-request
content" or "http-request" rules for instance. A large number of rules
combined with cpu-demanding actions (e.g.: actions that work on content)
may create thread contention as all the rules from a given ruleset are
evaluated under the same polling loop if the evaluation is not interrupted

Thus, in this patch we add extra logic around "tcp-request content",
"tcp-response content", "http-request" and "http-response" rulesets, so
that when a certain number of rules are evaluated under the single polling
loop, we force the evaluating function to yield. As such, the rule which
was about to be evaluated is saved, and the function starts evaluating
rules from the save pointer when it returns (in the next polling loop).

We use task_wakeup(task, TASK_WOKEN_MSG) to explicitly wake the task so
that no time is wasted and the processing is resumed ASAP. TASK_WOKEN_MSG
is mandatory here because process_stream() expects TASK_WOKEN_MSG for
explicit analyzers re-evaluation.

rules_bcount stream's attribute was added to count how manu rules were
evaluated since last interruption (yield). Also, SF_RULE_FYIELD flag
was added to know that the s->current_rule was assigned due to forced
yield and not regular yield.

By default haproxy will enforce a yield every 50 rules, this behavior
can be configured using the "tune.max-rules-at-once" global keyword.

There is a limitation though: for now, if the ACT_OPT_FINAL flag is set
on act_opts, we consider it is not safe to yield (as it is already the
case for automatic yield). In this case instead of yielding an taking
the risk of not being called back, we skip the yield and hope it will
not create contention. This is something we should ideally try to
improve in order to yield in all conditions.
This commit is contained in:
Aurelien DARRAGON 2025-01-30 13:26:42 +01:00
parent 04bbfa4354
commit 0846638f7f
10 changed files with 165 additions and 21 deletions

View File

@ -1677,6 +1677,7 @@ The following keywords are supported in the "global" section :
- tune.maxaccept
- tune.maxpollevents
- tune.maxrewrite
- tune.max-rules-at-once
- tune.memory.hot-size
- tune.pattern.cache-size
- tune.peers.max-updates-at-once
@ -4083,6 +4084,28 @@ tune.maxrewrite <number>
larger than that. This means you don't have to worry about it when changing
bufsize.
tune.max-rules-at-once <number>
Sets the maximum number of rules that can be evaluated at once in ruleset
evaluating functions, provided that they support yielding. Indeed, it is
not rare to see configurations with a large number of "tcp-request content"
or "http-request" rules for instance. A large number of rules combined with
cpu-demanding actions (e.g.: actions that work on content) may create thread
contention as all the rules from a given ruleset are evaluated under the same
polling loop if the evaluation is not interrupted. This option ensures that no
more than <number> number of rules may be excecuted under the same polling
loop for content-oriented rulesets (those that already support yielding due
to content inspection). What it does is that it forces the evaluating function
to yield, so that it comes back on the next polling loop to continues the
evaluation.
Affected rulesets are:
- "tcp-request content"
- "tcp-response content"
- "http-request"
- "http-response"
The default value is 50.
tune.memory.hot-size <number>
Sets the per-thread amount of memory that will be kept hot in the local cache
and will never be recoverable by other threads. Access to this memory is very

View File

@ -194,6 +194,15 @@
#define MAX_POLL_EVENTS 200
#endif
// the max number of rules evaluated in one call to rule handling function.
// If the function is able to yield, a forced yield will be enforced when
// reaching this value, else the evaluation will continue. Lowering this
// value may help to fight against thread contention with cpu-intensive
// rulesets
#ifndef MAX_RULES_AT_ONCE
#define MAX_RULES_AT_ONCE 50
#endif
/* eternity when exprimed in timeval */
#ifndef TV_ETERNITY
#define TV_ETERNITY (~0UL)

View File

@ -161,6 +161,7 @@ struct global {
unsigned char cluster_secret[16]; /* 128 bits of an SHA1 digest of a secret defined as ASCII string */
struct {
int maxpollevents; /* max number of poll events at once */
int max_rules_at_once; /* max number of rules excecuted in a single evaluation loop */
int maxaccept; /* max number of consecutive accept() */
int options; /* various tuning options */
int runqueue_depth;/* max number of tasks to run at once */

View File

@ -190,6 +190,7 @@ enum {
enum rule_result {
HTTP_RULE_RES_CONT = 0, /* nothing special, continue rules evaluation */
HTTP_RULE_RES_YIELD, /* call me later because some data is missing. */
HTTP_RULE_RES_FYIELD, /* forced yield, not because of missing data */
HTTP_RULE_RES_STOP, /* stopped processing on an accept */
HTTP_RULE_RES_DENY, /* deny (or tarpit if TX_CLTARPIT) */
HTTP_RULE_RES_ABRT, /* abort request, msg already sent (eg: auth) */

View File

@ -89,6 +89,7 @@
#define SF_SRC_ADDR 0x00800000 /* get the source ip/port with getsockname */
#define SF_BC_MARK 0x01000000 /* need to set specific mark on backend/srv conn upon connect */
#define SF_BC_TOS 0x02000000 /* need to set specific tos on backend/srv conn upon connect */
#define SF_RULE_FYIELD 0x04000000 /* s->current_rule set because of forced yield */
/* This function is used to report flags in debugging tools. Please reflect
* below any single-bit flag addition above in the same order via the
@ -103,7 +104,8 @@ static forceinline char *strm_show_flags(char *buf, size_t len, const char *deli
_(0);
/* flags & enums */
_(SF_IGNORE_PRST, _(SF_SRV_REUSED, _(SF_SRV_REUSED_ANTICIPATED,
_(SF_WEBSOCKET, _(SF_SRC_ADDR, _(SF_BC_MARK, _(SF_BC_TOS)))))));
_(SF_WEBSOCKET, _(SF_SRC_ADDR, _(SF_BC_MARK, _(SF_BC_TOS,
_(SF_RULE_FYIELD))))))));
_e(SF_FINST_MASK, SF_FINST_R, _e(SF_FINST_MASK, SF_FINST_C,
_e(SF_FINST_MASK, SF_FINST_H, _e(SF_FINST_MASK, SF_FINST_D,
@ -259,6 +261,8 @@ struct stream {
unsigned int conn_exp; /* wake up time for connect, queue, turn-around, ... */
unsigned int conn_err_type; /* first error detected, one of STRM_ET_* */
uint32_t rules_bcount; /* number of rules evaluated since last yield */
struct stream *parent; /* Pointer to the parent stream, if any. NULL most of time */
struct list list; /* position in the thread's streams list */

View File

@ -1106,6 +1106,17 @@ static int cfg_parse_global_tune_opts(char **args, int section_type,
return 0;
}
else if (strcmp(args[0], "tune.max-rules-at-once") == 0) {
if (*(args[1]) == 0) {
memprintf(err, "'%s' expects a positive numeric value", args[0]);
return -1;
}
global.tune.max_rules_at_once = atoi(args[1]);
if (global.tune.max_rules_at_once < 0) {
memprintf(err, "'%s' expects a positive numeric value", args[0]);
return -1;
}
}
else if (strcmp(args[0], "tune.maxaccept") == 0) {
long max;
@ -1687,6 +1698,7 @@ static struct cfg_kw_list cfg_kws = {ILH, {
{ CFG_GLOBAL, "expose-experimental-directives", cfg_parse_global_non_std_directives },
{ CFG_GLOBAL, "tune.runqueue-depth", cfg_parse_global_tune_opts },
{ CFG_GLOBAL, "tune.maxpollevents", cfg_parse_global_tune_opts },
{ CFG_GLOBAL, "tune.max-rules-at-once", cfg_parse_global_tune_opts },
{ CFG_GLOBAL, "tune.maxaccept", cfg_parse_global_tune_opts },
{ CFG_GLOBAL, "tune.recv_enough", cfg_parse_global_tune_opts },
{ CFG_GLOBAL, "tune.bufsize", cfg_parse_global_tune_opts },

View File

@ -2157,6 +2157,9 @@ static void step_init_2(int argc, char** argv)
if (global.tune.maxpollevents <= 0)
global.tune.maxpollevents = MAX_POLL_EVENTS;
if (global.tune.max_rules_at_once <= 0)
global.tune.max_rules_at_once = MAX_RULES_AT_ONCE;
if (global.tune.runqueue_depth <= 0) {
/* tests on various thread counts from 1 to 64 have shown an
* optimal queue depth following roughly 1/sqrt(threads).

View File

@ -404,6 +404,9 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
case HTTP_RULE_RES_YIELD: /* some data miss, call the function later. */
goto return_prx_yield;
case HTTP_RULE_RES_FYIELD: /* we must try again after context-switch */
goto return_prx_fyield;
case HTTP_RULE_RES_CONT:
case HTTP_RULE_RES_STOP: /* nothing to do */
break;
@ -635,6 +638,12 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
DBG_TRACE_DEVEL("waiting for more data",
STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn);
return 0;
return_prx_fyield:
channel_dont_connect(req);
DBG_TRACE_DEVEL("forced yield",
STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn);
return 0;
}
/* This function performs all the processing enabled for the current request.
@ -1773,6 +1782,9 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
case HTTP_RULE_RES_YIELD: /* some data miss, call the function later. */
goto return_prx_yield;
case HTTP_RULE_RES_FYIELD: /* we must try again after context-switch */
goto return_prx_fyield;
case HTTP_RULE_RES_CONT:
case HTTP_RULE_RES_STOP: /* nothing to do */
break;
@ -2034,6 +2046,13 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
DBG_TRACE_DEVEL("waiting for more data",
STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn);
return 0;
return_prx_fyield:
channel_dont_close(rep);
DBG_TRACE_DEVEL("forced yield",
STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn);
return 0;
}
/* This function is an analyser which forwards response body (including chunk
@ -2733,16 +2752,27 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis
enum rule_result rule_ret = HTTP_RULE_RES_CONT;
int act_opts = 0;
if ((s->scf->flags & SC_FL_ERROR) ||
((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) &&
(px->options & PR_O_ABRT_CLOSE)))
act_opts |= ACT_OPT_FINAL;
/* If "the current_rule_list" match the executed rule list, we are in
* resume condition. If a resume is needed it is always in the action
* and never in the ACL or converters. In this case, we initialise the
* current rule, and go to the action execution point.
*/
if (s->current_rule) {
int forced = s->flags & SF_RULE_FYIELD;
rule = s->current_rule;
s->current_rule = NULL;
if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules))
s->flags &= ~SF_RULE_FYIELD;
if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) {
if (forced)
goto resume_rule;
goto resume_execution;
}
}
s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules);
@ -2751,6 +2781,18 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis
txn->req.flags &= ~HTTP_MSGF_SOFT_RW;
list_for_each_entry(rule, s->current_rule_list, list) {
resume_rule:
/* check if budget is exceeded and we need to continue on the next
* polling loop, unless we know that we cannot yield
*/
if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) {
s->current_rule = rule;
s->flags |= SF_RULE_FYIELD;
rule_ret = HTTP_RULE_RES_FYIELD;
task_wakeup(s->task, TASK_WOKEN_MSG);
goto end;
}
/* check optional condition */
if (!acl_match_cond(rule->cond, px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL))
continue;
@ -2762,11 +2804,6 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis
/* Always call the action function if defined */
if (rule->action_ptr) {
if ((s->scf->flags & SC_FL_ERROR) ||
((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) &&
(px->options & PR_O_ABRT_CLOSE)))
act_opts |= ACT_OPT_FINAL;
if (!(s->scf->flags & SC_FL_ERROR) & !(s->req.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) {
s->waiting_entity.type = STRM_ENTITY_NONE;
s->waiting_entity.ptr = NULL;
@ -2876,7 +2913,7 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis
end:
/* if the ruleset evaluation is finished reset the strict mode */
if (rule_ret != HTTP_RULE_RES_YIELD)
if (rule_ret != HTTP_RULE_RES_YIELD && rule_ret != HTTP_RULE_RES_FYIELD)
txn->req.flags &= ~HTTP_MSGF_SOFT_RW;
/* we reached the end of the rules, nothing to report */
@ -2884,8 +2921,8 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis
}
/* Executes the http-response rules <rules> for stream <s> and proxy <px>. It
* returns one of 5 possible statuses: HTTP_RULE_RES_CONT, HTTP_RULE_RES_STOP,
* HTTP_RULE_RES_DONE, HTTP_RULE_RES_YIELD, or HTTP_RULE_RES_BADREQ. If *CONT
* returns one of 6 possible statuses: HTTP_RULE_RES_CONT, HTTP_RULE_RES_STOP,
* HTTP_RULE_RES_DONE, HTTP_RULE_RES_(F)YIELD, or HTTP_RULE_RES_BADREQ. If *CONT
* is returned, the process can continue the evaluation of next rule list. If
* *STOP or *DONE is returned, the process must stop the evaluation. If *BADREQ
* is returned, it means the operation could not be processed and a server error
@ -2903,16 +2940,27 @@ static enum rule_result http_res_get_intercept_rule(struct proxy *px, struct lis
if (final)
act_opts |= ACT_OPT_FINAL;
if ((s->scf->flags & SC_FL_ERROR) ||
((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) &&
(px->options & PR_O_ABRT_CLOSE)))
act_opts |= ACT_OPT_FINAL;
/* If "the current_rule_list" match the executed rule list, we are in
* resume condition. If a resume is needed it is always in the action
* and never in the ACL or converters. In this case, we initialise the
* current rule, and go to the action execution point.
*/
if (s->current_rule) {
int forced = s->flags & SF_RULE_FYIELD;
rule = s->current_rule;
s->current_rule = NULL;
if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules))
s->flags &= ~SF_RULE_FYIELD;
if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) {
if (forced)
goto resume_rule;
goto resume_execution;
}
}
s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules);
@ -2922,6 +2970,18 @@ static enum rule_result http_res_get_intercept_rule(struct proxy *px, struct lis
txn->rsp.flags &= ~HTTP_MSGF_SOFT_RW;
list_for_each_entry(rule, s->current_rule_list, list) {
resume_rule:
/* check if budget is exceeded and we need to continue on the next
* polling loop, unless we know that we cannot yield
*/
if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) {
s->current_rule = rule;
s->flags |= SF_RULE_FYIELD;
rule_ret = HTTP_RULE_RES_YIELD;
task_wakeup(s->task, TASK_WOKEN_MSG);
goto end;
}
/* check optional condition */
if (!acl_match_cond(rule->cond, px, sess, s, SMP_OPT_DIR_RES|SMP_OPT_FINAL))
continue;
@ -2933,11 +2993,6 @@ resume_execution:
/* Always call the action function if defined */
if (rule->action_ptr) {
if ((s->scf->flags & SC_FL_ERROR) ||
((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) &&
(px->options & PR_O_ABRT_CLOSE)))
act_opts |= ACT_OPT_FINAL;
if (!(s->scb->flags & SC_FL_ERROR) & !(s->res.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) {
s->waiting_entity.type = STRM_ENTITY_NONE;
s->waiting_entity.ptr = NULL;
@ -3037,7 +3092,7 @@ resume_execution:
end:
/* if the ruleset evaluation is finished reset the strict mode */
if (rule_ret != HTTP_RULE_RES_YIELD)
if (rule_ret != HTTP_RULE_RES_YIELD && rule_ret != HTTP_RULE_RES_FYIELD)
txn->rsp.flags &= ~HTTP_MSGF_SOFT_RW;
/* we reached the end of the rules, nothing to report */

View File

@ -2018,6 +2018,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
*/
ana_list = ana_back = req->analysers;
s->rules_bcount = 0;
while (ana_list && max_loops--) {
/* Warning! ensure that analysers are always placed in ascending order! */
ANALYZE (s, req, flt_start_analyze, ana_list, ana_back, AN_REQ_FLT_START_FE);
@ -2100,6 +2101,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
*/
ana_list = ana_back = res->analysers;
s->rules_bcount = 0;
while (ana_list && max_loops--) {
/* Warning! ensure that analysers are always placed in ascending order! */
ANALYZE (s, res, flt_start_analyze, ana_list, ana_back, AN_RES_FLT_START_FE);

View File

@ -135,22 +135,39 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit)
* current rule, and go to the action execution point.
*/
if (s->current_rule) {
int forced = s->flags & SF_RULE_FYIELD;
rule = s->current_rule;
s->current_rule = NULL;
s->flags &= ~SF_RULE_FYIELD;
if (!(req->flags & SC_FL_ERROR) && !(req->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) {
s->waiting_entity.type = STRM_ENTITY_NONE;
s->waiting_entity.ptr = NULL;
}
if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules)
if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) {
if (forced)
goto resume_rule;
goto resume_execution;
}
}
s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules);
restart:
list_for_each_entry(rule, s->current_rule_list, list) {
enum acl_test_res ret = ACL_TEST_PASS;
resume_rule:
/* check if budget is exceeded and we need to continue on the next
* polling loop, unless we know that we cannot yield
*/
if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) {
s->current_rule = rule;
s->flags |= SF_RULE_FYIELD;
task_wakeup(s->task, TASK_WOKEN_MSG);
goto missing_data;
}
if (rule->cond) {
enum acl_test_res ret = ACL_TEST_PASS;
ret = acl_exec_cond(rule->cond, s->be, sess, s, SMP_OPT_DIR_REQ | partial);
if (ret == ACL_TEST_MISS)
goto missing_data;
@ -331,22 +348,39 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit)
* current rule, and go to the action execution point.
*/
if (s->current_rule) {
int forced = s->flags & SF_RULE_FYIELD;
rule = s->current_rule;
s->current_rule = NULL;
s->flags &= ~SF_RULE_FYIELD;
if (!(rep->flags & SC_FL_ERROR) && !(rep->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) {
s->waiting_entity.type = STRM_ENTITY_NONE;
s->waiting_entity.ptr = NULL;
}
if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules)
if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) {
if (forced)
goto resume_rule;
goto resume_execution;
}
}
s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules);
restart:
list_for_each_entry(rule, s->current_rule_list, list) {
enum acl_test_res ret = ACL_TEST_PASS;
resume_rule:
/* check if budget is exceeded and we need to continue on the next
* polling loop, unless we know that we cannot yield
*/
if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) {
s->current_rule = rule;
s->flags |= SF_RULE_FYIELD;
task_wakeup(s->task, TASK_WOKEN_MSG);
goto missing_data;
}
if (rule->cond) {
enum acl_test_res ret = ACL_TEST_PASS;
ret = acl_exec_cond(rule->cond, s->be, sess, s, SMP_OPT_DIR_RES | partial);
if (ret == ACL_TEST_MISS)
goto missing_data;