diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 46df1bdb4..720244318 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -45,7 +45,8 @@ void si_retnclose(struct stream_interface *si, const struct buffer *msg); int conn_si_send_proxy(struct connection *conn, unsigned int flag); struct appctx *si_register_handler(struct stream_interface *si, struct applet *app); void si_applet_wake_cb(struct stream_interface *si); -void si_update(struct stream_interface *si); +void si_update_rx(struct stream_interface *si); +void si_update_tx(struct stream_interface *si); int si_cs_recv(struct conn_stream *cs); struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state); void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b); @@ -529,6 +530,13 @@ static inline int si_connect(struct stream_interface *si, struct connection *con return ret; } +/* Combines both si_update_rx() and si_update_tx() at once */ +static inline void si_update(struct stream_interface *si) +{ + si_update_rx(si); + si_update_tx(si); +} + /* Returns info about the conn_stream , if not NULL. It call the mux layer's * get_cs_info() function, if it exists. On success, it returns a cs_info * structure. Otherwise, on error, if the mux does not implement get_cs_info() diff --git a/src/stream_interface.c b/src/stream_interface.c index 046a80e43..e7273b730 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -793,76 +793,93 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state) } /* This function is designed to be called from within the stream handler to - * update the channels' expiration timers and the stream interface's flags - * based on the channels' flags. It needs to be called only once after the - * channels' flags have settled down, and before they are cleared, though it - * doesn't harm to call it as often as desired (it just slightly hurts - * performance). It must not be called from outside of the stream handler, - * as what it does will be used to compute the stream task's expiration. + * update the input channel's expiration timer and the stream interface's + * Rx flags based on the channel's flags. It needs to be called only once + * after the channel's flags have settled down, and before they are cleared, + * though it doesn't harm to call it as often as desired (it just slightly + * hurts performance). It must not be called from outside of the stream + * handler, as what it does will be used to compute the stream task's + * expiration. */ -void si_update(struct stream_interface *si) +void si_update_rx(struct stream_interface *si) { struct channel *ic = si_ic(si); - struct channel *oc = si_oc(si); - if (!(ic->flags & CF_SHUTR)) { - /* Read not closed, update FD status and timeout for reads */ - if (ic->flags & CF_DONT_READ) - si_rx_chan_blk(si); - else - si_rx_chan_rdy(si); - - if (!channel_is_empty(ic)) { - /* stop reading, imposed by channel's policy or contents */ - si_rx_room_blk(si); - } - else { - /* (re)start reading and update timeout. Note: we don't recompute the timeout - * everytime we get here, otherwise it would risk never to expire. We only - * update it if is was not yet set. The stream socket handler will already - * have updated it if there has been a completed I/O. - */ - si_rx_room_rdy(si); - } - if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP) - ic->rex = TICK_ETERNITY; - else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, ic->rto); - - si_chk_rcv(si); - } - else + if (ic->flags & CF_SHUTR) { si_rx_shut_blk(si); + return; + } - if (!(oc->flags & CF_SHUTW)) { - /* Write not closed, update FD status and timeout for writes */ - if (channel_is_empty(oc)) { - /* stop writing */ - if (!(si->flags & SI_FL_WAIT_DATA)) { - if ((oc->flags & CF_SHUTW_NOW) == 0) - si->flags |= SI_FL_WAIT_DATA; - oc->wex = TICK_ETERNITY; - } + /* Read not closed, update FD status and timeout for reads */ + if (ic->flags & CF_DONT_READ) + si_rx_chan_blk(si); + else + si_rx_chan_rdy(si); + + if (!channel_is_empty(ic)) { + /* stop reading, imposed by channel's policy or contents */ + si_rx_room_blk(si); + } + else { + /* (re)start reading and update timeout. Note: we don't recompute the timeout + * everytime we get here, otherwise it would risk never to expire. We only + * update it if is was not yet set. The stream socket handler will already + * have updated it if there has been a completed I/O. + */ + si_rx_room_rdy(si); + } + if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP) + ic->rex = TICK_ETERNITY; + else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex)) + ic->rex = tick_add_ifset(now_ms, ic->rto); + + si_chk_rcv(si); +} + +/* This function is designed to be called from within the stream handler to + * update the output channel's expiration timer and the stream interface's + * Tx flags based on the channel's flags. It needs to be called only once + * after the channel's flags have settled down, and before they are cleared, + * though it doesn't harm to call it as often as desired (it just slightly + * hurts performance). It must not be called from outside of the stream + * handler, as what it does will be used to compute the stream task's + * expiration. + */ +void si_update_tx(struct stream_interface *si) +{ + struct channel *oc = si_oc(si); + struct channel *ic = si_ic(si); + + if (oc->flags & CF_SHUTW) + return; + + /* Write not closed, update FD status and timeout for writes */ + if (channel_is_empty(oc)) { + /* stop writing */ + if (!(si->flags & SI_FL_WAIT_DATA)) { + if ((oc->flags & CF_SHUTW_NOW) == 0) + si->flags |= SI_FL_WAIT_DATA; + oc->wex = TICK_ETERNITY; } - else { - /* (re)start writing and update timeout. Note: we don't recompute the timeout - * everytime we get here, otherwise it would risk never to expire. We only - * update it if is was not yet set. The stream socket handler will already - * have updated it if there has been a completed I/O. + return; + } + + /* (re)start writing and update timeout. Note: we don't recompute the timeout + * everytime we get here, otherwise it would risk never to expire. We only + * update it if is was not yet set. The stream socket handler will already + * have updated it if there has been a completed I/O. + */ + si->flags &= ~SI_FL_WAIT_DATA; + if (!tick_isset(oc->wex)) { + oc->wex = tick_add_ifset(now_ms, oc->wto); + if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) { + /* Note: depending on the protocol, we don't know if we're waiting + * for incoming data or not. So in order to prevent the socket from + * expiring read timeouts during writes, we refresh the read timeout, + * except if it was already infinite or if we have explicitly setup + * independent streams. */ - si->flags &= ~SI_FL_WAIT_DATA; - if (!tick_isset(oc->wex)) { - oc->wex = tick_add_ifset(now_ms, oc->wto); - if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) { - /* Note: depending on the protocol, we don't know if we're waiting - * for incoming data or not. So in order to prevent the socket from - * expiring read timeouts during writes, we refresh the read timeout, - * except if it was already infinite or if we have explicitly setup - * independent streams. - */ - ic->rex = tick_add_ifset(now_ms, ic->rto); - } - } + ic->rex = tick_add_ifset(now_ms, ic->rto); } } }