mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-03-19 05:51:10 +01:00
MEDIUM: servers: Introduce server_full
To detect that a server is full, its served variable is used. That variable, however, is in a very busy cache line, and it can be costly to access it. So instead, introduce a new variable, in a different cache line, server_full, that indicates if the server is full.
This commit is contained in:
parent
d6a2d03d39
commit
cd66a94b63
@ -554,6 +554,7 @@ struct server {
|
||||
EXTRA_COUNTERS(extra_counters);
|
||||
__decl_thread(HA_SPINLOCK_T state_lock);/* protect the following state fields */
|
||||
uint8_t queues_not_empty; /* Are the request queues not empty ? Only changed when the queues go from non-empty to empty, and vice-versa. Protected by the state_lock lock when changed */
|
||||
uint8_t server_full; /* we reached maxconn, and can no longer process more requests, protected by the state_lock */
|
||||
};
|
||||
|
||||
/* data provided to EVENT_HDL_SUB_SERVER handlers through event_hdl facility */
|
||||
|
||||
@ -398,12 +398,27 @@ static inline int srv_manage_queues(struct server *srv, struct proxy *px)
|
||||
{
|
||||
int full = -1;
|
||||
|
||||
if (may_dequeue_tasks(srv, px))
|
||||
full = process_srv_queue(srv, &full);
|
||||
if (may_dequeue_tasks(srv, px)) {
|
||||
process_srv_queue(srv, &full);
|
||||
if ((full && !srv->server_full) || (!full && srv->server_full)) {
|
||||
HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
|
||||
srv->server_full = (srv->served >= srv->maxconn);
|
||||
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
|
||||
}
|
||||
}
|
||||
|
||||
return full;
|
||||
}
|
||||
|
||||
static inline void srv_check_full_state(struct server *srv, int served)
|
||||
{
|
||||
if (srv->maxconn > 0 && served == srv->maxconn - 1) {
|
||||
HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
|
||||
srv->server_full = (srv->served >= srv->maxconn);
|
||||
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* _HAPROXY_SERVER_H */
|
||||
|
||||
/*
|
||||
|
||||
@ -353,8 +353,12 @@ static inline void stream_choose_redispatch(struct stream *s)
|
||||
(s->conn_retries % (s->max_retries + 1 + s->be->redispatch_after) == 0))) ||
|
||||
(!(s->flags & SF_DIRECT) && s->be->srv_act > 1 &&
|
||||
((s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_HI)))) {
|
||||
sess_change_server(s, NULL);
|
||||
srv_manage_queues(objt_server(s->target), s->be);
|
||||
struct server *srv = __objt_server(s->target);
|
||||
int served;
|
||||
|
||||
served = sess_change_server(s, NULL);
|
||||
if (srv_manage_queues(srv, s->be) == -1)
|
||||
srv_check_full_state(srv, served);
|
||||
|
||||
sockaddr_free(&s->scb->dst);
|
||||
s->flags &= ~(SF_DIRECT | SF_ASSIGNED);
|
||||
|
||||
@ -645,8 +645,10 @@ int assign_server(struct stream *s)
|
||||
/* We have to release any connection slot before applying any LB algo,
|
||||
* otherwise we may erroneously end up with no available slot.
|
||||
*/
|
||||
if (conn_slot)
|
||||
sess_change_server(s, NULL);
|
||||
if (conn_slot) {
|
||||
int served = sess_change_server(s, NULL);
|
||||
srv_check_full_state(conn_slot, served);
|
||||
}
|
||||
|
||||
/* We will now try to find the good server and store it into <objt_server(s->target)>.
|
||||
* Note that <objt_server(s->target)> may be NULL in case of dispatch or proxy mode,
|
||||
@ -858,7 +860,8 @@ out_ok:
|
||||
*/
|
||||
if (conn_slot) {
|
||||
if (conn_slot == srv) {
|
||||
sess_change_server(s, srv);
|
||||
int served = sess_change_server(s, srv);
|
||||
srv_check_full_state(srv, served);
|
||||
} else {
|
||||
srv_manage_queues(conn_slot, s->be);
|
||||
}
|
||||
@ -982,7 +985,7 @@ static int alloc_dst_address(struct sockaddr_storage **ss,
|
||||
int assign_server_and_queue(struct stream *s)
|
||||
{
|
||||
struct pendconn *p;
|
||||
struct server *srv;
|
||||
struct server *srv, *oldsrv;
|
||||
int count;
|
||||
int err;
|
||||
|
||||
@ -1039,10 +1042,16 @@ int assign_server_and_queue(struct stream *s)
|
||||
* assigned from persistence information (direct mode).
|
||||
*/
|
||||
if ((s->flags & SF_REDIRECTABLE) && srv->rdr_len) {
|
||||
struct server *oldsrv;
|
||||
|
||||
int served;
|
||||
/* server scheduled for redirection, and already assigned. We
|
||||
* don't want to go further nor check the queue.
|
||||
*/
|
||||
sess_change_server(s, srv); /* not really needed in fact */
|
||||
oldsrv = s->srv_conn;
|
||||
served = sess_change_server(s, srv); /* not really needed in fact */
|
||||
if (oldsrv)
|
||||
srv_check_full_state(oldsrv, served);
|
||||
return SRV_STATUS_OK;
|
||||
}
|
||||
|
||||
@ -1095,12 +1104,21 @@ int assign_server_and_queue(struct stream *s)
|
||||
else
|
||||
return SRV_STATUS_INTERNAL;
|
||||
}
|
||||
if (served + 1 == srv->maxconn) {
|
||||
HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
|
||||
srv->server_full = (srv->served >= srv->maxconn);
|
||||
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
|
||||
}
|
||||
} else
|
||||
count = _HA_ATOMIC_ADD_FETCH(&srv->served, 1);
|
||||
|
||||
HA_ATOMIC_UPDATE_MAX(&srv->counters.cur_sess_max, count);
|
||||
/* OK, we can use this server. Let's reserve our place */
|
||||
sess_change_server(s, srv);
|
||||
oldsrv = s->srv_conn;
|
||||
|
||||
count = sess_change_server(s, srv);
|
||||
if (oldsrv != NULL)
|
||||
srv_check_full_state(oldsrv, count);
|
||||
return SRV_STATUS_OK;
|
||||
|
||||
case SRV_STATUS_FULL:
|
||||
@ -1123,15 +1141,15 @@ int assign_server_and_queue(struct stream *s)
|
||||
* dequeue us eventually, so we can just do nothing.
|
||||
*/
|
||||
if (unlikely(s->be->ready_srv != NULL)) {
|
||||
struct server *newserv;
|
||||
struct server *newserv, *oldserv;
|
||||
int served;
|
||||
|
||||
newserv = HA_ATOMIC_XCHG(&s->be->ready_srv, NULL);
|
||||
if (newserv != NULL) {
|
||||
int got_slot = 0;
|
||||
|
||||
while (_HA_ATOMIC_LOAD(&newserv->served) == 0) {
|
||||
int served = 0;
|
||||
|
||||
served = 0;
|
||||
if (_HA_ATOMIC_CAS(&newserv->served, &served, 1)) {
|
||||
got_slot = 1;
|
||||
break;
|
||||
@ -1145,6 +1163,12 @@ int assign_server_and_queue(struct stream *s)
|
||||
return SRV_STATUS_QUEUED;
|
||||
}
|
||||
|
||||
if (served + 1 == newserv->maxconn) {
|
||||
HA_SPIN_LOCK(SERVER_LOCK, &newserv->state_lock);
|
||||
newserv->server_full = (newserv->served >= newserv->maxconn);
|
||||
HA_SPIN_UNLOCK(SERVER_LOCK, &newserv->state_lock);
|
||||
}
|
||||
|
||||
HA_SPIN_LOCK(QUEUE_LOCK, &p->queue->lock);
|
||||
if (!p->node.node.leaf_p) {
|
||||
HA_SPIN_UNLOCK(QUEUE_LOCK, &p->queue->lock);
|
||||
@ -1185,7 +1209,11 @@ int assign_server_and_queue(struct stream *s)
|
||||
stream_set_srv_target(s, newserv);
|
||||
|
||||
s->pend_pos = NULL;
|
||||
sess_change_server(s, newserv);
|
||||
oldserv = s->srv_conn;
|
||||
|
||||
served = sess_change_server(s, newserv);
|
||||
if (oldserv)
|
||||
srv_check_full_state(oldserv, served);
|
||||
return SRV_STATUS_OK;
|
||||
}
|
||||
}
|
||||
@ -2481,6 +2509,8 @@ void back_try_conn_req(struct stream *s)
|
||||
* abort, retry immediately or redispatch.
|
||||
*/
|
||||
if (conn_err == SF_ERR_INTERNAL) {
|
||||
int served;
|
||||
|
||||
if (!s->conn_err_type) {
|
||||
s->conn_err_type = STRM_ET_CONN_OTHER;
|
||||
}
|
||||
@ -2495,8 +2525,9 @@ void back_try_conn_req(struct stream *s)
|
||||
_HA_ATOMIC_INC(&s->be_tgcounters->failed_conns);
|
||||
|
||||
/* release other streams waiting for this server */
|
||||
sess_change_server(s, NULL);
|
||||
srv_manage_queues(srv, s->be);
|
||||
served = sess_change_server(s, NULL);
|
||||
if (srv && srv_manage_queues(srv, s->be) == -1)
|
||||
srv_check_full_state(srv, served);
|
||||
|
||||
/* Failed and not retryable. */
|
||||
sc_abort(sc);
|
||||
@ -2804,6 +2835,9 @@ void back_handle_st_cer(struct stream *s)
|
||||
|
||||
/* ensure that we have enough retries left */
|
||||
if (s->conn_retries >= s->max_retries || !(s->be->retry_type & PR_RE_CONN_FAILED)) {
|
||||
struct server *srv = objt_server(s->target);
|
||||
int served;
|
||||
|
||||
if (!s->conn_err_type) {
|
||||
s->conn_err_type = STRM_ET_CONN_ERR;
|
||||
}
|
||||
@ -2812,8 +2846,9 @@ void back_handle_st_cer(struct stream *s)
|
||||
_HA_ATOMIC_INC(&s->sv_tgcounters->failed_conns);
|
||||
if (s->be_tgcounters)
|
||||
_HA_ATOMIC_INC(&s->be_tgcounters->failed_conns);
|
||||
sess_change_server(s, NULL);
|
||||
srv_manage_queues(objt_server(s->target), s->be);
|
||||
served = sess_change_server(s, NULL);
|
||||
if (srv && srv_manage_queues(srv, s->be) == -1)
|
||||
srv_check_full_state(srv, served);
|
||||
|
||||
/* shutw is enough to stop a connecting socket */
|
||||
sc_shutdown(sc);
|
||||
@ -2838,6 +2873,9 @@ void back_handle_st_cer(struct stream *s)
|
||||
* ST_TAR and SC_FL_ERROR and SF_CONN_EXP flags will be unset.
|
||||
*/
|
||||
if (sc_reset_endp(sc) < 0) {
|
||||
struct server *srv = objt_server(s->target);
|
||||
int served;
|
||||
|
||||
if (!s->conn_err_type)
|
||||
s->conn_err_type = STRM_ET_CONN_OTHER;
|
||||
|
||||
@ -2845,8 +2883,9 @@ void back_handle_st_cer(struct stream *s)
|
||||
_HA_ATOMIC_INC(&s->sv_tgcounters->internal_errors);
|
||||
if (s->be_tgcounters)
|
||||
_HA_ATOMIC_INC(&s->be_tgcounters->internal_errors);
|
||||
sess_change_server(s, NULL);
|
||||
srv_manage_queues(objt_server(s->target), s->be);
|
||||
served = sess_change_server(s, NULL);
|
||||
if (srv && srv_manage_queues(srv, s->be) == -1)
|
||||
srv_check_full_state(srv, served);
|
||||
|
||||
/* shutw is enough to stop a connecting socket */
|
||||
sc_shutdown(sc);
|
||||
|
||||
@ -3485,8 +3485,13 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
|
||||
|
||||
if (s->flags & SF_BE_ASSIGNED) {
|
||||
HA_ATOMIC_DEC(&be->beconn);
|
||||
if (unlikely(s->srv_conn))
|
||||
sess_change_server(s, NULL);
|
||||
if (unlikely(s->srv_conn)) {
|
||||
struct server *srv = s->srv_conn;
|
||||
int served;
|
||||
|
||||
served = sess_change_server(s, NULL);
|
||||
srv_check_full_state(srv, served);
|
||||
}
|
||||
}
|
||||
|
||||
s->logs.t_close = ns_to_ms(now_ns - s->logs.accept_ts);
|
||||
|
||||
25
src/stream.c
25
src/stream.c
@ -641,8 +641,9 @@ void stream_free(struct stream *s)
|
||||
* free the connection before.
|
||||
*/
|
||||
if (!(oldsrv->flags & SRV_F_STRICT_MAXCONN)) {
|
||||
sess_change_server(s, NULL);
|
||||
srv_manage_queues(oldsrv, s->be);
|
||||
int served = sess_change_server(s, NULL);
|
||||
if (srv_manage_queues(oldsrv, s->be) == -1)
|
||||
srv_check_full_state(oldsrv, served);
|
||||
}
|
||||
}
|
||||
|
||||
@ -751,8 +752,9 @@ void stream_free(struct stream *s)
|
||||
struct server *oldsrv = s->srv_conn;
|
||||
|
||||
if ((oldsrv->flags & SRV_F_STRICT_MAXCONN)) {
|
||||
sess_change_server(s, NULL);
|
||||
srv_manage_queues(oldsrv, s->be);
|
||||
int served = sess_change_server(s, NULL);
|
||||
if (srv_manage_queues(oldsrv, s->be) == -1)
|
||||
srv_check_full_state(oldsrv, served);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2052,8 +2054,11 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
||||
* free the connection before.
|
||||
*/
|
||||
if (!(srv->flags & SRV_F_STRICT_MAXCONN)) {
|
||||
sess_change_server(s, NULL);
|
||||
srv_manage_queues(srv, s->be);
|
||||
int served;
|
||||
|
||||
served = sess_change_server(s, NULL);
|
||||
if (srv_manage_queues(srv, s->be) == -1)
|
||||
srv_check_full_state(srv, served);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2969,8 +2974,12 @@ void stream_shutdown_self(struct stream *stream, int why)
|
||||
stream->flags |= why;
|
||||
|
||||
if (objt_server(stream->target)) {
|
||||
sess_change_server(stream, NULL);
|
||||
srv_manage_queues(__objt_server(stream->target), stream->be);
|
||||
struct server *srv = __objt_server(stream->target);
|
||||
int served;
|
||||
|
||||
served = sess_change_server(stream, NULL);
|
||||
if (srv_manage_queues(srv, stream->be) == -1)
|
||||
srv_check_full_state(srv, served);
|
||||
}
|
||||
|
||||
/* shutw is enough to stop a connecting socket */
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user