diff --git a/src/stream.c b/src/stream.c index b4acc3f75..8e115e595 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2476,7 +2476,8 @@ redo: si_update_both(si_f, si_b); - if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS || + if (si_f->state == SI_ST_DIS || si_f->state != si_f->prev_state || + si_b->state == SI_ST_DIS || si_b->state != si_b->prev_state || (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)) goto redo; diff --git a/src/stream_interface.c b/src/stream_interface.c index 8d223fabf..4788a20ab 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -817,13 +817,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b { struct channel *req = si_ic(si_f); struct channel *res = si_oc(si_f); - - /* let's recompute both sides states */ - if (si_f->state == SI_ST_EST) - stream_int_update(si_f); - - if (si_b->state == SI_ST_EST) - stream_int_update(si_b); + struct conn_stream *cs; req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); @@ -834,11 +828,60 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b si_f->prev_state = si_f->state; si_b->prev_state = si_b->state; - if (si_f->ops->update && si_f->state == SI_ST_EST) - si_f->ops->update(si_f); + /* front stream-int */ + cs = objt_cs(si_f->end); + if (cs && + si_f->state == SI_ST_EST && + !(res->flags & CF_SHUTW) && /* Write not closed */ + !channel_is_empty(res) && + !(cs->flags & CS_FL_ERROR) && + !(cs->conn->flags & CO_FL_ERROR)) { + if (si_cs_send(cs)) + si_b->flags &= ~SI_FL_WAIT_ROOM; + } - if (si_b->ops->update && (si_b->state == SI_ST_EST || si_b->state == SI_ST_CON)) - si_b->ops->update(si_b); + /* back stream-int */ + cs = objt_cs(si_b->end); + if (cs && + (si_b->state == SI_ST_EST || si_b->state == SI_ST_CON) && + !(req->flags & CF_SHUTW) && /* Write not closed */ + !channel_is_empty(req) && + !(cs->flags & CS_FL_ERROR) && + !(cs->conn->flags & CO_FL_ERROR)) { + if (si_cs_send(cs)) + si_f->flags &= ~SI_FL_WAIT_ROOM; + } + + /* it's time to try to receive */ + if (!(req->flags & (CF_SHUTR|CF_DONT_READ))) + si_want_put(si_f); + + si_chk_rcv(si_f); + + if (!(res->flags & (CF_SHUTR|CF_DONT_READ))) + si_want_put(si_b); + + si_chk_rcv(si_b); + + /* let's recompute both sides states */ + if (si_f->state == SI_ST_EST) + stream_int_update(si_f); + + if (si_b->state == SI_ST_EST) + stream_int_update(si_b); + + /* stream ints are processed outside of process_stream() and must be + * handled at the latest moment. + */ + if (obj_type(si_f->end) == OBJ_TYPE_APPCTX && + (((si_f->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) || + ((si_f->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET))) + appctx_wakeup(si_appctx(si_f)); + + if (obj_type(si_b->end) == OBJ_TYPE_APPCTX && + (((si_b->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) || + ((si_b->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET))) + appctx_wakeup(si_appctx(si_b)); } /* Updates the active status of a connection outside of the connection handler