From ade6478a8cf0e522c1c2be9244d816c8b81ec21d Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Sat, 17 Nov 2018 19:51:07 +0100 Subject: [PATCH] MINOR: stream: move the conn_stream specific calls to the stream-int There are still some unwelcome synchronous calls to si_cs_recv() in process_stream(). Let's have a new function si_sync_recv() to perform a synchronous receive call on a stream interface regardless of the type of its endpoint, and move these calls there. For now it only implements conn_streams since it doesn't seem useful to support applets there. The function implements an extra check for the stream interface to be in an established state before attempting anything. --- include/proto/stream_interface.h | 27 +++++++++++++++++++++++++++ src/stream.c | 11 ++--------- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 7fc6d7e21..79cba8562 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -381,6 +381,33 @@ static inline void si_chk_rcv(struct stream_interface *si) si->ops->chk_rcv(si); } +/* This tries to perform a synchronous receive on the stream interface to + * try to collect last arrived data. In practice it's only implemented on + * conn_streams. Returns 0 if nothing was done, non-zero if new data or a + * shutdown were collected. This may result on some delayed receive calls + * to be programmed and performed later, though it doesn't provide any + * such guarantee. + */ +static inline int si_sync_recv(struct stream_interface *si) +{ + struct conn_stream *cs; + + if (si->state != SI_ST_EST) + return 0; + + cs = objt_cs(si->end); + if (!cs) + return 0; // only conn_streams are supported + + if (si->wait_event.wait_reason & SUB_CAN_RECV) + return 0; // already subscribed + + if (si->flags & SI_FL_WAIT_ROOM && c_size(si_ic(si))) + return 0; // already failed + + return si_cs_recv(cs); +} + /* Calls chk_snd on the connection using the data layer */ static inline void si_chk_snd(struct stream_interface *si) { diff --git a/src/stream.c b/src/stream.c index 6e73f8c71..e9270bdac 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1685,7 +1685,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) unsigned int req_ana_back; struct channel *req, *res; struct stream_interface *si_f, *si_b; - struct conn_stream *cs; activity[tid].stream++; @@ -1696,15 +1695,9 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) si_b = &s->si[1]; /* First, attempt to receive pending data from I/O layers */ - cs = objt_cs(si_f->end); - if (cs && !(si_f->wait_event.wait_reason & SUB_CAN_RECV) && - (!(si_f->flags & SI_FL_WAIT_ROOM) || !c_size(req))) - si_cs_recv(cs); + si_sync_recv(si_f); + si_sync_recv(si_b); - cs = objt_cs(si_b->end); - if (cs && !(si_b->wait_event.wait_reason & SUB_CAN_RECV) && - (!(si_b->flags & SI_FL_WAIT_ROOM) || !c_size(res))) - si_cs_recv(cs); redo: //DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,