BUG/MEDIUM: queues: Stricly respect maxconn for outgoing connections

The "served" field of struct server is used to know how many connections
are currently in use for a server. But served used to be incremented way
after the server was picked, so there were race conditions that could
lead more than maxconn connections to be allocated for one server. To
fix this, increment served way earlier, and make sure at the time that
it never goes past maxconn.
We now should never have more outgoing connections than set by maxconn.
This commit is contained in:
Olivier Houchard 2024-12-17 13:30:46 +00:00 committed by Olivier Houchard
parent 4332fed6c1
commit 3372a2ea00
3 changed files with 74 additions and 23 deletions

View File

@ -1029,9 +1029,29 @@ int assign_server_and_queue(struct stream *s)
* is set on the server, we must also check that the server's queue is
* not full, in which case we have to return FULL.
*/
if (srv->maxconn &&
(srv->queue.length || srv->served >= srv_dynamic_maxconn(srv))) {
if (srv->maxconn) {
int served;
int got_it = 0;
/*
* Make sure that there's still a slot on the server.
* Try to increment its served, while making sure
* it is < maxconn.
*/
if (!srv->queue.length &&
(served = srv->served) < srv_dynamic_maxconn(srv)) {
/*
* Attempt to increment served, while
* making sure it is always below maxconn
*/
do {
got_it = _HA_ATOMIC_CAS(&srv->served,
&served, served + 1);
} while (!got_it && served < srv_dynamic_maxconn(srv) &&
__ha_cpu_relax());
}
if (!got_it) {
if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue)
return SRV_STATUS_FULL;
@ -1051,6 +1071,8 @@ int assign_server_and_queue(struct stream *s)
else
return SRV_STATUS_INTERNAL;
}
} else
_HA_ATOMIC_INC(&srv->served);
/* OK, we can use this server. Let's reserve our place */
sess_change_server(s, srv);

View File

@ -259,6 +259,9 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
struct pendconn *p = NULL;
struct pendconn *pp = NULL;
u32 pkey, ppkey;
int served;
int maxconn;
int got_it = 0;
p = NULL;
if (srv->queue.length)
@ -277,7 +280,25 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
if (!p && !pp)
return 0;
else if (!pp)
served = _HA_ATOMIC_LOAD(&srv->served);
maxconn = srv_dynamic_maxconn(srv);
while (served < maxconn && !got_it)
got_it = _HA_ATOMIC_CAS(&srv->served, &served, served + 1);
/* No more slot available, give up */
if (!got_it) {
if (pp)
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
return 0;
}
/*
* Now we know we'll have something available.
* Let's try to allocate a slot on the server.
*/
if (!pp)
goto use_p; /* p != NULL */
else if (!p)
goto use_pp; /* pp != NULL */
@ -394,10 +415,13 @@ int process_srv_queue(struct server *s)
HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
while (s->served < maxconn) {
/*
* pendconn_process_next_strm() will increment
* the served field, only if it is < maxconn.
*/
stop = !pendconn_process_next_strm(s, p, px_ok);
if (stop)
break;
_HA_ATOMIC_INC(&s->served);
done++;
if (done >= global.tune.maxpollevents)
break;

View File

@ -2743,11 +2743,17 @@ void sess_change_server(struct stream *strm, struct server *newsrv)
* invocation of sess_change_server().
*/
/*
* It is assumed if the stream has a non-NULL srv_conn, then its
* served field has been incremented, so we have to decrement it now.
*/
if (oldsrv)
_HA_ATOMIC_DEC(&oldsrv->served);
if (oldsrv == newsrv)
return;
if (oldsrv) {
_HA_ATOMIC_DEC(&oldsrv->served);
_HA_ATOMIC_DEC(&oldsrv->proxy->served);
__ha_barrier_atomic_store();
if (oldsrv->proxy->lbprm.server_drop_conn)
@ -2756,7 +2762,6 @@ void sess_change_server(struct stream *strm, struct server *newsrv)
}
if (newsrv) {
_HA_ATOMIC_INC(&newsrv->served);
_HA_ATOMIC_INC(&newsrv->proxy->served);
__ha_barrier_atomic_store();
if (newsrv->proxy->lbprm.server_take_conn)