diff --git a/include/haproxy/queue.h b/include/haproxy/queue.h index 3c6ebe89b..58a0b091c 100644 --- a/include/haproxy/queue.h +++ b/include/haproxy/queue.h @@ -87,7 +87,7 @@ static inline int server_has_room(const struct server *s) { * for and if/else usage. */ static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) { - return (s && (s->nbpend || (p->queue.length && srv_currently_usable(s))) && + return (s && (s->queue.length || (p->queue.length && srv_currently_usable(s))) && (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s))); } diff --git a/include/haproxy/server-t.h b/include/haproxy/server-t.h index ef91ee9c7..6772db2b0 100644 --- a/include/haproxy/server-t.h +++ b/include/haproxy/server-t.h @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -271,7 +272,7 @@ struct server { unsigned int est_need_conns; /* Estimate on the number of needed connections (max of curr and previous max_used) */ unsigned int next_takeover; /* thread ID to try to steal connections from next time */ - struct eb_root pendconns; /* pending connections */ + struct queue queue; /* pending connections */ /* Element below are usd by LB algorithms and must be doable in * parallel to other threads reusing connections above. @@ -286,10 +287,8 @@ struct server { ALWAYS_ALIGN(64); int cur_sess; /* number of currently active sessions (including syn_sent) */ int served; /* # of active sessions currently being served (ie not pending) */ - int nbpend; /* number of pending connections */ int consecutive_errors; /* current number of consecutive errors */ struct freq_ctr sess_per_sec; /* sessions per second on this server */ - unsigned int queue_idx; /* count of pending connections which have been de-queued */ struct be_counters counters; /* statistics counters */ /* Below are some relatively stable settings, only changed under the lock */ diff --git a/src/backend.c b/src/backend.c index 1bd40fbaa..f6f3ef037 100644 --- a/src/backend.c +++ b/src/backend.c @@ -552,7 +552,7 @@ static struct server *get_server_rnd(struct stream *s, const struct server *avoi * the backend's queue instead. */ if (curr && - (curr->nbpend || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr)))) + (curr->queue.length || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr)))) curr = NULL; return curr; @@ -624,7 +624,7 @@ int assign_server(struct stream *s) ((s->sess->flags & SESS_FL_PREFER_LAST) || (!s->be->max_ka_queue || server_has_room(tmpsrv) || ( - tmpsrv->nbpend + 1 < s->be->max_ka_queue))) && + tmpsrv->queue.length + 1 < s->be->max_ka_queue))) && srv_currently_usable(tmpsrv)) { list_for_each_entry(conn, &srv_list->conn_list, session_list) { if (!(conn->flags & CO_FL_WAIT_XPRT)) { @@ -1001,9 +1001,9 @@ int assign_server_and_queue(struct stream *s) * not full, in which case we have to return FULL. */ if (srv->maxconn && - (srv->nbpend || srv->served >= srv_dynamic_maxconn(srv))) { + (srv->queue.length || srv->served >= srv_dynamic_maxconn(srv))) { - if (srv->maxqueue > 0 && srv->nbpend >= srv->maxqueue) + if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue) return SRV_STATUS_FULL; p = pendconn_add(s); @@ -2734,7 +2734,7 @@ smp_fetch_connslots(const struct arg *args, struct sample *smp, const char *kw, } smp->data.u.sint += (iterator->maxconn - iterator->cur_sess) - + (iterator->maxqueue - iterator->nbpend); + + (iterator->maxqueue - iterator->queue.length); } return 1; @@ -2981,7 +2981,7 @@ smp_fetch_srv_queue(const struct arg *args, struct sample *smp, const char *kw, { smp->flags = SMP_F_VOL_TEST; smp->data.type = SMP_T_SINT; - smp->data.u.sint = args->data.srv->nbpend; + smp->data.u.sint = args->data.srv->queue.length; return 1; } @@ -3123,7 +3123,7 @@ sample_conv_srv_queue(const struct arg *args, struct sample *smp, void *private) return 0; smp->data.type = SMP_T_SINT; - smp->data.u.sint = srv->nbpend; + smp->data.u.sint = srv->queue.length; return 1; } diff --git a/src/check.c b/src/check.c index 68cb1c358..9c885eaad 100644 --- a/src/check.c +++ b/src/check.c @@ -994,7 +994,7 @@ int httpchk_build_status_header(struct server *s, struct buffer *buf) (s->cur_eweight * s->proxy->lbprm.wmult + s->proxy->lbprm.wdiv - 1) / s->proxy->lbprm.wdiv, (s->proxy->lbprm.tot_weight * s->proxy->lbprm.wmult + s->proxy->lbprm.wdiv - 1) / s->proxy->lbprm.wdiv, s->cur_sess, s->proxy->beconn - s->proxy->queue.length, - s->nbpend); + s->queue.length); if ((s->cur_state == SRV_ST_STARTING) && now.tv_sec < s->last_change + s->slowstart && diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 743565d61..3d68fc56d 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1728,7 +1728,7 @@ spoe_handle_processing_appctx(struct appctx *appctx) */ if (global.nbthread > 1 && (agent->b.be->queue.length || - (srv && (srv->nbpend || (srv->maxconn && srv->served >=srv_dynamic_maxconn(srv)))))) { + (srv && (srv->queue.length || (srv->maxconn && srv->served >=srv_dynamic_maxconn(srv)))))) { SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE; appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; appctx->st1 = SPOE_APPCTX_ERR_NONE; diff --git a/src/haproxy.c b/src/haproxy.c index 82efcd23e..de13aba05 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -904,7 +904,7 @@ static void sig_dump_state(struct sig_handler *sh) "SIGHUP: Server %s/%s is %s. Conn: %d act, %d pend, %lld tot.", p->id, s->id, (s->cur_state != SRV_ST_STOPPED) ? "UP" : "DOWN", - s->cur_sess, s->nbpend, s->counters.cum_sess); + s->cur_sess, s->queue.length, s->counters.cum_sess); ha_warning("%s\n", trash.area); send_log(p, LOG_NOTICE, "%s\n", trash.area); s = s->next; diff --git a/src/lb_chash.c b/src/lb_chash.c index 23b2b1272..023219c98 100644 --- a/src/lb_chash.c +++ b/src/lb_chash.c @@ -443,7 +443,7 @@ struct server *chash_get_next_server(struct proxy *p, struct server *srvtoavoid) * case we simply remember it for later use if needed. */ s = eb32_entry(node, struct tree_occ, node)->server; - if (!s->maxconn || (!s->nbpend && s->served < srv_dynamic_maxconn(s))) { + if (!s->maxconn || (!s->queue.length && s->served < srv_dynamic_maxconn(s))) { if (s != srvtoavoid) { srv = s; break; diff --git a/src/lb_fas.c b/src/lb_fas.c index 53bd0392d..d90388b40 100644 --- a/src/lb_fas.c +++ b/src/lb_fas.c @@ -322,7 +322,7 @@ struct server *fas_get_next_server(struct proxy *p, struct server *srvtoavoid) struct server *s; s = eb32_entry(node, struct server, lb_node); - if (!s->maxconn || (!s->nbpend && s->served < srv_dynamic_maxconn(s))) { + if (!s->maxconn || (!s->queue.length && s->served < srv_dynamic_maxconn(s))) { if (s != srvtoavoid) { srv = s; break; diff --git a/src/lb_fwlc.c b/src/lb_fwlc.c index ba1ca95ae..091241cc5 100644 --- a/src/lb_fwlc.c +++ b/src/lb_fwlc.c @@ -57,7 +57,7 @@ static inline void fwlc_dequeue_srv(struct server *s) */ static inline void fwlc_queue_srv(struct server *s, unsigned int eweight) { - unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->nbpend); + unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length); s->lb_node.key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / eweight : 0; eb32_insert(s->lb_tree, &s->lb_node); @@ -70,7 +70,7 @@ static inline void fwlc_queue_srv(struct server *s, unsigned int eweight) */ static void fwlc_srv_reposition(struct server *s) { - unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->nbpend); + unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length); unsigned int new_key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / s->cur_eweight : 0; /* some calls will be made for no change (e.g connect_server() after @@ -86,7 +86,7 @@ static void fwlc_srv_reposition(struct server *s) * likely to have released a connection or taken one leading * to our target value (50% of the case in measurements). */ - inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->nbpend); + inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length); new_key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / s->cur_eweight : 0; if (!s->lb_node.node.leaf_p || s->lb_node.key != new_key) { eb32_delete(&s->lb_node); @@ -347,7 +347,7 @@ struct server *fwlc_get_next_server(struct proxy *p, struct server *srvtoavoid) struct server *s; s = eb32_entry(node, struct server, lb_node); - if (!s->maxconn || s->served + s->nbpend < srv_dynamic_maxconn(s) + s->maxqueue) { + if (!s->maxconn || s->served + s->queue.length < srv_dynamic_maxconn(s) + s->maxqueue) { if (s != srvtoavoid) { srv = s; break; diff --git a/src/lb_fwrr.c b/src/lb_fwrr.c index d7f618faf..74c7fb244 100644 --- a/src/lb_fwrr.c +++ b/src/lb_fwrr.c @@ -564,7 +564,7 @@ struct server *fwrr_get_next_server(struct proxy *p, struct server *srvtoavoid) fwrr_update_position(grp, srv); fwrr_dequeue_srv(srv); grp->curr_pos++; - if (!srv->maxconn || (!srv->nbpend && srv->served < srv_dynamic_maxconn(srv))) { + if (!srv->maxconn || (!srv->queue.length && srv->served < srv_dynamic_maxconn(srv))) { /* make sure it is not the server we are trying to exclude... */ if (srv != srvtoavoid || avoided) break; diff --git a/src/lb_map.c b/src/lb_map.c index b735678a8..592df91cc 100644 --- a/src/lb_map.c +++ b/src/lb_map.c @@ -230,7 +230,7 @@ struct server *map_get_server_rr(struct proxy *px, struct server *srvtoavoid) avoididx = 0; /* shut a gcc warning */ do { srv = px->lbprm.map.srv[newidx++]; - if (!srv->maxconn || (!srv->nbpend && srv->served < srv_dynamic_maxconn(srv))) { + if (!srv->maxconn || (!srv->queue.length && srv->served < srv_dynamic_maxconn(srv))) { /* make sure it is not the server we are try to exclude... */ /* ...but remember that is was selected yet avoided */ avoided = srv; diff --git a/src/queue.c b/src/queue.c index 40fabfc95..c9134afb9 100644 --- a/src/queue.c +++ b/src/queue.c @@ -133,7 +133,7 @@ unsigned int srv_dynamic_maxconn(const struct server *s) */ static void __pendconn_unlink_srv(struct pendconn *p) { - p->strm->logs.srv_queue_pos += p->srv->queue_idx - p->queue_idx; + p->strm->logs.srv_queue_pos += p->srv->queue.idx - p->queue_idx; eb32_delete(&p->node); } @@ -194,7 +194,7 @@ void pendconn_unlink(struct pendconn *p) } HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock); if (done) { - _HA_ATOMIC_DEC(&p->srv->nbpend); + _HA_ATOMIC_DEC(&p->srv->queue.length); _HA_ATOMIC_DEC(&p->px->totpend); } } @@ -273,8 +273,8 @@ static struct pendconn *pendconn_process_next_strm(struct server *srv, struct pr rsrv = srv; p = NULL; - if (srv->nbpend) - p = pendconn_first(&srv->pendconns); + if (srv->queue.length) + p = pendconn_first(&srv->queue.head); pp = NULL; if (srv_currently_usable(rsrv) && px->queue.length && @@ -320,9 +320,9 @@ static struct pendconn *pendconn_process_next_strm(struct server *srv, struct pr goto unlinked; use_p: __pendconn_unlink_srv(p); - _HA_ATOMIC_DEC(&srv->nbpend); + _HA_ATOMIC_DEC(&srv->queue.length); _HA_ATOMIC_DEC(&px->totpend); - srv->queue_idx++; + srv->queue.idx++; unlinked: p->strm_flags |= SF_ASSIGNED; p->target = srv; @@ -416,7 +416,7 @@ struct pendconn *pendconn_add(struct stream *strm) if (srv) { unsigned int old_max, new_max; - new_max = _HA_ATOMIC_ADD_FETCH(&srv->nbpend, 1); + new_max = _HA_ATOMIC_ADD_FETCH(&srv->queue.length, 1); old_max = srv->counters.nbpend_max; while (new_max > old_max) { if (likely(_HA_ATOMIC_CAS(&srv->counters.nbpend_max, &old_max, new_max))) @@ -425,8 +425,8 @@ struct pendconn *pendconn_add(struct stream *strm) __ha_barrier_atomic_store(); HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock); - p->queue_idx = srv->queue_idx - 1; // for increment - eb32_insert(&srv->pendconns, &p->node); + p->queue_idx = srv->queue.idx - 1; // for increment + eb32_insert(&srv->queue.head, &p->node); HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock); } else { @@ -465,7 +465,7 @@ int pendconn_redistribute(struct server *s) if ((s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP) return 0; - for (node = eb32_first(&s->pendconns); node; node = nodeb) { + for (node = eb32_first(&s->queue.head); node; node = nodeb) { nodeb = eb32_next(node); p = eb32_entry(node, struct pendconn, node); @@ -480,7 +480,7 @@ int pendconn_redistribute(struct server *s) xferred++; } if (xferred) { - _HA_ATOMIC_SUB(&s->nbpend, xferred); + _HA_ATOMIC_SUB(&s->queue.length, xferred); _HA_ATOMIC_SUB(&s->proxy->totpend, xferred); } return xferred; diff --git a/src/server.c b/src/server.c index 141f1ba09..5d869e6fb 100644 --- a/src/server.c +++ b/src/server.c @@ -1370,13 +1370,13 @@ void srv_append_status(struct buffer *msg, struct server *s, " %d sessions active, %d requeued, %d remaining in queue", s->proxy->srv_act, s->proxy->srv_bck, (s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "", - s->cur_sess, xferred, s->nbpend); + s->cur_sess, xferred, s->queue.length); else chunk_appendf(msg, ". %d active and %d backup servers online.%s" " %d sessions requeued, %d total in queue", s->proxy->srv_act, s->proxy->srv_bck, (s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "", - xferred, s->nbpend); + xferred, s->queue.length); } } @@ -2163,7 +2163,7 @@ struct server *new_server(struct proxy *proxy) srv->obj_type = OBJ_TYPE_SERVER; srv->proxy = proxy; - srv->pendconns = EB_ROOT; + srv->queue.head = EB_ROOT; LIST_APPEND(&servers_list, &srv->global_list); LIST_INIT(&srv->srv_rec_item); LIST_INIT(&srv->ip_rec_item); @@ -4642,7 +4642,7 @@ static int cli_parse_delete_server(char **args, char *payload, struct appctx *ap * cleanup function should be implemented to be used here. */ if (srv->cur_sess || srv->curr_idle_conns || - !eb_is_empty(&srv->pendconns)) { + !eb_is_empty(&srv->queue.head)) { cli_err(appctx, "Server still has connections attached to it, cannot remove it."); goto out; } diff --git a/src/stats.c b/src/stats.c index c8b5fb142..3458924b7 100644 --- a/src/stats.c +++ b/src/stats.c @@ -2141,7 +2141,7 @@ int stats_fill_sv_stats(struct proxy *px, struct server *sv, int flags, metric = mkf_str(FO_CONFIG|FS_SERVICE, proxy_mode_str(px->mode)); break; case ST_F_QCUR: - metric = mkf_u32(0, sv->nbpend); + metric = mkf_u32(0, sv->queue.length); break; case ST_F_QMAX: metric = mkf_u32(FN_MAX, sv->counters.nbpend_max);