diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index c0f7cdae3..2d3fe5651 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -315,9 +315,8 @@ static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struc } /* Try to allocate a buffer for the stream-int's input channel. It relies on - * channel_alloc_buffer() for this so it abides by its rules. It returns 0 in - * case of failure, non-zero otherwise. The stream-int's flag SI_FL_WAIT_ROOM - * is cleared before trying. If no buffer are available, the requester, + * channel_alloc_buffer() for this so it abides by its rules. It returns 0 on + * failure, non-zero otherwise. If no buffer is available, the requester, * represented by pointer, will be added in the list of objects waiting * for an available buffer, and SI_FL_WAIT_ROOM will be set on the stream-int. * The requester will be responsible for calling this function to try again @@ -327,7 +326,6 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait { int ret; - si->flags &= ~SI_FL_WAIT_ROOM; ret = channel_alloc_buffer(si_ic(si), wait); if (!ret) si_cant_put(si); diff --git a/src/stream_interface.c b/src/stream_interface.c index 3372fc2a6..b29cb2f75 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -480,8 +480,10 @@ void stream_int_notify(struct stream_interface *si) if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && channel_may_recv(oc) && - (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) + (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) { + si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM; si_chk_rcv(si_opposite(si)); + } } /* Notify the other side when we've injected data into the IC that @@ -513,8 +515,10 @@ void stream_int_notify(struct stream_interface *si) /* check if the consumer has freed some space either in the * buffer or in the pipe. */ - if (channel_may_recv(ic) && new_len < last_len) + if (channel_may_recv(ic) && new_len < last_len) { si->flags &= ~SI_FL_WAIT_ROOM; + si_chk_rcv(si); + } } if (si->flags & SI_FL_WAIT_ROOM) { @@ -561,7 +565,6 @@ static int si_cs_process(struct conn_stream *cs) struct stream_interface *si = cs->data; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); - int wait_room = si->flags & SI_FL_WAIT_ROOM; /* If we have data to send, try it now */ if (!channel_is_empty(oc) && !(si->wait_event.wait_reason & SUB_CAN_SEND)) @@ -597,10 +600,6 @@ static int si_cs_process(struct conn_stream *cs) stream_int_notify(si); channel_release_buffer(ic, &(si_strm(si)->buffer_wait)); - /* Try to run again if we free'd some room in the process */ - if (wait_room && !(si->flags & SI_FL_WAIT_ROOM)) - tasklet_wakeup(si->wait_event.task); - return 0; } @@ -769,7 +768,7 @@ void stream_int_update(struct stream_interface *si) * have updated it if there has been a completed I/O. */ si->flags &= ~SI_FL_WAIT_ROOM; - tasklet_wakeup(si->wait_event.task); + si_chk_rcv(si); if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex)) ic->rex = tick_add_ifset(now_ms, ic->rto); }