diff --git a/doc/configuration.txt b/doc/configuration.txt index 2d6ba4b14..7c208c47d 100644 --- a/doc/configuration.txt +++ b/doc/configuration.txt @@ -1677,6 +1677,7 @@ The following keywords are supported in the "global" section : - tune.maxaccept - tune.maxpollevents - tune.maxrewrite + - tune.max-rules-at-once - tune.memory.hot-size - tune.pattern.cache-size - tune.peers.max-updates-at-once @@ -4083,6 +4084,28 @@ tune.maxrewrite larger than that. This means you don't have to worry about it when changing bufsize. +tune.max-rules-at-once + Sets the maximum number of rules that can be evaluated at once in ruleset + evaluating functions, provided that they support yielding. Indeed, it is + not rare to see configurations with a large number of "tcp-request content" + or "http-request" rules for instance. A large number of rules combined with + cpu-demanding actions (e.g.: actions that work on content) may create thread + contention as all the rules from a given ruleset are evaluated under the same + polling loop if the evaluation is not interrupted. This option ensures that no + more than number of rules may be excecuted under the same polling + loop for content-oriented rulesets (those that already support yielding due + to content inspection). What it does is that it forces the evaluating function + to yield, so that it comes back on the next polling loop to continues the + evaluation. + + Affected rulesets are: + - "tcp-request content" + - "tcp-response content" + - "http-request" + - "http-response" + + The default value is 50. + tune.memory.hot-size Sets the per-thread amount of memory that will be kept hot in the local cache and will never be recoverable by other threads. Access to this memory is very diff --git a/include/haproxy/defaults.h b/include/haproxy/defaults.h index f70bf4309..dbb39f4c3 100644 --- a/include/haproxy/defaults.h +++ b/include/haproxy/defaults.h @@ -194,6 +194,15 @@ #define MAX_POLL_EVENTS 200 #endif +// the max number of rules evaluated in one call to rule handling function. +// If the function is able to yield, a forced yield will be enforced when +// reaching this value, else the evaluation will continue. Lowering this +// value may help to fight against thread contention with cpu-intensive +// rulesets +#ifndef MAX_RULES_AT_ONCE +#define MAX_RULES_AT_ONCE 50 +#endif + /* eternity when exprimed in timeval */ #ifndef TV_ETERNITY #define TV_ETERNITY (~0UL) diff --git a/include/haproxy/global-t.h b/include/haproxy/global-t.h index 6d4a3986c..508ef846a 100644 --- a/include/haproxy/global-t.h +++ b/include/haproxy/global-t.h @@ -161,6 +161,7 @@ struct global { unsigned char cluster_secret[16]; /* 128 bits of an SHA1 digest of a secret defined as ASCII string */ struct { int maxpollevents; /* max number of poll events at once */ + int max_rules_at_once; /* max number of rules excecuted in a single evaluation loop */ int maxaccept; /* max number of consecutive accept() */ int options; /* various tuning options */ int runqueue_depth;/* max number of tasks to run at once */ diff --git a/include/haproxy/http_ana-t.h b/include/haproxy/http_ana-t.h index f4f13d070..10b8273d1 100644 --- a/include/haproxy/http_ana-t.h +++ b/include/haproxy/http_ana-t.h @@ -190,6 +190,7 @@ enum { enum rule_result { HTTP_RULE_RES_CONT = 0, /* nothing special, continue rules evaluation */ HTTP_RULE_RES_YIELD, /* call me later because some data is missing. */ + HTTP_RULE_RES_FYIELD, /* forced yield, not because of missing data */ HTTP_RULE_RES_STOP, /* stopped processing on an accept */ HTTP_RULE_RES_DENY, /* deny (or tarpit if TX_CLTARPIT) */ HTTP_RULE_RES_ABRT, /* abort request, msg already sent (eg: auth) */ diff --git a/include/haproxy/stream-t.h b/include/haproxy/stream-t.h index 335fed8c9..1f72dc4ed 100644 --- a/include/haproxy/stream-t.h +++ b/include/haproxy/stream-t.h @@ -89,6 +89,7 @@ #define SF_SRC_ADDR 0x00800000 /* get the source ip/port with getsockname */ #define SF_BC_MARK 0x01000000 /* need to set specific mark on backend/srv conn upon connect */ #define SF_BC_TOS 0x02000000 /* need to set specific tos on backend/srv conn upon connect */ +#define SF_RULE_FYIELD 0x04000000 /* s->current_rule set because of forced yield */ /* This function is used to report flags in debugging tools. Please reflect * below any single-bit flag addition above in the same order via the @@ -103,7 +104,8 @@ static forceinline char *strm_show_flags(char *buf, size_t len, const char *deli _(0); /* flags & enums */ _(SF_IGNORE_PRST, _(SF_SRV_REUSED, _(SF_SRV_REUSED_ANTICIPATED, - _(SF_WEBSOCKET, _(SF_SRC_ADDR, _(SF_BC_MARK, _(SF_BC_TOS))))))); + _(SF_WEBSOCKET, _(SF_SRC_ADDR, _(SF_BC_MARK, _(SF_BC_TOS, + _(SF_RULE_FYIELD)))))))); _e(SF_FINST_MASK, SF_FINST_R, _e(SF_FINST_MASK, SF_FINST_C, _e(SF_FINST_MASK, SF_FINST_H, _e(SF_FINST_MASK, SF_FINST_D, @@ -259,6 +261,8 @@ struct stream { unsigned int conn_exp; /* wake up time for connect, queue, turn-around, ... */ unsigned int conn_err_type; /* first error detected, one of STRM_ET_* */ + uint32_t rules_bcount; /* number of rules evaluated since last yield */ + struct stream *parent; /* Pointer to the parent stream, if any. NULL most of time */ struct list list; /* position in the thread's streams list */ diff --git a/src/cfgparse-global.c b/src/cfgparse-global.c index f050b5b49..4064b2b9c 100644 --- a/src/cfgparse-global.c +++ b/src/cfgparse-global.c @@ -1106,6 +1106,17 @@ static int cfg_parse_global_tune_opts(char **args, int section_type, return 0; } + else if (strcmp(args[0], "tune.max-rules-at-once") == 0) { + if (*(args[1]) == 0) { + memprintf(err, "'%s' expects a positive numeric value", args[0]); + return -1; + } + global.tune.max_rules_at_once = atoi(args[1]); + if (global.tune.max_rules_at_once < 0) { + memprintf(err, "'%s' expects a positive numeric value", args[0]); + return -1; + } + } else if (strcmp(args[0], "tune.maxaccept") == 0) { long max; @@ -1687,6 +1698,7 @@ static struct cfg_kw_list cfg_kws = {ILH, { { CFG_GLOBAL, "expose-experimental-directives", cfg_parse_global_non_std_directives }, { CFG_GLOBAL, "tune.runqueue-depth", cfg_parse_global_tune_opts }, { CFG_GLOBAL, "tune.maxpollevents", cfg_parse_global_tune_opts }, + { CFG_GLOBAL, "tune.max-rules-at-once", cfg_parse_global_tune_opts }, { CFG_GLOBAL, "tune.maxaccept", cfg_parse_global_tune_opts }, { CFG_GLOBAL, "tune.recv_enough", cfg_parse_global_tune_opts }, { CFG_GLOBAL, "tune.bufsize", cfg_parse_global_tune_opts }, diff --git a/src/haproxy.c b/src/haproxy.c index 22a17adc7..3f5b9db09 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -2157,6 +2157,9 @@ static void step_init_2(int argc, char** argv) if (global.tune.maxpollevents <= 0) global.tune.maxpollevents = MAX_POLL_EVENTS; + if (global.tune.max_rules_at_once <= 0) + global.tune.max_rules_at_once = MAX_RULES_AT_ONCE; + if (global.tune.runqueue_depth <= 0) { /* tests on various thread counts from 1 to 64 have shown an * optimal queue depth following roughly 1/sqrt(threads). diff --git a/src/http_ana.c b/src/http_ana.c index 33cfbb0ce..09a53b0f3 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -404,6 +404,9 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s case HTTP_RULE_RES_YIELD: /* some data miss, call the function later. */ goto return_prx_yield; + case HTTP_RULE_RES_FYIELD: /* we must try again after context-switch */ + goto return_prx_fyield; + case HTTP_RULE_RES_CONT: case HTTP_RULE_RES_STOP: /* nothing to do */ break; @@ -635,6 +638,12 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s DBG_TRACE_DEVEL("waiting for more data", STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn); return 0; + + return_prx_fyield: + channel_dont_connect(req); + DBG_TRACE_DEVEL("forced yield", + STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn); + return 0; } /* This function performs all the processing enabled for the current request. @@ -1773,6 +1782,9 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s case HTTP_RULE_RES_YIELD: /* some data miss, call the function later. */ goto return_prx_yield; + case HTTP_RULE_RES_FYIELD: /* we must try again after context-switch */ + goto return_prx_fyield; + case HTTP_RULE_RES_CONT: case HTTP_RULE_RES_STOP: /* nothing to do */ break; @@ -2034,6 +2046,13 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s DBG_TRACE_DEVEL("waiting for more data", STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn); return 0; + + return_prx_fyield: + channel_dont_close(rep); + DBG_TRACE_DEVEL("forced yield", + STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn); + return 0; + } /* This function is an analyser which forwards response body (including chunk @@ -2733,16 +2752,27 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis enum rule_result rule_ret = HTTP_RULE_RES_CONT; int act_opts = 0; + if ((s->scf->flags & SC_FL_ERROR) || + ((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && + (px->options & PR_O_ABRT_CLOSE))) + act_opts |= ACT_OPT_FINAL; + /* If "the current_rule_list" match the executed rule list, we are in * resume condition. If a resume is needed it is always in the action * and never in the ACL or converters. In this case, we initialise the * current rule, and go to the action execution point. */ if (s->current_rule) { + int forced = s->flags & SF_RULE_FYIELD; + rule = s->current_rule; s->current_rule = NULL; - if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) + s->flags &= ~SF_RULE_FYIELD; + if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) { + if (forced) + goto resume_rule; goto resume_execution; + } } s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules); @@ -2751,6 +2781,18 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis txn->req.flags &= ~HTTP_MSGF_SOFT_RW; list_for_each_entry(rule, s->current_rule_list, list) { + resume_rule: + /* check if budget is exceeded and we need to continue on the next + * polling loop, unless we know that we cannot yield + */ + if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) { + s->current_rule = rule; + s->flags |= SF_RULE_FYIELD; + rule_ret = HTTP_RULE_RES_FYIELD; + task_wakeup(s->task, TASK_WOKEN_MSG); + goto end; + } + /* check optional condition */ if (!acl_match_cond(rule->cond, px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL)) continue; @@ -2762,11 +2804,6 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis /* Always call the action function if defined */ if (rule->action_ptr) { - if ((s->scf->flags & SC_FL_ERROR) || - ((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && - (px->options & PR_O_ABRT_CLOSE))) - act_opts |= ACT_OPT_FINAL; - if (!(s->scf->flags & SC_FL_ERROR) & !(s->req.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { s->waiting_entity.type = STRM_ENTITY_NONE; s->waiting_entity.ptr = NULL; @@ -2876,7 +2913,7 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis end: /* if the ruleset evaluation is finished reset the strict mode */ - if (rule_ret != HTTP_RULE_RES_YIELD) + if (rule_ret != HTTP_RULE_RES_YIELD && rule_ret != HTTP_RULE_RES_FYIELD) txn->req.flags &= ~HTTP_MSGF_SOFT_RW; /* we reached the end of the rules, nothing to report */ @@ -2884,8 +2921,8 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis } /* Executes the http-response rules for stream and proxy . It - * returns one of 5 possible statuses: HTTP_RULE_RES_CONT, HTTP_RULE_RES_STOP, - * HTTP_RULE_RES_DONE, HTTP_RULE_RES_YIELD, or HTTP_RULE_RES_BADREQ. If *CONT + * returns one of 6 possible statuses: HTTP_RULE_RES_CONT, HTTP_RULE_RES_STOP, + * HTTP_RULE_RES_DONE, HTTP_RULE_RES_(F)YIELD, or HTTP_RULE_RES_BADREQ. If *CONT * is returned, the process can continue the evaluation of next rule list. If * *STOP or *DONE is returned, the process must stop the evaluation. If *BADREQ * is returned, it means the operation could not be processed and a server error @@ -2903,16 +2940,27 @@ static enum rule_result http_res_get_intercept_rule(struct proxy *px, struct lis if (final) act_opts |= ACT_OPT_FINAL; + if ((s->scf->flags & SC_FL_ERROR) || + ((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && + (px->options & PR_O_ABRT_CLOSE))) + act_opts |= ACT_OPT_FINAL; + /* If "the current_rule_list" match the executed rule list, we are in * resume condition. If a resume is needed it is always in the action * and never in the ACL or converters. In this case, we initialise the * current rule, and go to the action execution point. */ if (s->current_rule) { + int forced = s->flags & SF_RULE_FYIELD; + rule = s->current_rule; s->current_rule = NULL; - if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) + s->flags &= ~SF_RULE_FYIELD; + if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) { + if (forced) + goto resume_rule; goto resume_execution; + } } s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules); @@ -2922,6 +2970,18 @@ static enum rule_result http_res_get_intercept_rule(struct proxy *px, struct lis txn->rsp.flags &= ~HTTP_MSGF_SOFT_RW; list_for_each_entry(rule, s->current_rule_list, list) { + resume_rule: + /* check if budget is exceeded and we need to continue on the next + * polling loop, unless we know that we cannot yield + */ + if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) { + s->current_rule = rule; + s->flags |= SF_RULE_FYIELD; + rule_ret = HTTP_RULE_RES_YIELD; + task_wakeup(s->task, TASK_WOKEN_MSG); + goto end; + } + /* check optional condition */ if (!acl_match_cond(rule->cond, px, sess, s, SMP_OPT_DIR_RES|SMP_OPT_FINAL)) continue; @@ -2933,11 +2993,6 @@ resume_execution: /* Always call the action function if defined */ if (rule->action_ptr) { - if ((s->scf->flags & SC_FL_ERROR) || - ((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && - (px->options & PR_O_ABRT_CLOSE))) - act_opts |= ACT_OPT_FINAL; - if (!(s->scb->flags & SC_FL_ERROR) & !(s->res.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { s->waiting_entity.type = STRM_ENTITY_NONE; s->waiting_entity.ptr = NULL; @@ -3037,7 +3092,7 @@ resume_execution: end: /* if the ruleset evaluation is finished reset the strict mode */ - if (rule_ret != HTTP_RULE_RES_YIELD) + if (rule_ret != HTTP_RULE_RES_YIELD && rule_ret != HTTP_RULE_RES_FYIELD) txn->rsp.flags &= ~HTTP_MSGF_SOFT_RW; /* we reached the end of the rules, nothing to report */ diff --git a/src/stream.c b/src/stream.c index d0a4a96d4..f673c4f4f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2018,6 +2018,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) */ ana_list = ana_back = req->analysers; + s->rules_bcount = 0; while (ana_list && max_loops--) { /* Warning! ensure that analysers are always placed in ascending order! */ ANALYZE (s, req, flt_start_analyze, ana_list, ana_back, AN_REQ_FLT_START_FE); @@ -2100,6 +2101,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) */ ana_list = ana_back = res->analysers; + s->rules_bcount = 0; while (ana_list && max_loops--) { /* Warning! ensure that analysers are always placed in ascending order! */ ANALYZE (s, res, flt_start_analyze, ana_list, ana_back, AN_RES_FLT_START_FE); diff --git a/src/tcp_rules.c b/src/tcp_rules.c index d4c40258b..c81f271fa 100644 --- a/src/tcp_rules.c +++ b/src/tcp_rules.c @@ -135,22 +135,39 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit) * current rule, and go to the action execution point. */ if (s->current_rule) { + int forced = s->flags & SF_RULE_FYIELD; + rule = s->current_rule; s->current_rule = NULL; + s->flags &= ~SF_RULE_FYIELD; if (!(req->flags & SC_FL_ERROR) && !(req->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { s->waiting_entity.type = STRM_ENTITY_NONE; s->waiting_entity.ptr = NULL; } - if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) + if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) { + if (forced) + goto resume_rule; goto resume_execution; + } } s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules); restart: list_for_each_entry(rule, s->current_rule_list, list) { - enum acl_test_res ret = ACL_TEST_PASS; + resume_rule: + /* check if budget is exceeded and we need to continue on the next + * polling loop, unless we know that we cannot yield + */ + if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) { + s->current_rule = rule; + s->flags |= SF_RULE_FYIELD; + task_wakeup(s->task, TASK_WOKEN_MSG); + goto missing_data; + } if (rule->cond) { + enum acl_test_res ret = ACL_TEST_PASS; + ret = acl_exec_cond(rule->cond, s->be, sess, s, SMP_OPT_DIR_REQ | partial); if (ret == ACL_TEST_MISS) goto missing_data; @@ -331,22 +348,39 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit) * current rule, and go to the action execution point. */ if (s->current_rule) { + int forced = s->flags & SF_RULE_FYIELD; + rule = s->current_rule; s->current_rule = NULL; + s->flags &= ~SF_RULE_FYIELD; if (!(rep->flags & SC_FL_ERROR) && !(rep->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { s->waiting_entity.type = STRM_ENTITY_NONE; s->waiting_entity.ptr = NULL; } - if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) + if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) { + if (forced) + goto resume_rule; goto resume_execution; + } } s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules); restart: list_for_each_entry(rule, s->current_rule_list, list) { - enum acl_test_res ret = ACL_TEST_PASS; + resume_rule: + /* check if budget is exceeded and we need to continue on the next + * polling loop, unless we know that we cannot yield + */ + if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) { + s->current_rule = rule; + s->flags |= SF_RULE_FYIELD; + task_wakeup(s->task, TASK_WOKEN_MSG); + goto missing_data; + } if (rule->cond) { + enum acl_test_res ret = ACL_TEST_PASS; + ret = acl_exec_cond(rule->cond, s->be, sess, s, SMP_OPT_DIR_RES | partial); if (ret == ACL_TEST_MISS) goto missing_data;