diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 47acb7606..890bfb8ec 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -48,6 +48,7 @@ extern struct data_cb si_idle_conn_cb; struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app); void si_applet_done(struct stream_interface *si); +void stream_int_update(struct stream_interface *si); /* returns the channel which receives data from this stream interface (input channel) */ static inline struct channel *si_ic(struct stream_interface *si) diff --git a/src/stream_interface.c b/src/stream_interface.c index af415ac6c..4075f686d 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -706,6 +706,74 @@ static void si_conn_send(struct connection *conn) } } +/* This function is designed to be called from within the stream handler to + * update the channels' expiration timers and the stream interface's flags + * based on the channels' flags. It needs to be called only once after the + * channels' flags have settled down, and before they are cleared, though it + * doesn't harm to call it as often as desired (it just slightly hurts + * performance). It must not be called from outside of the stream handler, + * as what it does will be used to compute the stream task's expiration. + */ +void stream_int_update(struct stream_interface *si) +{ + struct channel *ic = si_ic(si); + struct channel *oc = si_oc(si); + + if (!(ic->flags & CF_SHUTR)) { + /* Read not closed, update FD status and timeout for reads */ + if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) { + /* stop reading */ + if (!(si->flags & SI_FL_WAIT_ROOM)) { + if (!(ic->flags & CF_DONT_READ)) /* full */ + si->flags |= SI_FL_WAIT_ROOM; + ic->rex = TICK_ETERNITY; + } + } + else { + /* (re)start reading and update timeout. Note: we don't recompute the timeout + * everytime we get here, otherwise it would risk never to expire. We only + * update it if is was not yet set. The stream socket handler will already + * have updated it if there has been a completed I/O. + */ + si->flags &= ~SI_FL_WAIT_ROOM; + if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex)) + ic->rex = tick_add_ifset(now_ms, ic->rto); + } + } + + if (!(oc->flags & CF_SHUTW)) { + /* Write not closed, update FD status and timeout for writes */ + if (channel_is_empty(oc)) { + /* stop writing */ + if (!(si->flags & SI_FL_WAIT_DATA)) { + if ((oc->flags & CF_SHUTW_NOW) == 0) + si->flags |= SI_FL_WAIT_DATA; + oc->wex = TICK_ETERNITY; + } + } + else { + /* (re)start writing and update timeout. Note: we don't recompute the timeout + * everytime we get here, otherwise it would risk never to expire. We only + * update it if is was not yet set. The stream socket handler will already + * have updated it if there has been a completed I/O. + */ + si->flags &= ~SI_FL_WAIT_DATA; + if (!tick_isset(oc->wex)) { + oc->wex = tick_add_ifset(now_ms, oc->wto); + if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) { + /* Note: depending on the protocol, we don't know if we're waiting + * for incoming data or not. So in order to prevent the socket from + * expiring read timeouts during writes, we refresh the read timeout, + * except if it was already infinite or if we have explicitly setup + * independent streams. + */ + ic->rex = tick_add_ifset(now_ms, ic->rto); + } + } + } + } +} + /* Updates the timers and flags of a stream interface attached to a connection, * depending on the buffers' flags. It should only be called once after the