diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 7fc6d7e21..79cba8562 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -381,6 +381,33 @@ static inline void si_chk_rcv(struct stream_interface *si) si->ops->chk_rcv(si); } +/* This tries to perform a synchronous receive on the stream interface to + * try to collect last arrived data. In practice it's only implemented on + * conn_streams. Returns 0 if nothing was done, non-zero if new data or a + * shutdown were collected. This may result on some delayed receive calls + * to be programmed and performed later, though it doesn't provide any + * such guarantee. + */ +static inline int si_sync_recv(struct stream_interface *si) +{ + struct conn_stream *cs; + + if (si->state != SI_ST_EST) + return 0; + + cs = objt_cs(si->end); + if (!cs) + return 0; // only conn_streams are supported + + if (si->wait_event.wait_reason & SUB_CAN_RECV) + return 0; // already subscribed + + if (si->flags & SI_FL_WAIT_ROOM && c_size(si_ic(si))) + return 0; // already failed + + return si_cs_recv(cs); +} + /* Calls chk_snd on the connection using the data layer */ static inline void si_chk_snd(struct stream_interface *si) { diff --git a/src/stream.c b/src/stream.c index 6e73f8c71..e9270bdac 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1685,7 +1685,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) unsigned int req_ana_back; struct channel *req, *res; struct stream_interface *si_f, *si_b; - struct conn_stream *cs; activity[tid].stream++; @@ -1696,15 +1695,9 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) si_b = &s->si[1]; /* First, attempt to receive pending data from I/O layers */ - cs = objt_cs(si_f->end); - if (cs && !(si_f->wait_event.wait_reason & SUB_CAN_RECV) && - (!(si_f->flags & SI_FL_WAIT_ROOM) || !c_size(req))) - si_cs_recv(cs); + si_sync_recv(si_f); + si_sync_recv(si_b); - cs = objt_cs(si_b->end); - if (cs && !(si_b->wait_event.wait_reason & SUB_CAN_RECV) && - (!(si_b->flags & SI_FL_WAIT_ROOM) || !c_size(res))) - si_cs_recv(cs); redo: //DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,