BUG/MEDIUM: listener: make sure the listener never accepts too many conns

We were not checking p->feconn nor the global actconn soon enough. In
older versions this could result in a frontend accepting more connections
than allowed by its maxconn or the global maxconn, exactly N-1 extra
connections where N is the number of threads, provided each of these
threads was running a different listener. With the lock removal it became
worse: the excess could be the listener's maxconn multiplied by the number
of threads. Among the nasty side effects was that LI_FULL could be removed
while the limit was still exceeded, and in some cases polling on the
socket was not re-enabled.

This commit takes care of updating and checking p->feconn and the global
actconn *before* processing the connection, so that the listener can be
turned off before accepting the socket if needed. This requires moving
some of the bookkeeping operations from the session to the listener, which
totally makes sense in this context.

Now the limits are properly respected, even when a listener's maxconn
exceeds the frontend's. This only applies on top of the listener lock
removal series and doesn't have to be backported.
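
For illustration, here is a minimal sketch of the pre-reservation pattern the
patch applies to l->nbconn, p->feconn and actconn. The reserve_slot() helper
is hypothetical, not part of the patch; it only summarizes the do/CAS loops
visible in the diff below:

    /* Hypothetical helper (not in the patch): atomically reserve one slot
     * in <ctr> without exceeding <limit>. Returns the new value on success,
     * or 0 if the limit is already reached (another thread then owns the
     * full/limited transition). On any later error the caller rolls back
     * with HA_ATOMIC_SUB(ctr, 1), which is what the next_conn/next_feconn/
     * next_actconn variables track in listener_accept().
     */
    static inline int reserve_slot(int *ctr, int limit)
    {
        int count, next;

        do {
            count = *ctr;
            if (count >= limit)
                return 0;
            next = count + 1;
        } while (!HA_ATOMIC_CAS(ctr, &count, next));

        return next; /* if this equals <limit>, we just filled it */
    }

Because the counters are reserved before accept() is called, the listener can
be marked full or limited without ever taking the excess connection.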
Author: Willy Tarreau
Date:   2019-02-27 19:32:32 +01:00
Parent: 01abd02508
Commit: 82c9789ac4
2 changed files with 67 additions and 22 deletions

src/listener.c

@@ -578,6 +578,8 @@ void listener_accept(int fd)
     struct proxy *p;
     int max_accept;
     int next_conn = 0;
+    int next_feconn = 0;
+    int next_actconn = 0;
     int expire;
     int cfd;
     int ret;
@@ -648,12 +650,15 @@ void listener_accept(int fd)
      * worst case. If we fail due to system limits or temporary resource
      * shortage, we try again 100ms later in the worst case.
      */
-    for (; max_accept-- > 0; next_conn = 0) {
+    for (; max_accept-- > 0; next_conn = next_feconn = next_actconn = 0) {
         struct sockaddr_storage addr;
         socklen_t laddr = sizeof(addr);
         unsigned int count;

-        /* pre-increase the number of connections without going too far */
+        /* pre-increase the number of connections without going too far.
+         * We process the listener, then the proxy, then the process.
+         * We know which ones to unroll based on the next_xxx value.
+         */
         do {
             count = l->nbconn;
             if (count >= l->maxconn) {
@@ -671,16 +676,43 @@ void listener_accept(int fd)
             listener_full(l);
         }

-        if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
-            limit_listener(l, &global_listener_queue);
-            task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-            goto end;
-        }
+        if (p) {
+            do {
+                count = p->feconn;
+                if (count >= p->maxconn) {
+                    /* the frontend was marked full or another
+                     * thread is going to do it.
+                     */
+                    next_feconn = 0;
+                    goto end;
+                }
+                next_feconn = count + 1;
+            } while (!HA_ATOMIC_CAS(&p->feconn, &count, next_feconn));

-        if (unlikely(p && p->feconn >= p->maxconn)) {
-            limit_listener(l, &p->listener_queue);
-            goto end;
-        }
+            if (unlikely(next_feconn == p->maxconn)) {
+                /* we just filled it */
+                limit_listener(l, &p->listener_queue);
+            }
+        }
+
+        if (!(l->options & LI_O_UNLIMITED)) {
+            do {
+                count = actconn;
+                if (count >= global.maxconn) {
+                    /* the process was marked full or another
+                     * thread is going to do it.
+                     */
+                    next_actconn = 0;
+                    goto end;
+                }
+                next_actconn = count + 1;
+            } while (!HA_ATOMIC_CAS(&actconn, &count, next_actconn));
+
+            if (unlikely(next_actconn == global.maxconn)) {
+                limit_listener(l, &global_listener_queue);
+                task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
+            }
+        }

         /* with sockpair@ we don't want to do an accept */
         if (unlikely(l->addr.ss_family == AF_CUST_SOCKPAIR)) {
@@ -723,6 +755,10 @@ void listener_accept(int fd)
             case EINTR:
             case ECONNABORTED:
                 HA_ATOMIC_SUB(&l->nbconn, 1);
+                if (p)
+                    HA_ATOMIC_SUB(&p->feconn, 1);
+                if (!(l->options & LI_O_UNLIMITED))
+                    HA_ATOMIC_SUB(&actconn, 1);
                 continue;
             case ENFILE:
                 if (p)
@@ -757,18 +793,20 @@ void listener_accept(int fd)
         if (l->counters)
             HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);

+        if (p)
+            HA_ATOMIC_UPDATE_MAX(&p->fe_counters.conn_max, next_feconn);
+
+        proxy_inc_fe_conn_ctr(l, p);
+
         if (!(l->options & LI_O_UNLIMITED)) {
             count = update_freq_ctr(&global.conn_per_sec, 1);
             HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
-            HA_ATOMIC_ADD(&actconn, 1);
         }

         if (unlikely(cfd >= global.maxsock)) {
             send_log(p, LOG_EMERG,
                      "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
                      p->id);
-            if (!(l->options & LI_O_UNLIMITED))
-                HA_ATOMIC_SUB(&actconn, 1);
             close(cfd);
             limit_listener(l, &global_listener_queue);
             task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
@@ -776,11 +814,13 @@ void listener_accept(int fd)
         }

         /* past this point, l->accept() will automatically decrement
-         * l->nbconn and actconn once done. Setting next_conn=0 allows
-         * the error path not to rollback on nbconn. It's more convenient
-         * than duplicating all exit labels.
+         * l->nbconn, feconn and actconn once done. Setting next_*conn=0
+         * allows the error path not to rollback on nbconn. It's more
+         * convenient than duplicating all exit labels.
          */
         next_conn = 0;
+        next_feconn = 0;
+        next_actconn = 0;

 #if defined(USE_THREAD)
         count = l->bind_conf->thr_count;
@@ -875,7 +915,14 @@ void listener_accept(int fd)
     if (next_conn)
         HA_ATOMIC_SUB(&l->nbconn, 1);

-    if (l->nbconn < l->maxconn && l->state == LI_FULL) {
+    if (p && next_feconn)
+        HA_ATOMIC_SUB(&p->feconn, 1);
+
+    if (next_actconn)
+        HA_ATOMIC_SUB(&actconn, 1);
+
+    if ((l->state == LI_FULL && l->nbconn < l->maxconn) ||
+        (l->state == LI_LIMITED && ((!p || p->feconn < p->maxconn) && (actconn < global.maxconn)))) {
         /* at least one thread has to this when quitting */
         resume_listener(l);
@@ -899,9 +946,12 @@ void listener_release(struct listener *l)
     if (!(l->options & LI_O_UNLIMITED))
         HA_ATOMIC_SUB(&actconn, 1);
+    if (fe)
+        HA_ATOMIC_SUB(&fe->feconn, 1);
     HA_ATOMIC_SUB(&l->nbconn, 1);
     HA_ATOMIC_SUB(&l->thr_conn[tid], 1);
-    if (l->state == LI_FULL)
+    if (l->state == LI_FULL || l->state == LI_LIMITED)
         resume_listener(l);

     /* Dequeues all of the listeners waiting for a resource */

src/session.c

@@ -55,10 +55,6 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
     vars_init(&sess->vars, SCOPE_SESS);
     sess->task = NULL;
     sess->t_handshake = -1; /* handshake not done yet */
-    HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.conn_max,
-                         HA_ATOMIC_ADD(&fe->feconn, 1));
-    if (li)
-        proxy_inc_fe_conn_ctr(li, fe);
     HA_ATOMIC_ADD(&totalconn, 1);
     HA_ATOMIC_ADD(&jobs, 1);
     LIST_INIT(&sess->srv_list);
@@ -72,7 +68,6 @@ void session_free(struct session *sess)
     struct connection *conn, *conn_back;
     struct sess_srv_list *srv_list, *srv_list_back;

-    HA_ATOMIC_SUB(&sess->fe->feconn, 1);
     if (sess->listener)
         listener_release(sess->listener);
     session_store_counters(sess);