MINOR: stream-int/conn-stream: Move si_shut* and si_chk* in conn-stream scope

si_shutr(), si_shutw(), si_chk_rcv() and si_chk_snd() are moved in the
conn-stream scope and renamed, respectively, cs_shutr(), cs_shutw(),
cs_chk_rcv(), cs_chk_snd() and manipulate a conn-stream instead of a
stream-interface.
This commit is contained in:
Christopher Faulet 2022-03-31 17:44:45 +02:00
parent 69ef6c9ef4
commit da098e6c17
17 changed files with 97 additions and 97 deletions

View File

@ -1533,7 +1533,7 @@ static void promex_appctx_handle_io(struct appctx *appctx)
case PROMEX_ST_END:
if (!(res->flags & CF_SHUTR)) {
res->flags |= CF_READ_NULL;
si_shutr(cs->si);
cs_shutr(cs);
}
}
@ -1549,8 +1549,8 @@ static void promex_appctx_handle_io(struct appctx *appctx)
error:
res->flags |= CF_READ_NULL;
si_shutr(cs->si);
si_shutw(cs->si);
cs_shutr(cs);
cs_shutw(cs);
}
struct applet promex_applet = {

View File

@ -273,43 +273,43 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait
return ret;
}
/* Sends a shutr to the connection using the data layer */
static inline void si_shutr(struct stream_interface *si)
/* Sends a shutr to the endpoint using the data layer */
static inline void cs_shutr(struct conn_stream *cs)
{
si->ops->shutr(si);
cs->si->ops->shutr(cs->si);
}
/* Sends a shutw to the connection using the data layer */
static inline void si_shutw(struct stream_interface *si)
/* Sends a shutw to the endpoint using the data layer */
static inline void cs_shutw(struct conn_stream *cs)
{
si->ops->shutw(si);
cs->si->ops->shutw(cs->si);
}
/* This is to be used after making some room available in a channel. It will
* return without doing anything if the stream interface's RX path is blocked.
* return without doing anything if the conn-stream's RX path is blocked.
* It will automatically mark the stream interface as busy processing the end
* point in order to avoid useless repeated wakeups.
* It will then call ->chk_rcv() to enable receipt of new data.
*/
static inline void si_chk_rcv(struct stream_interface *si)
static inline void cs_chk_rcv(struct conn_stream *cs)
{
if (si->flags & SI_FL_RXBLK_CONN && cs_state_in(si_opposite(si)->cs->state, CS_SB_RDY|CS_SB_EST|CS_SB_DIS|CS_SB_CLO))
si_rx_conn_rdy(si);
if (cs->si->flags & SI_FL_RXBLK_CONN && cs_state_in(cs_opposite(cs)->state, CS_SB_RDY|CS_SB_EST|CS_SB_DIS|CS_SB_CLO))
si_rx_conn_rdy(cs->si);
if (si_rx_blocked(si) || !si_rx_endp_ready(si))
if (si_rx_blocked(cs->si) || !si_rx_endp_ready(cs->si))
return;
if (!cs_state_in(si->cs->state, CS_SB_RDY|CS_SB_EST))
if (!cs_state_in(cs->state, CS_SB_RDY|CS_SB_EST))
return;
si->flags |= SI_FL_RX_WAIT_EP;
si->ops->chk_rcv(si);
cs->si->flags |= SI_FL_RX_WAIT_EP;
cs->si->ops->chk_rcv(cs->si);
}
/* Calls chk_snd on the connection using the data layer */
static inline void si_chk_snd(struct stream_interface *si)
/* Calls chk_snd on the endpoint using the data layer */
static inline void cs_chk_snd(struct conn_stream *cs)
{
si->ops->chk_snd(si);
cs->si->ops->chk_snd(cs->si);
}
/* Combines both si_update_rx() and si_update_tx() at once */

View File

@ -1983,8 +1983,8 @@ void back_try_conn_req(struct stream *s)
process_srv_queue(srv);
/* Failed and not retryable. */
si_shutr(cs->si);
si_shutw(cs->si);
cs_shutr(cs);
cs_shutw(cs);
req->flags |= CF_WRITE_ERROR;
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
@ -2043,8 +2043,8 @@ void back_try_conn_req(struct stream *s)
if (srv)
_HA_ATOMIC_INC(&srv->counters.failed_conns);
_HA_ATOMIC_INC(&s->be->be_counters.failed_conns);
si_shutr(cs->si);
si_shutw(cs->si);
cs_shutr(cs);
cs_shutw(cs);
req->flags |= CF_WRITE_TIMEOUT;
if (!s->conn_err_type)
s->conn_err_type = STRM_ET_QUEUE_TO;
@ -2103,8 +2103,8 @@ void back_try_conn_req(struct stream *s)
/* give up */
s->conn_exp = TICK_ETERNITY;
s->flags &= ~SF_CONN_EXP;
si_shutr(cs->si);
si_shutw(cs->si);
cs_shutr(cs);
cs_shutw(cs);
cs->state = CS_ST_CLO;
if (s->srv_error)
s->srv_error(s, cs->si);
@ -2141,8 +2141,8 @@ void back_handle_st_req(struct stream *s)
*/
s->flags &= ~(SF_ERR_MASK | SF_FINST_MASK);
si_shutr(cs->si);
si_shutw(cs->si);
cs_shutr(cs);
cs_shutw(cs);
s->req.flags |= CF_WRITE_ERROR;
s->conn_err_type = STRM_ET_CONN_RES;
cs->state = CS_ST_CLO;
@ -2175,8 +2175,8 @@ void back_handle_st_req(struct stream *s)
}
/* we did not get any server, let's check the cause */
si_shutr(cs->si);
si_shutw(cs->si);
cs_shutr(cs);
cs_shutw(cs);
s->req.flags |= CF_WRITE_ERROR;
if (!s->conn_err_type)
s->conn_err_type = STRM_ET_CONN_OTHER;
@ -2217,7 +2217,7 @@ void back_handle_st_con(struct stream *s)
((req->flags & CF_SHUTW_NOW) &&
(channel_is_empty(req) || (s->be->options & PR_O_ABRT_CLOSE)))) {
cs->flags |= CS_FL_NOLINGER;
si_shutw(cs->si);
cs_shutw(cs);
s->conn_err_type |= STRM_ET_CONN_ABRT;
if (s->srv_error)
s->srv_error(s, cs->si);
@ -2312,8 +2312,8 @@ void back_handle_st_cer(struct stream *s)
if (may_dequeue_tasks(objt_server(s->target), s->be))
process_srv_queue(objt_server(s->target));
/* shutw is enough so stop a connecting socket */
si_shutw(cs->si);
/* shutw is enough to stop a connecting socket */
cs_shutw(cs);
s->req.flags |= CF_WRITE_ERROR;
s->res.flags |= CF_READ_ERROR;
@ -2346,8 +2346,8 @@ void back_handle_st_cer(struct stream *s)
if (may_dequeue_tasks(objt_server(s->target), s->be))
process_srv_queue(objt_server(s->target));
/* shutw is enough so stop a connecting socket */
si_shutw(cs->si);
/* shutw is enough to stop a connecting socket */
cs_shutw(cs);
s->req.flags |= CF_WRITE_ERROR;
s->res.flags |= CF_READ_ERROR;
@ -2433,7 +2433,7 @@ void back_handle_st_rdy(struct stream *s)
(channel_is_empty(req) || (s->be->options & PR_O_ABRT_CLOSE)))) {
/* give up */
cs->flags |= CS_FL_NOLINGER;
si_shutw(cs->si);
cs_shutw(cs);
s->conn_err_type |= STRM_ET_CONN_ABRT;
if (s->srv_error)
s->srv_error(s, cs->si);

View File

@ -1510,7 +1510,7 @@ static void http_cache_io_handler(struct appctx *appctx)
end:
if (!(res->flags & CF_SHUTR) && appctx->st0 == HTX_CACHE_END) {
res->flags |= CF_READ_NULL;
si_shutr(cs->si);
cs_shutr(cs);
}
out:

View File

@ -907,7 +907,7 @@ static void cli_io_handler(struct appctx *appctx)
/* Let's close for real now. We just close the request
* side, the conditions below will complete if needed.
*/
si_shutw(cs->si);
cs_shutw(cs);
free_trash_chunk(appctx->chunk);
appctx->chunk = NULL;
break;
@ -1154,7 +1154,7 @@ static void cli_io_handler(struct appctx *appctx)
* we forward the close to the request side so that it flows upstream to
* the client.
*/
si_shutw(cs->si);
cs_shutw(cs);
}
if ((req->flags & CF_SHUTW) && (cs->state == CS_ST_EST) && (appctx->st0 < CLI_ST_OUTPUT)) {
@ -1164,7 +1164,7 @@ static void cli_io_handler(struct appctx *appctx)
* the client side has closed. So we'll forward this state downstream
* on the response buffer.
*/
si_shutr(cs->si);
cs_shutr(cs);
res->flags |= CF_READ_NULL;
}
@ -2685,8 +2685,8 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
pcli_write_prompt(s);
s->csb->flags |= CS_FL_NOLINGER | CS_FL_NOHALF;
si_shutr(cs_si(s->csb));
si_shutw(cs_si(s->csb));
cs_shutr(s->csb);
cs_shutw(s->csb);
/*
* starting from there this the same code as

View File

@ -736,8 +736,8 @@ static void dns_session_io_handler(struct appctx *appctx)
return;
close:
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
cs_ic(cs)->flags |= CF_READ_NULL;
}

View File

@ -1254,8 +1254,8 @@ spoe_release_appctx(struct appctx *appctx)
if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
spoe_appctx->status_code = SPOE_FRM_ERR_IO;
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
cs_ic(cs)->flags |= CF_READ_NULL;
}
@ -1960,8 +1960,8 @@ spoe_handle_appctx(struct appctx *appctx)
appctx->st0 = SPOE_APPCTX_ST_END;
SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
cs_ic(cs)->flags |= CF_READ_NULL;
/* fall through */

View File

@ -1912,8 +1912,8 @@ static void hlua_socket_handler(struct appctx *appctx)
struct conn_stream *cs = appctx->owner;
if (appctx->ctx.hlua_cosocket.die) {
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
cs_ic(cs)->flags |= CF_READ_NULL;
notification_wake(&appctx->ctx.hlua_cosocket.wake_on_read);
notification_wake(&appctx->ctx.hlua_cosocket.wake_on_write);
@ -9296,7 +9296,7 @@ void hlua_applet_tcp_fct(struct appctx *ctx)
/* eat the whole request */
co_skip(cs_oc(cs), co_data(cs_oc(cs)));
res->flags |= CF_READ_NULL;
si_shutr(cs->si);
cs_shutr(cs);
return;
/* yield. */
@ -9341,8 +9341,8 @@ void hlua_applet_tcp_fct(struct appctx *ctx)
error:
/* For all other cases, just close the stream. */
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
ctx->ctx.hlua_apptcp.flags |= APPLET_DONE;
}
@ -9576,7 +9576,7 @@ void hlua_applet_http_fct(struct appctx *ctx)
if (ctx->ctx.hlua_apphttp.flags & APPLET_DONE) {
if (!(res->flags & CF_SHUTR)) {
res->flags |= CF_READ_NULL;
si_shutr(cs->si);
cs_shutr(cs);
}
/* eat the whole request */

View File

@ -4260,8 +4260,8 @@ void http_perform_server_redirect(struct stream *s, struct stream_interface *si)
goto fail;
/* return without error. */
si_shutr(si);
si_shutw(si);
cs_shutr(si->cs);
cs_shutw(si->cs);
s->conn_err_type = STRM_ET_NONE;
si->cs->state = CS_ST_CLO;

View File

@ -211,8 +211,8 @@ static int hc_cli_io_handler(struct appctx *appctx)
/* we must close only if F_END is the last flag */
if (appctx->ctx.cli.i0 == HC_CLI_F_RES_END) {
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
appctx->ctx.cli.i0 &= ~HC_CLI_F_RES_END;
goto out;
}
@ -948,8 +948,8 @@ static void httpclient_applet_io_handler(struct appctx *appctx)
return;
end:
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
return;
}

View File

@ -3682,8 +3682,8 @@ static void syslog_io_handler(struct appctx *appctx)
_HA_ATOMIC_INC(&frontend->fe_counters.cli_aborts);
close:
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
cs_ic(cs)->flags |= CF_READ_NULL;

View File

@ -3099,8 +3099,8 @@ static void peer_io_handler(struct appctx *appctx)
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
curpeer = NULL;
}
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
cs_ic(cs)->flags |= CF_READ_NULL;
goto out;
}

View File

@ -424,8 +424,8 @@ static void sink_forward_io_handler(struct appctx *appctx)
return;
close:
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
cs_ic(cs)->flags |= CF_READ_NULL;
}
@ -568,8 +568,8 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
return;
close:
si_shutw(cs->si);
si_shutr(cs->si);
cs_shutw(cs);
cs_shutr(cs);
cs_ic(cs)->flags |= CF_READ_NULL;
}

View File

@ -4340,7 +4340,7 @@ static void http_stats_io_handler(struct appctx *appctx)
if (appctx->st0 == STAT_HTTP_END) {
if (!(res->flags & CF_SHUTR)) {
res->flags |= CF_READ_NULL;
si_shutr(cs->si);
cs_shutr(cs);
}
/* eat the whole request */

View File

@ -956,7 +956,7 @@ static void back_establish(struct stream *s)
* delayed recv here to give a chance to the data to flow back
* by the time we process other tasks.
*/
si_chk_rcv(si);
cs_chk_rcv(si->cs);
}
req->wex = TICK_ETERNITY;
/* If we managed to get the whole response, and we don't have anything
@ -1664,26 +1664,26 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (unlikely((req->flags & (CF_SHUTW|CF_WRITE_TIMEOUT)) == CF_WRITE_TIMEOUT)) {
s->csb->flags |= CS_FL_NOLINGER;
si_shutw(si_b);
cs_shutw(s->csb);
}
if (unlikely((req->flags & (CF_SHUTR|CF_READ_TIMEOUT)) == CF_READ_TIMEOUT)) {
if (s->csf->flags & CS_FL_NOHALF)
s->csf->flags |= CS_FL_NOLINGER;
si_shutr(si_f);
cs_shutr(s->csf);
}
channel_check_timeouts(res);
if (unlikely((res->flags & (CF_SHUTW|CF_WRITE_TIMEOUT)) == CF_WRITE_TIMEOUT)) {
s->csf->flags |= CS_FL_NOLINGER;
si_shutw(si_f);
cs_shutw(s->csf);
}
if (unlikely((res->flags & (CF_SHUTR|CF_READ_TIMEOUT)) == CF_READ_TIMEOUT)) {
if (s->csb->flags & CS_FL_NOHALF)
s->csb->flags |= CS_FL_NOLINGER;
si_shutr(si_b);
cs_shutr(s->csb);
}
if (HAS_FILTERS(s))
@ -1738,8 +1738,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
srv = objt_server(s->target);
if (unlikely(s->csf->endp->flags & CS_EP_ERROR)) {
if (cs_state_in(s->csf->state, CS_SB_EST|CS_SB_DIS)) {
si_shutr(si_f);
si_shutw(si_f);
cs_shutr(s->csf);
cs_shutw(s->csf);
cs_report_error(si_f->cs);
if (!(req->analysers) && !(res->analysers)) {
_HA_ATOMIC_INC(&s->be->be_counters.cli_aborts);
@ -1758,8 +1758,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (unlikely(s->csb->endp->flags & CS_EP_ERROR)) {
if (cs_state_in(s->csb->state, CS_SB_EST|CS_SB_DIS)) {
si_shutr(si_b);
si_shutw(si_b);
cs_shutr(s->csb);
cs_shutw(s->csb);
cs_report_error(si_b->cs);
_HA_ATOMIC_INC(&s->be->be_counters.failed_resp);
if (srv)
@ -2291,7 +2291,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
channel_is_empty(req))) {
if (req->flags & CF_READ_ERROR)
s->csb->flags |= CS_FL_NOLINGER;
si_shutw(si_b);
cs_shutw(s->csb);
}
/* shutdown(write) done on server side, we must stop the client too */
@ -2303,7 +2303,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (unlikely((req->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) {
if (s->csf->flags & CS_FL_NOHALF)
s->csf->flags |= CS_FL_NOLINGER;
si_shutr(si_f);
cs_shutr(s->csf);
}
/* Benchmarks have shown that it's optimal to do a full resync now */
@ -2416,7 +2416,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
/* shutdown(write) pending */
if (unlikely((res->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
channel_is_empty(res))) {
si_shutw(si_f);
cs_shutw(s->csf);
}
/* shutdown(write) done on the client side, we must stop the server too */
@ -2428,7 +2428,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (unlikely((res->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) {
if (s->csb->flags & CS_FL_NOHALF)
s->csb->flags |= CS_FL_NOLINGER;
si_shutr(si_b);
cs_shutr(s->csb);
}
if (s->csf->state == CS_ST_DIS ||

View File

@ -408,7 +408,7 @@ static void stream_int_notify(struct stream_interface *si)
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
(si->cs->state == CS_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
si_shutw(si);
cs_shutw(si->cs);
oc->wex = TICK_ETERNITY;
}
@ -457,7 +457,7 @@ static void stream_int_notify(struct stream_interface *si)
if (ic->pipe)
last_len += ic->pipe->data;
si_chk_snd(sio);
cs_chk_snd(sio->cs);
new_len = co_data(ic);
if (ic->pipe)
@ -473,14 +473,14 @@ static void stream_int_notify(struct stream_interface *si)
if (!(ic->flags & CF_DONT_READ))
si_rx_chan_rdy(si);
si_chk_rcv(si);
si_chk_rcv(sio);
cs_chk_rcv(si->cs);
cs_chk_rcv(sio->cs);
if (si_rx_blocked(si)) {
ic->rex = TICK_ETERNITY;
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
/* we must re-enable reading if si_chk_snd() has freed some space */
/* we must re-enable reading if cs_chk_snd() has freed some space */
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
@ -841,7 +841,7 @@ void si_update_rx(struct stream_interface *si)
else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
si_chk_rcv(si);
cs_chk_rcv(si->cs);
}
/* This function is designed to be called from within the stream handler to
@ -1158,25 +1158,25 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
cs_state_in(cs->state, CS_SB_RDY|CS_SB_EST)) {
si_shutw(si);
cs_shutw(cs);
goto out_wakeup;
}
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0)
si->flags |= SI_FL_WAIT_DATA;
cs->si->flags |= SI_FL_WAIT_DATA;
oc->wex = TICK_ETERNITY;
}
else {
/* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so.
*/
si->flags &= ~SI_FL_WAIT_DATA;
cs->si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
}
if (likely(oc->flags & CF_WRITE_ACTIVITY)) {
struct channel *ic = si_ic(si);
struct channel *ic = cs_ic(cs);
/* update timeout if we have written something */
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
@ -1204,8 +1204,8 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
((channel_is_empty(oc) && !oc->to_forward) ||
!cs_state_in(cs->state, CS_SB_EST))))) {
out_wakeup:
if (!(si->cs->flags & CS_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
}
@ -1575,7 +1575,7 @@ static void stream_int_read0(struct stream_interface *si)
return;
do_close:
/* OK we completely close the socket here just as if we went through si_shut[rw]() */
/* OK we completely close the socket here just as if we went through cs_shut[rw]() */
cs_conn_close(cs);
oc->flags &= ~CF_SHUTW_NOW;

View File

@ -394,8 +394,8 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit)
else if (rule->action == ACT_TCP_CLOSE) {
chn_prod(rep)->flags |= CS_FL_NOLINGER | CS_FL_NOHALF;
cs_must_kill_conn(chn_prod(rep));
si_shutr(chn_prod(rep)->si);
si_shutw(chn_prod(rep)->si);
cs_shutr(chn_prod(rep));
cs_shutw(chn_prod(rep));
s->last_rule_file = rule->conf.file;
s->last_rule_line = rule->conf.line;
goto end;