MINOR: queue: factor out the proxy/server queuing code

The server and proxy code paths only differed by the nbpend_max counter they
update. Let's take a pointer to that counter and merge the two variants so
that they always operate on a generic queue. It was initially considered to
put the max inside the queue structure itself, but the stats code supports
clearing values and maxes, and this would have been the only counter that had
to be handled separately there. Given that we don't need this max anywhere
outside the stats, let's keep it where it is and simply keep a pointer to it
instead.
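
For illustration only (this sketch is not part of the patch; it uses
simplified structures rather than the real HAProxy types, and leaves out
atomics and locking for brevity), the factoring boils down to picking the
queue and a pointer to the max counter once, then running a single shared
path:

struct queue {
        unsigned int length;      /* entries currently queued */
        unsigned int idx;         /* running insertion index */
};

struct counters {
        unsigned int nbpend_max;  /* highest queue length observed */
};

/* pick the generic queue and the counter once, then share the path */
static void account_enqueue(struct queue *srv_q, struct counters *srv_ctr,
                            struct queue *px_q, struct counters *px_ctr,
                            int use_server)
{
        struct queue *q;
        unsigned int *max_ptr;

        if (use_server) {
                q = srv_q;
                max_ptr = &srv_ctr->nbpend_max;
        } else {
                q = px_q;
                max_ptr = &px_ctr->nbpend_max;
        }

        /* single generic path from here on */
        q->length++;
        if (q->length > *max_ptr)
                *max_ptr = q->length;
}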

The CAS loop used to update the max remains. It was naively thought that
performing this update inside the lock, without atomic ops, would have been
faster, but this is not the case, for the simple reason that it is a max: it
converges very quickly, after which the comparison fails and the CAS never
has to be attempted anymore. Thus this code is better kept out of the lock.
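
For reference, here is a standalone sketch of this lock-free running-max
pattern, written with C11 atomics rather than HAProxy's _HA_ATOMIC_*
wrappers (names and types are illustrative, not the patch itself):

#include <stdatomic.h>

static _Atomic unsigned int queue_length;  /* current queue length */
static _Atomic unsigned int nbpend_max;    /* highest length ever recorded */

static void account_one_pendconn(void)
{
        unsigned int new_max, old_max;

        /* count the new entry and observe the resulting length */
        new_max = atomic_fetch_add(&queue_length, 1) + 1;

        /* raise the recorded max only while our observation is higher;
         * on CAS failure old_max is refreshed with the current value, so
         * once the max has converged the loop body is never entered again.
         */
        old_max = atomic_load(&nbpend_max);
        while (new_max > old_max) {
                if (atomic_compare_exchange_weak(&nbpend_max, &old_max, new_max))
                        break;
        }
}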

The pendconn's queue_idx is still assigned inside the lock since that's where
the queue's idx is updated, though it could be done using atomic ops given
that it's only used to roughly count places for logging.
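
Should it ever be moved out of the lock, a plain atomic read of the index
would be enough; a hypothetical sketch with C11 atomics (simplified types,
not part of this patch):

#include <stdatomic.h>

struct queue {
        _Atomic unsigned int idx;   /* running index of served entries */
};

struct pendconn {
        unsigned int queue_idx;     /* queue index recorded when queuing */
};

static void record_queue_idx(struct pendconn *p, struct queue *q)
{
        /* a slightly stale value is acceptable: it only feeds a rough
         * "how many entries were ahead of us" figure used for logging.
         */
        p->queue_idx = atomic_load(&q->idx) - 1; /* -1 for increment */
}
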
Willy Tarreau 2021-06-18 10:21:20 +02:00
parent c83e45e9b0
commit 3eecdb65c5

@@ -391,6 +391,9 @@ struct pendconn *pendconn_add(struct stream *strm)
         struct pendconn *p;
         struct proxy    *px;
         struct server   *srv;
+        struct queue    *q;
+        unsigned int *max_ptr;
+        unsigned int old_max, new_max;
 
         p = pool_alloc(pool_head_pendconn);
         if (!p)
@@ -411,38 +414,27 @@ struct pendconn *pendconn_add(struct stream *strm)
         strm->pend_pos = p;
 
         if (srv) {
-                unsigned int old_max, new_max;
-
-                new_max = _HA_ATOMIC_ADD_FETCH(&srv->queue.length, 1);
-                old_max = srv->counters.nbpend_max;
-                while (new_max > old_max) {
-                        if (likely(_HA_ATOMIC_CAS(&srv->counters.nbpend_max, &old_max, new_max)))
-                                break;
-                }
-                __ha_barrier_atomic_store();
-
-                HA_SPIN_LOCK(QUEUE_LOCK, &p->srv->queue.lock);
-                p->queue_idx = srv->queue.idx - 1; // for increment
-                eb32_insert(&srv->queue.head, &p->node);
-                HA_SPIN_UNLOCK(QUEUE_LOCK, &p->srv->queue.lock);
+                q = &srv->queue;
+                max_ptr = &srv->counters.nbpend_max;
         }
         else {
-                unsigned int old_max, new_max;
-
-                new_max = _HA_ATOMIC_ADD_FETCH(&px->queue.length, 1);
-                old_max = px->be_counters.nbpend_max;
-                while (new_max > old_max) {
-                        if (likely(_HA_ATOMIC_CAS(&px->be_counters.nbpend_max, &old_max, new_max)))
-                                break;
-                }
-                __ha_barrier_atomic_store();
-
-                HA_SPIN_LOCK(QUEUE_LOCK, &p->px->queue.lock);
-                p->queue_idx = px->queue.idx - 1; // for increment
-                eb32_insert(&px->queue.head, &p->node);
-                HA_SPIN_UNLOCK(QUEUE_LOCK, &p->px->queue.lock);
+                q = &px->queue;
+                max_ptr = &px->be_counters.nbpend_max;
         }
+
+        new_max = _HA_ATOMIC_ADD_FETCH(&q->length, 1);
+        old_max = _HA_ATOMIC_LOAD(max_ptr);
+        while (new_max > old_max) {
+                if (likely(_HA_ATOMIC_CAS(max_ptr, &old_max, new_max)))
+                        break;
+        }
+        __ha_barrier_atomic_store();
+
+        HA_SPIN_LOCK(QUEUE_LOCK, &q->lock);
+        p->queue_idx = q->idx - 1; // for increment
+        eb32_insert(&q->head, &p->node);
+        HA_SPIN_UNLOCK(QUEUE_LOCK, &q->lock);
 
         _HA_ATOMIC_INC(&px->totpend);
         return p;
 }