diff --git a/include/haproxy/cs_utils.h b/include/haproxy/cs_utils.h index a3a44a9aa..a9ed6a31f 100644 --- a/include/haproxy/cs_utils.h +++ b/include/haproxy/cs_utils.h @@ -33,6 +33,10 @@ #include #include +void cs_update_rx(struct conn_stream *cs); +void cs_update_tx(struct conn_stream *cs); +void cs_update_both(struct conn_stream *csf, struct conn_stream *csb); + /* returns the channel which receives data from this conn-stream (input channel) */ static inline struct channel *cs_ic(struct conn_stream *cs) { @@ -268,6 +272,13 @@ static inline void cs_chk_snd(struct conn_stream *cs) cs->ops->chk_snd(cs); } +/* Combines both cs_update_rx() and cs_update_tx() at once */ +static inline void cs_update(struct conn_stream *cs) +{ + cs_update_rx(cs); + cs_update_tx(cs); +} + /* for debugging, reports the stream interface state name */ static inline const char *cs_state_str(int state) { diff --git a/include/haproxy/stream_interface-t.h b/include/haproxy/stream_interface-t.h index ea02d6c17..7d5e50c37 100644 --- a/include/haproxy/stream_interface-t.h +++ b/include/haproxy/stream_interface-t.h @@ -55,7 +55,7 @@ enum { /* Note that if an applet is registered, the update function will not be called * by the session handler, so it may be used to resync flags at the end of the - * applet handler. See si_update() for reference. + * applet handler. */ struct stream_interface { /* struct members used by the "buffer" side */ diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index 3875385c8..45a8bacd3 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -37,10 +37,7 @@ void si_free(struct stream_interface *si); /* main event functions used to move data between sockets and buffers */ void si_applet_wake_cb(struct stream_interface *si); -void si_update_rx(struct stream_interface *si); -void si_update_tx(struct stream_interface *si); struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state); -void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b); int si_sync_recv(struct stream_interface *si); void si_sync_send(struct stream_interface *si); @@ -263,13 +260,6 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait return ret; } -/* Combines both si_update_rx() and si_update_tx() at once */ -static inline void si_update(struct stream_interface *si) -{ - si_update_rx(si); - si_update_tx(si); -} - /* The stream interface is only responsible for the connection during the early * states, before plugging a mux. Thus it should only care about CO_FL_ERROR * before CS_ST_EST, and after that it must absolutely ignore it since the mux diff --git a/src/conn_stream.c b/src/conn_stream.c index 736cfb2da..b3738f6df 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -940,3 +940,134 @@ static void cs_app_chk_snd_applet(struct conn_stream *cs) appctx_wakeup(__cs_appctx(cs)); } } + + +/* This function is designed to be called from within the stream handler to + * update the input channel's expiration timer and the conn-stream's + * Rx flags based on the channel's flags. It needs to be called only once + * after the channel's flags have settled down, and before they are cleared, + * though it doesn't harm to call it as often as desired (it just slightly + * hurts performance). It must not be called from outside of the stream + * handler, as what it does will be used to compute the stream task's + * expiration. + */ +void cs_update_rx(struct conn_stream *cs) +{ + struct channel *ic = cs_ic(cs); + + if (ic->flags & CF_SHUTR) { + si_rx_shut_blk(cs->si); + return; + } + + /* Read not closed, update FD status and timeout for reads */ + if (ic->flags & CF_DONT_READ) + si_rx_chan_blk(cs->si); + else + si_rx_chan_rdy(cs->si); + + if (!channel_is_empty(ic) || !channel_may_recv(ic)) { + /* stop reading, imposed by channel's policy or contents */ + si_rx_room_blk(cs->si); + } + else { + /* (re)start reading and update timeout. Note: we don't recompute the timeout + * every time we get here, otherwise it would risk never to expire. We only + * update it if is was not yet set. The stream socket handler will already + * have updated it if there has been a completed I/O. + */ + si_rx_room_rdy(cs->si); + } + if (cs->si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP) + ic->rex = TICK_ETERNITY; + else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex)) + ic->rex = tick_add_ifset(now_ms, ic->rto); + + cs_chk_rcv(cs); +} + +/* This function is designed to be called from within the stream handler to + * update the output channel's expiration timer and the conn-stream's + * Tx flags based on the channel's flags. It needs to be called only once + * after the channel's flags have settled down, and before they are cleared, + * though it doesn't harm to call it as often as desired (it just slightly + * hurts performance). It must not be called from outside of the stream + * handler, as what it does will be used to compute the stream task's + * expiration. + */ +void cs_update_tx(struct conn_stream *cs) +{ + struct channel *oc = cs_oc(cs); + struct channel *ic = cs_ic(cs); + + if (oc->flags & CF_SHUTW) + return; + + /* Write not closed, update FD status and timeout for writes */ + if (channel_is_empty(oc)) { + /* stop writing */ + if (!(cs->si->flags & SI_FL_WAIT_DATA)) { + if ((oc->flags & CF_SHUTW_NOW) == 0) + cs->si->flags |= SI_FL_WAIT_DATA; + oc->wex = TICK_ETERNITY; + } + return; + } + + /* (re)start writing and update timeout. Note: we don't recompute the timeout + * every time we get here, otherwise it would risk never to expire. We only + * update it if is was not yet set. The stream socket handler will already + * have updated it if there has been a completed I/O. + */ + cs->si->flags &= ~SI_FL_WAIT_DATA; + if (!tick_isset(oc->wex)) { + oc->wex = tick_add_ifset(now_ms, oc->wto); + if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) { + /* Note: depending on the protocol, we don't know if we're waiting + * for incoming data or not. So in order to prevent the socket from + * expiring read timeouts during writes, we refresh the read timeout, + * except if it was already infinite or if we have explicitly setup + * independent streams. + */ + ic->rex = tick_add_ifset(now_ms, ic->rto); + } + } +} + +/* Updates at once the channel flags, and timers of both conn-streams of a + * same stream, to complete the work after the analysers, then updates the data + * layer below. This will ensure that any synchronous update performed at the + * data layer will be reflected in the channel flags and/or conn-stream. + * Note that this does not change the conn-stream's current state, though + * it updates the previous state to the current one. + */ +void cs_update_both(struct conn_stream *csf, struct conn_stream *csb) +{ + struct channel *req = cs_ic(csf); + struct channel *res = cs_oc(csf); + + req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); + res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); + + __cs_strm(csb)->prev_conn_state = csb->state; + + /* let's recompute both sides states */ + if (cs_state_in(csf->state, CS_SB_RDY|CS_SB_EST)) + cs_update(csf); + + if (cs_state_in(csb->state, CS_SB_RDY|CS_SB_EST)) + cs_update(csb); + + /* stream ints are processed outside of process_stream() and must be + * handled at the latest moment. + */ + if (cs_appctx(csf) && + ((si_rx_endp_ready(csf->si) && !si_rx_blocked(csf->si)) || + (si_tx_endp_ready(csf->si) && !si_tx_blocked(csf->si)))) + appctx_wakeup(__cs_appctx(csf)); + + if (cs_appctx(csb) && + ((si_rx_endp_ready(csb->si) && !si_rx_blocked(csb->si)) || + (si_tx_endp_ready(csb->si) && !si_tx_blocked(csb->si)))) + appctx_wakeup(__cs_appctx(csb)); +} diff --git a/src/hlua.c b/src/hlua.c index 449452576..fa0a211df 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -1953,7 +1953,7 @@ static void hlua_socket_handler(struct appctx *appctx) * interface. */ if (!channel_is_empty(cs_ic(cs))) - si_update(cs->si); + cs_update(cs); /* If write notifications are registered, we considers we want * to write, so we clear the blocking flag. diff --git a/src/stream.c b/src/stream.c index 44ed445ef..30c347bb0 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2455,7 +2455,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE)) stream_process_counters(s); - si_update_both(si_f, si_b); + cs_update_both(s->csf, s->csb); /* Trick: if a request is being waiting for the server to respond, * and if we know the server can timeout, we don't want the timeout diff --git a/src/stream_interface.c b/src/stream_interface.c index e252cd5d2..0f060ec05 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -77,14 +77,14 @@ void si_free(struct stream_interface *si) pool_free(pool_head_streaminterface, si); } -/* This function is the equivalent to si_update() except that it's +/* This function is the equivalent to cs_update() except that it's * designed to be called from outside the stream handlers, typically the lower * layers (applets, connections) after I/O completion. After updating the stream * interface and timeouts, it will try to forward what can be forwarded, then to * wake the associated task up if an important event requires special handling. * It may update SI_FL_WAIT_DATA and/or SI_FL_RXBLK_ROOM, that the callers are * encouraged to watch to take appropriate action. - * It should not be called from within the stream itself, si_update() + * It should not be called from within the stream itself, cs_update() * is designed for this. */ static void stream_int_notify(struct stream_interface *si) @@ -474,98 +474,6 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state) return t; } -/* This function is designed to be called from within the stream handler to - * update the input channel's expiration timer and the stream interface's - * Rx flags based on the channel's flags. It needs to be called only once - * after the channel's flags have settled down, and before they are cleared, - * though it doesn't harm to call it as often as desired (it just slightly - * hurts performance). It must not be called from outside of the stream - * handler, as what it does will be used to compute the stream task's - * expiration. - */ -void si_update_rx(struct stream_interface *si) -{ - struct channel *ic = si_ic(si); - - if (ic->flags & CF_SHUTR) { - si_rx_shut_blk(si); - return; - } - - /* Read not closed, update FD status and timeout for reads */ - if (ic->flags & CF_DONT_READ) - si_rx_chan_blk(si); - else - si_rx_chan_rdy(si); - - if (!channel_is_empty(ic) || !channel_may_recv(ic)) { - /* stop reading, imposed by channel's policy or contents */ - si_rx_room_blk(si); - } - else { - /* (re)start reading and update timeout. Note: we don't recompute the timeout - * every time we get here, otherwise it would risk never to expire. We only - * update it if is was not yet set. The stream socket handler will already - * have updated it if there has been a completed I/O. - */ - si_rx_room_rdy(si); - } - if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP) - ic->rex = TICK_ETERNITY; - else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, ic->rto); - - cs_chk_rcv(si->cs); -} - -/* This function is designed to be called from within the stream handler to - * update the output channel's expiration timer and the stream interface's - * Tx flags based on the channel's flags. It needs to be called only once - * after the channel's flags have settled down, and before they are cleared, - * though it doesn't harm to call it as often as desired (it just slightly - * hurts performance). It must not be called from outside of the stream - * handler, as what it does will be used to compute the stream task's - * expiration. - */ -void si_update_tx(struct stream_interface *si) -{ - struct channel *oc = si_oc(si); - struct channel *ic = si_ic(si); - - if (oc->flags & CF_SHUTW) - return; - - /* Write not closed, update FD status and timeout for writes */ - if (channel_is_empty(oc)) { - /* stop writing */ - if (!(si->flags & SI_FL_WAIT_DATA)) { - if ((oc->flags & CF_SHUTW_NOW) == 0) - si->flags |= SI_FL_WAIT_DATA; - oc->wex = TICK_ETERNITY; - } - return; - } - - /* (re)start writing and update timeout. Note: we don't recompute the timeout - * every time we get here, otherwise it would risk never to expire. We only - * update it if is was not yet set. The stream socket handler will already - * have updated it if there has been a completed I/O. - */ - si->flags &= ~SI_FL_WAIT_DATA; - if (!tick_isset(oc->wex)) { - oc->wex = tick_add_ifset(now_ms, oc->wto); - if (tick_isset(ic->rex) && !(si->cs->flags & CS_FL_INDEP_STR)) { - /* Note: depending on the protocol, we don't know if we're waiting - * for incoming data or not. So in order to prevent the socket from - * expiring read timeouts during writes, we refresh the read timeout, - * except if it was already infinite or if we have explicitly setup - * independent streams. - */ - ic->rex = tick_add_ifset(now_ms, ic->rto); - } - } -} - /* This tries to perform a synchronous receive on the stream interface to * try to collect last arrived data. In practice it's only implemented on * conn_streams. Returns 0 if nothing was done, non-zero if new data or a @@ -615,44 +523,6 @@ void si_sync_send(struct stream_interface *si) si_cs_send(si->cs); } -/* Updates at once the channel flags, and timers of both stream interfaces of a - * same stream, to complete the work after the analysers, then updates the data - * layer below. This will ensure that any synchronous update performed at the - * data layer will be reflected in the channel flags and/or stream-interface. - * Note that this does not change the stream interface's current state, though - * it updates the previous state to the current one. - */ -void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b) -{ - struct channel *req = si_ic(si_f); - struct channel *res = si_oc(si_f); - - req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); - res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); - - si_strm(si_b)->prev_conn_state = si_b->cs->state; - - /* let's recompute both sides states */ - if (cs_state_in(si_f->cs->state, CS_SB_RDY|CS_SB_EST)) - si_update(si_f); - - if (cs_state_in(si_b->cs->state, CS_SB_RDY|CS_SB_EST)) - si_update(si_b); - - /* stream ints are processed outside of process_stream() and must be - * handled at the latest moment. - */ - if (cs_appctx(si_f->cs) && - ((si_rx_endp_ready(si_f) && !si_rx_blocked(si_f)) || - (si_tx_endp_ready(si_f) && !si_tx_blocked(si_f)))) - appctx_wakeup(__cs_appctx(si_f->cs)); - - if (cs_appctx(si_b->cs) && - ((si_rx_endp_ready(si_b) && !si_rx_blocked(si_b)) || - (si_tx_endp_ready(si_b) && !si_tx_blocked(si_b)))) - appctx_wakeup(__cs_appctx(si_b->cs)); -} - /* * This is the callback which is called by the connection layer to receive data * into the buffer from the connection. It iterates over the mux layer's