diff --git a/src/stream_interface.c b/src/stream_interface.c index 29a76e44f..46fc9ebf1 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -246,7 +246,7 @@ static void stream_int_chk_rcv(struct stream_interface *si) __FUNCTION__, si, si->state, ic->flags, si_oc(si)->flags); - if (!channel_may_recv(ic) || ic->pipe) { + if (ic->pipe) { /* stop reading */ si->flags |= SI_FL_WAIT_ROOM; } @@ -460,7 +460,7 @@ void stream_int_notify(struct stream_interface *si) /* indicate that we may be waiting for data from the output channel or * we're about to close and can't expect more data if SHUTW_NOW is there. */ - if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc)) + if (!(oc->flags & (CF_SHUTW|CF_SHUTW_NOW))) si->flags |= SI_FL_WAIT_DATA; else if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) si->flags &= ~SI_FL_WAIT_DATA; @@ -477,8 +477,7 @@ void stream_int_notify(struct stream_interface *si) ic->rex = tick_add_ifset(now_ms, ic->rto); if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && - channel_may_recv(oc) && - (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) { + (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) { si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM; si_chk_rcv(si_opposite(si)); } @@ -513,7 +512,7 @@ void stream_int_notify(struct stream_interface *si) /* check if the consumer has freed some space either in the * buffer or in the pipe. */ - if (channel_may_recv(ic) && new_len < last_len) { + if (new_len < last_len) { si->flags &= ~SI_FL_WAIT_ROOM; si_chk_rcv(si); } @@ -522,8 +521,7 @@ void stream_int_notify(struct stream_interface *si) if (si->flags & SI_FL_WAIT_ROOM) { ic->rex = TICK_ETERNITY; } - else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL && - channel_may_recv(ic)) { + else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL) { /* we must re-enable reading if si_chk_snd() has freed some space */ if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex)) ic->rex = tick_add_ifset(now_ms, ic->rto); @@ -751,15 +749,12 @@ void stream_int_update(struct stream_interface *si) si_want_put(si); /* Read not closed, update FD status and timeout for reads */ - if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) { + if ((ic->flags & CF_DONT_READ) || co_data(ic)) { /* stop reading */ - if (!(si->flags & SI_FL_WAIT_ROOM)) { - if (!(ic->flags & CF_DONT_READ)) /* full */ - si_cant_put(si); - ic->rex = TICK_ETERNITY; - } + si_stop_put(si); + ic->rex = TICK_ETERNITY; } - else if (!(si->flags & SI_FL_WAIT_ROOM) || !co_data(ic)) { + else { /* (re)start reading and update timeout. Note: we don't recompute the timeout * everytime we get here, otherwise it would risk never to expire. We only * update it if is was not yet set. The stream socket handler will already @@ -1011,16 +1006,8 @@ static void stream_int_shutw_conn(struct stream_interface *si) */ static void stream_int_chk_rcv_conn(struct stream_interface *si) { - struct channel *ic = si_ic(si); - - if (!channel_may_recv(ic)) { - /* stop reading */ - si->flags |= SI_FL_WAIT_ROOM; - } - else { - /* (re)start reading */ - tasklet_wakeup(si->wait_event.task); - } + /* (re)start reading */ + tasklet_wakeup(si->wait_event.task); } @@ -1269,11 +1256,6 @@ int si_cs_recv(struct conn_stream *cs) ic->flags |= CF_READ_PARTIAL; ic->total += ret; - if (!channel_may_recv(ic)) { - si_cant_put(si); - break; - } - if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) break; @@ -1295,6 +1277,12 @@ int si_cs_recv(struct conn_stream *cs) if (ret >= global.tune.recv_enough) break; } + + /* if we are waiting for more space, don't try to read more data + * right now. + */ + if (si->flags & SI_FL_WAIT_ROOM) + break; } /* while !flags */ if (cur_read) { @@ -1537,7 +1525,7 @@ static void stream_int_chk_rcv_applet(struct stream_interface *si) __FUNCTION__, si, si->state, ic->flags, si_oc(si)->flags); - if (channel_may_recv(ic) && !ic->pipe) { + if (!ic->pipe) { /* (re)start reading */ appctx_wakeup(si_appctx(si)); }