mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 22:31:28 +02:00
MINOR: stream: move the conn_stream specific calls to the stream-int
There are still some unwelcome synchronous calls to si_cs_recv() in process_stream(). Let's have a new function si_sync_recv() to perform a synchronous receive call on a stream interface regardless of the type of its endpoint, and move these calls there. For now it only implements conn_streams since it doesn't seem useful to support applets there. The function implements an extra check for the stream interface to be in an established state before attempting anything.
This commit is contained in:
parent
00b3b8c361
commit
ade6478a8c
@ -381,6 +381,33 @@ static inline void si_chk_rcv(struct stream_interface *si)
|
|||||||
si->ops->chk_rcv(si);
|
si->ops->chk_rcv(si);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This tries to perform a synchronous receive on the stream interface to
|
||||||
|
* try to collect last arrived data. In practice it's only implemented on
|
||||||
|
* conn_streams. Returns 0 if nothing was done, non-zero if new data or a
|
||||||
|
* shutdown were collected. This may result on some delayed receive calls
|
||||||
|
* to be programmed and performed later, though it doesn't provide any
|
||||||
|
* such guarantee.
|
||||||
|
*/
|
||||||
|
static inline int si_sync_recv(struct stream_interface *si)
|
||||||
|
{
|
||||||
|
struct conn_stream *cs;
|
||||||
|
|
||||||
|
if (si->state != SI_ST_EST)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
cs = objt_cs(si->end);
|
||||||
|
if (!cs)
|
||||||
|
return 0; // only conn_streams are supported
|
||||||
|
|
||||||
|
if (si->wait_event.wait_reason & SUB_CAN_RECV)
|
||||||
|
return 0; // already subscribed
|
||||||
|
|
||||||
|
if (si->flags & SI_FL_WAIT_ROOM && c_size(si_ic(si)))
|
||||||
|
return 0; // already failed
|
||||||
|
|
||||||
|
return si_cs_recv(cs);
|
||||||
|
}
|
||||||
|
|
||||||
/* Calls chk_snd on the connection using the data layer */
|
/* Calls chk_snd on the connection using the data layer */
|
||||||
static inline void si_chk_snd(struct stream_interface *si)
|
static inline void si_chk_snd(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
|
11
src/stream.c
11
src/stream.c
@ -1685,7 +1685,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
|||||||
unsigned int req_ana_back;
|
unsigned int req_ana_back;
|
||||||
struct channel *req, *res;
|
struct channel *req, *res;
|
||||||
struct stream_interface *si_f, *si_b;
|
struct stream_interface *si_f, *si_b;
|
||||||
struct conn_stream *cs;
|
|
||||||
|
|
||||||
activity[tid].stream++;
|
activity[tid].stream++;
|
||||||
|
|
||||||
@ -1696,15 +1695,9 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
|||||||
si_b = &s->si[1];
|
si_b = &s->si[1];
|
||||||
|
|
||||||
/* First, attempt to receive pending data from I/O layers */
|
/* First, attempt to receive pending data from I/O layers */
|
||||||
cs = objt_cs(si_f->end);
|
si_sync_recv(si_f);
|
||||||
if (cs && !(si_f->wait_event.wait_reason & SUB_CAN_RECV) &&
|
si_sync_recv(si_b);
|
||||||
(!(si_f->flags & SI_FL_WAIT_ROOM) || !c_size(req)))
|
|
||||||
si_cs_recv(cs);
|
|
||||||
|
|
||||||
cs = objt_cs(si_b->end);
|
|
||||||
if (cs && !(si_b->wait_event.wait_reason & SUB_CAN_RECV) &&
|
|
||||||
(!(si_b->flags & SI_FL_WAIT_ROOM) || !c_size(res)))
|
|
||||||
si_cs_recv(cs);
|
|
||||||
redo:
|
redo:
|
||||||
|
|
||||||
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
|
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user