diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index f3cf5e0e9..4e400399b 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -357,11 +357,23 @@ static inline void si_shutw(struct stream_interface *si) si->ops->shutw(si); } -/* Updates the stream interface and timers, then updates the data layer below */ +/* Updates the stream interface and timers, to complete the work after the + * analysers, then clears the relevant channel flags, and the errors and + * expirations, then updates the data layer below. This will ensure that any + * synchronous update performed at the data layer will be reflected in the + * channel flags and/or stream-interface. + */ static inline void si_update(struct stream_interface *si) { - stream_int_update(si); - if (si->ops->update) + if (si->state == SI_ST_EST) + stream_int_update(si); + + si_ic(si)->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED); + si_oc(si)->flags &= ~(CF_WRITE_NULL|CF_WRITE_PARTIAL); + si->flags &= ~(SI_FL_ERR|SI_FL_EXP); + si->prev_state = si->state; + + if (si->ops->update && (si->state == SI_ST_CON || si->state == SI_ST_EST)) si->ops->update(si); } diff --git a/src/stream.c b/src/stream.c index 4e2db45af..f33713dc9 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1659,7 +1659,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) struct channel *req, *res; struct stream_interface *si_f, *si_b; struct conn_stream *cs; - int ret; activity[tid].stream++; @@ -2447,38 +2446,13 @@ redo: if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED)) stream_process_counters(s); - cs = objt_cs(si_f->end); - ret = 0; - if (cs && !(cs->conn->flags & CO_FL_ERROR) && - !(cs->flags & CS_FL_ERROR) && - !(si_oc(si_f)->flags & CF_SHUTW) && - !(si_f->wait_event.wait_reason & SUB_CAN_SEND) && - co_data(si_oc(si_f))) - ret = si_cs_send(cs); - cs = objt_cs(si_b->end); - if (cs && !(cs->conn->flags & CO_FL_ERROR) && - !(cs->flags & CS_FL_ERROR) && - !(si_oc(si_b)->flags & CF_SHUTW) && - !(si_b->wait_event.wait_reason & SUB_CAN_SEND) && - co_data(si_oc(si_b))) - ret |= si_cs_send(cs); + si_update(si_f); + si_update(si_b); - if (ret) + if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS || + (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)) goto redo; - if (si_f->state == SI_ST_EST) - si_update(si_f); - - if (si_b->state == SI_ST_EST) - si_update(si_b); - - req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED); - res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED); - si_f->prev_state = si_f->state; - si_b->prev_state = si_b->state; - si_f->flags &= ~(SI_FL_ERR|SI_FL_EXP); - si_b->flags &= ~(SI_FL_ERR|SI_FL_EXP); - /* Trick: if a request is being waiting for the server to respond, * and if we know the server can timeout, we don't want the timeout * to expire on the client side first, but we're still interested diff --git a/src/stream_interface.c b/src/stream_interface.c index 8574b4ca3..08814cf39 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -66,6 +66,7 @@ struct si_ops si_embedded_ops = { /* stream-interface operations for connections */ struct si_ops si_conn_ops = { + .update = stream_int_update_conn, .chk_rcv = stream_int_chk_rcv_conn, .chk_snd = stream_int_chk_snd_conn, .shutr = stream_int_shutr_conn, @@ -798,6 +799,33 @@ void stream_int_update(struct stream_interface *si) } } +/* Updates the active status of a connection outside of the connection handler + * based on the channel's flags and the stream interface's flags. It needs to + * be called once after the channels' flags have settled down and the stream + * has been updated. It is not designed to be called from within the connection + * handler itself. + */ +void stream_int_update_conn(struct stream_interface *si) +{ + struct channel *ic = si_ic(si); + struct channel *oc = si_oc(si); + struct conn_stream *cs = __objt_cs(si->end); + + if (!(ic->flags & CF_SHUTR)) { + /* Read not closed, it doesn't seem we have to do anything here */ + } + + if (!(oc->flags & CF_SHUTW)) { + /* Write not closed */ + if (!channel_is_empty(oc) && + !(cs->conn->flags & CO_FL_ERROR) && + !(cs->flags & CS_FL_ERROR) && + !(oc->flags & CF_SHUTW) && + !(si->wait_event.wait_reason & SUB_CAN_SEND)) + si_cs_send(cs); + } +} + /* * This function performs a shutdown-read on a stream interface attached to * a connection in a connected or init state (it does nothing for other