diff --git a/src/backend.c b/src/backend.c index c8bb33e81..55aeacd08 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1029,28 +1029,50 @@ int assign_server_and_queue(struct stream *s) * is set on the server, we must also check that the server's queue is * not full, in which case we have to return FULL. */ - if (srv->maxconn && - (srv->queue.length || srv->served >= srv_dynamic_maxconn(srv))) { + if (srv->maxconn) { + int served; + int got_it = 0; - if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue) - return SRV_STATUS_FULL; - - p = pendconn_add(s); - if (p) { - /* There's a TOCTOU here: it may happen that between the - * moment we decided to queue the request and the moment - * it was done, the last active request on the server - * ended and no new one will be able to dequeue that one. - * Since we already have our server we don't care, this - * will be handled by the caller which will check for - * this condition and will immediately dequeue it if - * possible. + /* + * Make sure that there's still a slot on the server. + * Try to increment its served, while making sure + * it is < maxconn. + */ + if (!srv->queue.length && + (served = srv->served) < srv_dynamic_maxconn(srv)) { + /* + * Attempt to increment served, while + * making sure it is always below maxconn */ - return SRV_STATUS_QUEUED; + + do { + got_it = _HA_ATOMIC_CAS(&srv->served, + &served, served + 1); + } while (!got_it && served < srv_dynamic_maxconn(srv) && + __ha_cpu_relax()); } - else - return SRV_STATUS_INTERNAL; - } + if (!got_it) { + if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue) + return SRV_STATUS_FULL; + + p = pendconn_add(s); + if (p) { + /* There's a TOCTOU here: it may happen that between the + * moment we decided to queue the request and the moment + * it was done, the last active request on the server + * ended and no new one will be able to dequeue that one. + * Since we already have our server we don't care, this + * will be handled by the caller which will check for + * this condition and will immediately dequeue it if + * possible. + */ + return SRV_STATUS_QUEUED; + } + else + return SRV_STATUS_INTERNAL; + } + } else + _HA_ATOMIC_INC(&srv->served); /* OK, we can use this server. Let's reserve our place */ sess_change_server(s, srv); diff --git a/src/queue.c b/src/queue.c index c2c702522..234b01a3b 100644 --- a/src/queue.c +++ b/src/queue.c @@ -259,6 +259,9 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int struct pendconn *p = NULL; struct pendconn *pp = NULL; u32 pkey, ppkey; + int served; + int maxconn; + int got_it = 0; p = NULL; if (srv->queue.length) @@ -277,7 +280,25 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int if (!p && !pp) return 0; - else if (!pp) + + served = _HA_ATOMIC_LOAD(&srv->served); + maxconn = srv_dynamic_maxconn(srv); + + while (served < maxconn && !got_it) + got_it = _HA_ATOMIC_CAS(&srv->served, &served, served + 1); + + /* No more slot available, give up */ + if (!got_it) { + if (pp) + HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock); + return 0; + } + + /* + * Now we know we'll have something available. + * Let's try to allocate a slot on the server. + */ + if (!pp) goto use_p; /* p != NULL */ else if (!p) goto use_pp; /* pp != NULL */ @@ -394,10 +415,13 @@ int process_srv_queue(struct server *s) HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock); while (s->served < maxconn) { + /* + * pendconn_process_next_strm() will increment + * the served field, only if it is < maxconn. + */ stop = !pendconn_process_next_strm(s, p, px_ok); if (stop) break; - _HA_ATOMIC_INC(&s->served); done++; if (done >= global.tune.maxpollevents) break; diff --git a/src/stream.c b/src/stream.c index 73c182b68..9ffdf07d9 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2743,11 +2743,17 @@ void sess_change_server(struct stream *strm, struct server *newsrv) * invocation of sess_change_server(). */ + /* + * It is assumed if the stream has a non-NULL srv_conn, then its + * served field has been incremented, so we have to decrement it now. + */ + if (oldsrv) + _HA_ATOMIC_DEC(&oldsrv->served); + if (oldsrv == newsrv) return; if (oldsrv) { - _HA_ATOMIC_DEC(&oldsrv->served); _HA_ATOMIC_DEC(&oldsrv->proxy->served); __ha_barrier_atomic_store(); if (oldsrv->proxy->lbprm.server_drop_conn) @@ -2756,7 +2762,6 @@ void sess_change_server(struct stream *strm, struct server *newsrv) } if (newsrv) { - _HA_ATOMIC_INC(&newsrv->served); _HA_ATOMIC_INC(&newsrv->proxy->served); __ha_barrier_atomic_store(); if (newsrv->proxy->lbprm.server_take_conn)