MEDIUM: queues: Introduce a new field to know if queues are empty.

Proxies and servers both have a queueslength entry, that indicates how
many entries there are in their request queues. Those can get a lot of
contention, and they are often accessed just to know if the queue is
empty or not, which it will always be when there is no maxconn.
So for both, introduce a new field, "queues_not_empty", in a different,
less contended cache line, and use it every time we just want to know if
the queue is empty or not.
Those are only changed when the queues goes from empty to non-empty, and
vice-versa, which hopefully should not be too often.
For proxies, they are protected by the proxy lock. For servers, they are
protected by a new lock contained in the server, the status_lock.
In both case, to prevent any race condition, once the relevant lock is
held, the value of queueslength should be checked again before deciding
to change or not queues_not_empty, to prevent any race condition.
This commit is contained in:
Olivier Houchard 2026-02-03 03:44:58 +01:00 committed by Olivier Houchard
parent 9db62d408a
commit 131113bbe5
11 changed files with 64 additions and 17 deletions

View File

@ -510,6 +510,7 @@ struct proxy {
EXTRA_COUNTERS(extra_counters_fe);
EXTRA_COUNTERS(extra_counters_be);
uint8_t queues_not_empty; /* Are the request queues not empty ? Only changed when the queues go from non-empty to empty, and vice-versa. Protected by proxy lock */
THREAD_ALIGN();
/* these ones change all the time */
int served; /* # of active sessions currently being served */

View File

@ -86,7 +86,7 @@ static inline int server_has_room(const struct server *s) {
* for and if/else usage.
*/
static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) {
return (s && (s->queueslength || (p->queueslength && srv_currently_usable(s))) &&
return (s && (s->queues_not_empty || (p->queues_not_empty && srv_currently_usable(s))) &&
(!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)));
}

View File

@ -553,6 +553,8 @@ struct server {
struct sockaddr_storage socks4_addr; /* the address of the SOCKS4 Proxy, including the port */
EXTRA_COUNTERS(extra_counters);
__decl_thread(HA_SPINLOCK_T state_lock);/* protect the following state fields */
uint8_t queues_not_empty; /* Are the request queues not empty ? Only changed when the queues go from non-empty to empty, and vice-versa. Protected by the state_lock lock when changed */
};
/* data provided to EVENT_HDL_SUB_SERVER handlers through event_hdl facility */

View File

@ -597,7 +597,7 @@ struct server *get_server_rnd(struct stream *s, const struct server *avoid)
* the backend's queue instead.
*/
if (curr &&
(curr->queueslength || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
(curr->queues_not_empty || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
curr = NULL;
return curr;
@ -1158,10 +1158,21 @@ int assign_server_and_queue(struct stream *s)
_HA_ATOMIC_DEC(&p->queue->length);
if (p->queue->sv)
_HA_ATOMIC_DEC(&p->queue->sv->queueslength);
else
_HA_ATOMIC_DEC(&p->queue->px->queueslength);
if (p->queue->sv) {
if (_HA_ATOMIC_SUB_FETCH(&p->queue->sv->queueslength, 1) == 0) {
HA_SPIN_LOCK(SERVER_LOCK, &p->queue->sv->state_lock);
if (p->queue->sv->queueslength == 0)
p->queue->sv->queues_not_empty = 0;
HA_SPIN_UNLOCK(SERVER_LOCK, &p->queue->sv->state_lock);
}
} else {
if (_HA_ATOMIC_SUB_FETCH(&p->queue->px->queueslength, 1) == 0) {
HA_SPIN_LOCK(PROXY_LOCK, &p->queue->px->lock);
if (p->queue->px->queueslength == 0)
p->queue->px->queues_not_empty = 0;
HA_SPIN_UNLOCK(PROXY_LOCK, &p->queue->px->lock);
}
}
_HA_ATOMIC_INC(&p->queue->idx);
_HA_ATOMIC_DEC(&s->be->totpend);

View File

@ -532,7 +532,7 @@ struct server *chash_get_next_server(struct proxy *p, struct server *srvtoavoid)
* case we simply remember it for later use if needed.
*/
s = eb32_entry(node, struct tree_occ, node)->server;
if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) {
if (!s->maxconn || (!s->queueslength && server_has_room(s))) {
if (s != srvtoavoid) {
srv = s;
break;

View File

@ -322,7 +322,7 @@ struct server *fas_get_next_server(struct proxy *p, struct server *srvtoavoid)
struct server *s;
s = eb32_entry(node, struct server, lb_node);
if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) {
if (!s->maxconn || (!s->queueslength && server_has_room(s))) {
if (s != srvtoavoid) {
srv = s;
break;

View File

@ -797,7 +797,7 @@ redo:
eweight = _HA_ATOMIC_LOAD(&s->cur_eweight);
planned_inflight = tree_elt->lb_node.key * eweight / SRV_EWGHT_MAX;
if (!s->maxconn || s->served + s->queueslength < srv_dynamic_maxconn(s) + s->maxqueue) {
if (server_has_room(s)) {
if (_HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength) > planned_inflight + 2) {
/*
* The server has more requests than expected,

View File

@ -620,7 +620,7 @@ struct server *fwrr_get_next_server(struct proxy *p, struct server *srvtoavoid)
fwrr_update_position(grp, srv, next_weight);
fwrr_dequeue_srv(srv, tgid);
grp->curr_pos++;
if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) {
if (!srv->maxconn || (!srv->queueslength && server_has_room(srv))) {
/* make sure it is not the server we are trying to exclude... */
if (srv != srvtoavoid || avoided)
break;

View File

@ -230,7 +230,7 @@ struct server *map_get_server_rr(struct proxy *px, struct server *srvtoavoid)
avoididx = 0; /* shut a gcc warning */
do {
srv = px->lbprm.map.srv[newidx++];
if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) {
if (!srv->maxconn || (!srv->queueslength && server_has_room(srv))) {
/* make sure it is not the server we are try to exclude... */
/* ...but remember that is was selected yet avoided */
avoided = srv;

View File

@ -197,11 +197,20 @@ void pendconn_unlink(struct pendconn *p)
oldidx -= p->queue_idx;
if (sv) {
p->strm->logs.srv_queue_pos += oldidx;
_HA_ATOMIC_DEC(&sv->queueslength);
}
else {
if (_HA_ATOMIC_FETCH_SUB(&sv->queueslength, 1) == 0) {
HA_SPIN_LOCK(SERVER_LOCK, &sv->state_lock);
if (sv->queueslength == 0)
sv->queues_not_empty = 0;
HA_SPIN_UNLOCK(SERVER_LOCK, &sv->state_lock);
}
} else {
p->strm->logs.prx_queue_pos += oldidx;
_HA_ATOMIC_DEC(&px->queueslength);
if (_HA_ATOMIC_FETCH_SUB(&px->queueslength, 1) == 0) {
HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
if (px->queueslength == 0)
px->queues_not_empty = 0;
HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
}
}
_HA_ATOMIC_DEC(&q->length);
@ -350,7 +359,12 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
_HA_ATOMIC_DEC(&px->per_tgrp[tgrp - 1].queue.length);
_HA_ATOMIC_INC(&px->per_tgrp[tgrp - 1].queue.idx);
_HA_ATOMIC_DEC(&px->queueslength);
if (_HA_ATOMIC_SUB_FETCH(&px->queueslength, 1) == 0) {
HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
if (px->queueslength == 0)
px->queues_not_empty = 0;
HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
}
return 1;
use_p:
@ -372,7 +386,12 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
_HA_ATOMIC_DEC(&srv->per_tgrp[tgrp - 1].queue.length);
_HA_ATOMIC_INC(&srv->per_tgrp[tgrp - 1].queue.idx);
_HA_ATOMIC_DEC(&srv->queueslength);
if (_HA_ATOMIC_SUB_FETCH(&srv->queueslength, 1) == 0) {
HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
if (srv->queueslength == 0)
srv->queues_not_empty = 0;
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
}
return 1;
}
@ -603,6 +622,17 @@ struct pendconn *pendconn_add(struct stream *strm)
p->queue = q;
p->queue_idx = _HA_ATOMIC_LOAD(&q->idx) - 1; // for logging only
new_max = _HA_ATOMIC_ADD_FETCH(queueslength, 1);
if (new_max == 1) {
if (srv) {
HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
srv->queues_not_empty = 1;
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
} else {
HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
px->queues_not_empty = 1;
HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
}
}
_HA_ATOMIC_INC(&q->length);
old_max = _HA_ATOMIC_LOAD(max_ptr);
while (new_max > old_max) {

View File

@ -3259,6 +3259,8 @@ struct server *srv_drop(struct server *srv)
HA_SPIN_DESTROY(&srv->lock);
HA_SPIN_DESTROY(&srv->state_lock);
MT_LIST_DELETE(&srv->global_list);
event_hdl_sub_list_destroy(&srv->e_subs);
@ -3816,6 +3818,7 @@ static int _srv_parse_init(struct server **srv, char **args, int *cur_arg,
} else
srv_settings_init(newsrv);
HA_SPIN_INIT(&newsrv->lock);
HA_SPIN_INIT(&newsrv->state_lock);
}
else {
/* This is a "default-server" line. Let's make certain the