MINOR: stream-int: split si_update() into si_update_rx() and si_update_tx()

We should not update the two directions at once, in fact we should update
the Rx path after recv() and the Tx path after send(). Let's start by
splitting the update function in two for this.
This commit is contained in:
Willy Tarreau 2019-06-06 08:19:20 +02:00
parent d66ed88a78
commit 236c4298b3
2 changed files with 88 additions and 63 deletions

View File

@ -45,7 +45,8 @@ void si_retnclose(struct stream_interface *si, const struct buffer *msg);
int conn_si_send_proxy(struct connection *conn, unsigned int flag); int conn_si_send_proxy(struct connection *conn, unsigned int flag);
struct appctx *si_register_handler(struct stream_interface *si, struct applet *app); struct appctx *si_register_handler(struct stream_interface *si, struct applet *app);
void si_applet_wake_cb(struct stream_interface *si); void si_applet_wake_cb(struct stream_interface *si);
void si_update(struct stream_interface *si); void si_update_rx(struct stream_interface *si);
void si_update_tx(struct stream_interface *si);
int si_cs_recv(struct conn_stream *cs); int si_cs_recv(struct conn_stream *cs);
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state); struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state);
void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b); void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b);
@ -529,6 +530,13 @@ static inline int si_connect(struct stream_interface *si, struct connection *con
return ret; return ret;
} }
/* Combines both si_update_rx() and si_update_tx() at once */
static inline void si_update(struct stream_interface *si)
{
si_update_rx(si);
si_update_tx(si);
}
/* Returns info about the conn_stream <cs>, if not NULL. It call the mux layer's /* Returns info about the conn_stream <cs>, if not NULL. It call the mux layer's
* get_cs_info() function, if it exists. On success, it returns a cs_info * get_cs_info() function, if it exists. On success, it returns a cs_info
* structure. Otherwise, on error, if the mux does not implement get_cs_info() * structure. Otherwise, on error, if the mux does not implement get_cs_info()

View File

@ -793,76 +793,93 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
} }
/* This function is designed to be called from within the stream handler to /* This function is designed to be called from within the stream handler to
* update the channels' expiration timers and the stream interface's flags * update the input channel's expiration timer and the stream interface's
* based on the channels' flags. It needs to be called only once after the * Rx flags based on the channel's flags. It needs to be called only once
* channels' flags have settled down, and before they are cleared, though it * after the channel's flags have settled down, and before they are cleared,
* doesn't harm to call it as often as desired (it just slightly hurts * though it doesn't harm to call it as often as desired (it just slightly
* performance). It must not be called from outside of the stream handler, * hurts performance). It must not be called from outside of the stream
* as what it does will be used to compute the stream task's expiration. * handler, as what it does will be used to compute the stream task's
* expiration.
*/ */
void si_update(struct stream_interface *si) void si_update_rx(struct stream_interface *si)
{ {
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
if (!(ic->flags & CF_SHUTR)) { if (ic->flags & CF_SHUTR) {
/* Read not closed, update FD status and timeout for reads */
if (ic->flags & CF_DONT_READ)
si_rx_chan_blk(si);
else
si_rx_chan_rdy(si);
if (!channel_is_empty(ic)) {
/* stop reading, imposed by channel's policy or contents */
si_rx_room_blk(si);
}
else {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si_rx_room_rdy(si);
}
if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
ic->rex = TICK_ETERNITY;
else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
si_chk_rcv(si);
}
else
si_rx_shut_blk(si); si_rx_shut_blk(si);
return;
}
if (!(oc->flags & CF_SHUTW)) { /* Read not closed, update FD status and timeout for reads */
/* Write not closed, update FD status and timeout for writes */ if (ic->flags & CF_DONT_READ)
if (channel_is_empty(oc)) { si_rx_chan_blk(si);
/* stop writing */ else
if (!(si->flags & SI_FL_WAIT_DATA)) { si_rx_chan_rdy(si);
if ((oc->flags & CF_SHUTW_NOW) == 0)
si->flags |= SI_FL_WAIT_DATA; if (!channel_is_empty(ic)) {
oc->wex = TICK_ETERNITY; /* stop reading, imposed by channel's policy or contents */
} si_rx_room_blk(si);
}
else {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si_rx_room_rdy(si);
}
if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
ic->rex = TICK_ETERNITY;
else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
si_chk_rcv(si);
}
/* This function is designed to be called from within the stream handler to
* update the output channel's expiration timer and the stream interface's
* Tx flags based on the channel's flags. It needs to be called only once
* after the channel's flags have settled down, and before they are cleared,
* though it doesn't harm to call it as often as desired (it just slightly
* hurts performance). It must not be called from outside of the stream
* handler, as what it does will be used to compute the stream task's
* expiration.
*/
void si_update_tx(struct stream_interface *si)
{
struct channel *oc = si_oc(si);
struct channel *ic = si_ic(si);
if (oc->flags & CF_SHUTW)
return;
/* Write not closed, update FD status and timeout for writes */
if (channel_is_empty(oc)) {
/* stop writing */
if (!(si->flags & SI_FL_WAIT_DATA)) {
if ((oc->flags & CF_SHUTW_NOW) == 0)
si->flags |= SI_FL_WAIT_DATA;
oc->wex = TICK_ETERNITY;
} }
else { return;
/* (re)start writing and update timeout. Note: we don't recompute the timeout }
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set. The stream socket handler will already /* (re)start writing and update timeout. Note: we don't recompute the timeout
* have updated it if there has been a completed I/O. * everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex)) {
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) {
/* Note: depending on the protocol, we don't know if we're waiting
* for incoming data or not. So in order to prevent the socket from
* expiring read timeouts during writes, we refresh the read timeout,
* except if it was already infinite or if we have explicitly setup
* independent streams.
*/ */
si->flags &= ~SI_FL_WAIT_DATA; ic->rex = tick_add_ifset(now_ms, ic->rto);
if (!tick_isset(oc->wex)) {
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) {
/* Note: depending on the protocol, we don't know if we're waiting
* for incoming data or not. So in order to prevent the socket from
* expiring read timeouts during writes, we refresh the read timeout,
* except if it was already infinite or if we have explicitly setup
* independent streams.
*/
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
} }
} }
} }