mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-01-31 06:51:11 +01:00
MEDIUM: stream: Use the new _HA_ATOMIC_* macros.
Use the new _HA_ATOMIC_* macros and add barriers where needed.
This commit is contained in:
parent
2be5a4c627
commit
dc6111e864
128
src/stream.c
128
src/stream.c
@ -190,7 +190,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
||||
s->si[0].flags = SI_FL_NONE;
|
||||
s->si[1].flags = SI_FL_ISBACK;
|
||||
|
||||
s->uniq_id = HA_ATOMIC_XADD(&global.req_count, 1);
|
||||
s->uniq_id = _HA_ATOMIC_XADD(&global.req_count, 1);
|
||||
|
||||
/* OK, we're keeping the stream, so let's properly initialize the stream */
|
||||
LIST_INIT(&s->back_refs);
|
||||
@ -353,7 +353,7 @@ static void stream_free(struct stream *s)
|
||||
if (objt_server(s->target)) { /* there may be requests left pending in queue */
|
||||
if (s->flags & SF_CURR_SESS) {
|
||||
s->flags &= ~SF_CURR_SESS;
|
||||
HA_ATOMIC_SUB(&__objt_server(s->target)->cur_sess, 1);
|
||||
_HA_ATOMIC_SUB(&__objt_server(s->target)->cur_sess, 1);
|
||||
}
|
||||
if (may_dequeue_tasks(objt_server(s->target), s->be))
|
||||
process_srv_queue(objt_server(s->target));
|
||||
@ -538,14 +538,14 @@ void stream_process_counters(struct stream *s)
|
||||
bytes = s->req.total - s->logs.bytes_in;
|
||||
s->logs.bytes_in = s->req.total;
|
||||
if (bytes) {
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.bytes_in, bytes);
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.bytes_in, bytes);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.bytes_in, bytes);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.bytes_in, bytes);
|
||||
|
||||
if (objt_server(s->target))
|
||||
HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_in, bytes);
|
||||
_HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_in, bytes);
|
||||
|
||||
if (sess->listener && sess->listener->counters)
|
||||
HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);
|
||||
_HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);
|
||||
|
||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||
struct stkctr *stkctr = &s->stkctr[i];
|
||||
@ -578,14 +578,14 @@ void stream_process_counters(struct stream *s)
|
||||
bytes = s->res.total - s->logs.bytes_out;
|
||||
s->logs.bytes_out = s->res.total;
|
||||
if (bytes) {
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.bytes_out, bytes);
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.bytes_out, bytes);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.bytes_out, bytes);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.bytes_out, bytes);
|
||||
|
||||
if (objt_server(s->target))
|
||||
HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_out, bytes);
|
||||
_HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_out, bytes);
|
||||
|
||||
if (sess->listener && sess->listener->counters)
|
||||
HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);
|
||||
_HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);
|
||||
|
||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||
struct stkctr *stkctr = &s->stkctr[i];
|
||||
@ -712,7 +712,7 @@ static int sess_update_st_cer(struct stream *s)
|
||||
|
||||
if (s->flags & SF_CURR_SESS) {
|
||||
s->flags &= ~SF_CURR_SESS;
|
||||
HA_ATOMIC_SUB(&__objt_server(s->target)->cur_sess, 1);
|
||||
_HA_ATOMIC_SUB(&__objt_server(s->target)->cur_sess, 1);
|
||||
}
|
||||
|
||||
if ((si->flags & SI_FL_ERR) &&
|
||||
@ -745,8 +745,8 @@ static int sess_update_st_cer(struct stream *s)
|
||||
}
|
||||
|
||||
if (objt_server(s->target))
|
||||
HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_conns, 1);
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
|
||||
_HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_conns, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
|
||||
sess_change_server(s, NULL);
|
||||
if (may_dequeue_tasks(objt_server(s->target), s->be))
|
||||
process_srv_queue(objt_server(s->target));
|
||||
@ -793,8 +793,8 @@ static int sess_update_st_cer(struct stream *s)
|
||||
si->state = SI_ST_REQ;
|
||||
} else {
|
||||
if (objt_server(s->target))
|
||||
HA_ATOMIC_ADD(&__objt_server(s->target)->counters.retries, 1);
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.retries, 1);
|
||||
_HA_ATOMIC_ADD(&__objt_server(s->target)->counters.retries, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.retries, 1);
|
||||
si->state = SI_ST_ASS;
|
||||
}
|
||||
|
||||
@ -951,8 +951,8 @@ static void sess_update_stream_int(struct stream *s)
|
||||
if (srv)
|
||||
srv_set_sess_last(srv);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
|
||||
|
||||
/* release other streams waiting for this server */
|
||||
sess_change_server(s, NULL);
|
||||
@ -1014,8 +1014,8 @@ static void sess_update_stream_int(struct stream *s)
|
||||
pendconn_cond_unlink(s->pend_pos);
|
||||
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
|
||||
si_shutr(si);
|
||||
si_shutw(si);
|
||||
req->flags |= CF_WRITE_TIMEOUT;
|
||||
@ -1085,9 +1085,9 @@ static void sess_set_term_flags(struct stream *s)
|
||||
if (!(s->flags & SF_FINST_MASK)) {
|
||||
if (s->si[1].state < SI_ST_REQ) {
|
||||
|
||||
HA_ATOMIC_ADD(&strm_fe(s)->fe_counters.failed_req, 1);
|
||||
_HA_ATOMIC_ADD(&strm_fe(s)->fe_counters.failed_req, 1);
|
||||
if (strm_li(s) && strm_li(s)->counters)
|
||||
HA_ATOMIC_ADD(&strm_li(s)->counters->failed_req, 1);
|
||||
_HA_ATOMIC_ADD(&strm_li(s)->counters->failed_req, 1);
|
||||
|
||||
s->flags |= SF_FINST_R;
|
||||
}
|
||||
@ -1238,7 +1238,7 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px,
|
||||
appctx_wakeup(appctx);
|
||||
|
||||
if (sess->fe == s->be) /* report it if the request was intercepted by the frontend */
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.intercepted_req, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.intercepted_req, 1);
|
||||
|
||||
/* The flag SF_ASSIGNED prevent from server assignment. */
|
||||
s->flags |= SF_ASSIGNED;
|
||||
@ -1814,10 +1814,10 @@ redo:
|
||||
si_shutw(si_f);
|
||||
si_report_error(si_f);
|
||||
if (!(req->analysers) && !(res->analysers)) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
if (!(s->flags & SF_ERR_MASK))
|
||||
s->flags |= SF_ERR_CLICL;
|
||||
if (!(s->flags & SF_FINST_MASK))
|
||||
@ -1831,14 +1831,14 @@ redo:
|
||||
si_shutr(si_b);
|
||||
si_shutw(si_b);
|
||||
si_report_error(si_b);
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.failed_resp, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.failed_resp, 1);
|
||||
if (!(req->analysers) && !(res->analysers)) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
if (!(s->flags & SF_ERR_MASK))
|
||||
s->flags |= SF_ERR_SRVCL;
|
||||
if (!(s->flags & SF_FINST_MASK))
|
||||
@ -1896,7 +1896,7 @@ redo:
|
||||
if (srv) {
|
||||
if (s->flags & SF_CURR_SESS) {
|
||||
s->flags &= ~SF_CURR_SESS;
|
||||
HA_ATOMIC_SUB(&srv->cur_sess, 1);
|
||||
_HA_ATOMIC_SUB(&srv->cur_sess, 1);
|
||||
}
|
||||
sess_change_server(s, NULL);
|
||||
if (may_dequeue_tasks(srv, s->be))
|
||||
@ -2095,31 +2095,31 @@ redo:
|
||||
/* Report it if the client got an error or a read timeout expired */
|
||||
req->analysers = 0;
|
||||
if (req->flags & CF_READ_ERROR) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
s->flags |= SF_ERR_CLICL;
|
||||
}
|
||||
else if (req->flags & CF_READ_TIMEOUT) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
s->flags |= SF_ERR_CLITO;
|
||||
}
|
||||
else if (req->flags & CF_WRITE_ERROR) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
s->flags |= SF_ERR_SRVCL;
|
||||
}
|
||||
else {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
s->flags |= SF_ERR_SRVTO;
|
||||
}
|
||||
sess_set_term_flags(s);
|
||||
@ -2128,31 +2128,31 @@ redo:
|
||||
/* Report it if the server got an error or a read timeout expired */
|
||||
res->analysers = 0;
|
||||
if (res->flags & CF_READ_ERROR) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
s->flags |= SF_ERR_SRVCL;
|
||||
}
|
||||
else if (res->flags & CF_READ_TIMEOUT) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
|
||||
s->flags |= SF_ERR_SRVTO;
|
||||
}
|
||||
else if (res->flags & CF_WRITE_ERROR) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
s->flags |= SF_ERR_CLICL;
|
||||
}
|
||||
else {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||
if (srv)
|
||||
HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
_HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
|
||||
s->flags |= SF_ERR_CLITO;
|
||||
}
|
||||
sess_set_term_flags(s);
|
||||
@ -2565,7 +2565,7 @@ redo:
|
||||
}
|
||||
|
||||
if (s->flags & SF_BE_ASSIGNED)
|
||||
HA_ATOMIC_SUB(&s->be->beconn, 1);
|
||||
_HA_ATOMIC_SUB(&s->be->beconn, 1);
|
||||
|
||||
if (unlikely((global.mode & MODE_DEBUG) &&
|
||||
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
|
||||
@ -2587,12 +2587,12 @@ redo:
|
||||
n = 0;
|
||||
|
||||
if (sess->fe->mode == PR_MODE_HTTP) {
|
||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.p.http.rsp[n], 1);
|
||||
_HA_ATOMIC_ADD(&sess->fe->fe_counters.p.http.rsp[n], 1);
|
||||
}
|
||||
if ((s->flags & SF_BE_ASSIGNED) &&
|
||||
(s->be->mode == PR_MODE_HTTP)) {
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.p.http.rsp[n], 1);
|
||||
HA_ATOMIC_ADD(&s->be->be_counters.p.http.cum_req, 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.p.http.rsp[n], 1);
|
||||
_HA_ATOMIC_ADD(&s->be->be_counters.p.http.cum_req, 1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2672,16 +2672,18 @@ void sess_change_server(struct stream *sess, struct server *newsrv)
|
||||
return;
|
||||
|
||||
if (sess->srv_conn) {
|
||||
HA_ATOMIC_SUB(&sess->srv_conn->served, 1);
|
||||
HA_ATOMIC_SUB(&sess->srv_conn->proxy->served, 1);
|
||||
_HA_ATOMIC_SUB(&sess->srv_conn->served, 1);
|
||||
_HA_ATOMIC_SUB(&sess->srv_conn->proxy->served, 1);
|
||||
__ha_barrier_atomic_store();
|
||||
if (sess->srv_conn->proxy->lbprm.server_drop_conn)
|
||||
sess->srv_conn->proxy->lbprm.server_drop_conn(sess->srv_conn);
|
||||
stream_del_srv_conn(sess);
|
||||
}
|
||||
|
||||
if (newsrv) {
|
||||
HA_ATOMIC_ADD(&newsrv->served, 1);
|
||||
HA_ATOMIC_ADD(&newsrv->proxy->served, 1);
|
||||
_HA_ATOMIC_ADD(&newsrv->served, 1);
|
||||
_HA_ATOMIC_ADD(&newsrv->proxy->served, 1);
|
||||
__ha_barrier_atomic_store();
|
||||
if (newsrv->proxy->lbprm.server_take_conn)
|
||||
newsrv->proxy->lbprm.server_take_conn(newsrv);
|
||||
stream_add_srv_conn(sess, newsrv);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user