mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 14:21:25 +02:00
BUG/MEDIUM: process_stream: Don't use si_cs_io_cb() in process_stream().
Instead of using si_cs_io_cb() in process_stream() use si_cs_send/si_cs_recv instead, as si_cs_io_cb() may lead to process_stream being woken up when it shouldn't be, and thus timeout would never get triggered.
This commit is contained in:
parent
9cf4634a2b
commit
0e367bbb01
@ -53,6 +53,8 @@ void stream_int_update(struct stream_interface *si);
|
|||||||
void stream_int_update_conn(struct stream_interface *si);
|
void stream_int_update_conn(struct stream_interface *si);
|
||||||
void stream_int_update_applet(struct stream_interface *si);
|
void stream_int_update_applet(struct stream_interface *si);
|
||||||
void stream_int_notify(struct stream_interface *si);
|
void stream_int_notify(struct stream_interface *si);
|
||||||
|
int si_cs_recv(struct conn_stream *cs);
|
||||||
|
int si_cs_send(struct conn_stream *cs);
|
||||||
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state);
|
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state);
|
||||||
|
|
||||||
/* returns the channel which receives data from this stream interface (input channel) */
|
/* returns the channel which receives data from this stream interface (input channel) */
|
||||||
|
32
src/stream.c
32
src/stream.c
@ -1646,6 +1646,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
|||||||
unsigned int req_ana_back;
|
unsigned int req_ana_back;
|
||||||
struct channel *req, *res;
|
struct channel *req, *res;
|
||||||
struct stream_interface *si_f, *si_b;
|
struct stream_interface *si_f, *si_b;
|
||||||
|
struct conn_stream *cs;
|
||||||
|
int ret;
|
||||||
|
|
||||||
activity[tid].stream++;
|
activity[tid].stream++;
|
||||||
|
|
||||||
@ -1656,8 +1658,16 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
|||||||
si_b = &s->si[1];
|
si_b = &s->si[1];
|
||||||
|
|
||||||
/* First, attempd to do I/Os */
|
/* First, attempd to do I/Os */
|
||||||
si_cs_io_cb(NULL, si_f, 0);
|
cs = objt_cs(si_f->end);
|
||||||
si_cs_io_cb(NULL, si_b, 0);
|
if (cs) {
|
||||||
|
si_cs_send(cs);
|
||||||
|
si_cs_recv(cs);
|
||||||
|
}
|
||||||
|
cs = objt_cs(si_b->end);
|
||||||
|
if (cs) {
|
||||||
|
si_cs_send(cs);
|
||||||
|
si_cs_recv(cs);
|
||||||
|
}
|
||||||
|
|
||||||
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
|
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
|
||||||
// si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
|
// si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
|
||||||
@ -2489,8 +2499,22 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
|||||||
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
|
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
|
||||||
stream_release_buffers(s);
|
stream_release_buffers(s);
|
||||||
/* We may have free'd some space in buffers, or have more to send/recv, try again */
|
/* We may have free'd some space in buffers, or have more to send/recv, try again */
|
||||||
si_cs_io_cb(NULL, si_f, 0);
|
cs = objt_cs(si_f->end);
|
||||||
si_cs_io_cb(NULL, si_b, 0);
|
ret = 0;
|
||||||
|
if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
|
||||||
|
ret |= si_cs_send(cs);
|
||||||
|
si_cs_recv(cs);
|
||||||
|
ret |= (ci_data(si_ic(si_f)) != 0 ) | (cs->conn->flags & CO_FL_ERROR);
|
||||||
|
}
|
||||||
|
cs = objt_cs(si_b->end);
|
||||||
|
if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
|
||||||
|
ret |= si_cs_send(cs);
|
||||||
|
si_cs_recv(cs);
|
||||||
|
ret |= (ci_data(si_ic(si_b)) != 0 ) | (cs->conn->flags & CO_FL_ERROR);
|
||||||
|
|
||||||
|
}
|
||||||
|
if (ret)
|
||||||
|
task_wakeup(t, TASK_WOKEN_IO);
|
||||||
return t; /* nothing more to do */
|
return t; /* nothing more to do */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,10 +51,10 @@ static void stream_int_shutr_applet(struct stream_interface *si);
|
|||||||
static void stream_int_shutw_applet(struct stream_interface *si);
|
static void stream_int_shutw_applet(struct stream_interface *si);
|
||||||
static void stream_int_chk_rcv_applet(struct stream_interface *si);
|
static void stream_int_chk_rcv_applet(struct stream_interface *si);
|
||||||
static void stream_int_chk_snd_applet(struct stream_interface *si);
|
static void stream_int_chk_snd_applet(struct stream_interface *si);
|
||||||
static int si_cs_recv(struct conn_stream *cs);
|
int si_cs_recv(struct conn_stream *cs);
|
||||||
static int si_cs_process(struct conn_stream *cs);
|
static int si_cs_process(struct conn_stream *cs);
|
||||||
static int si_idle_conn_wake_cb(struct conn_stream *cs);
|
static int si_idle_conn_wake_cb(struct conn_stream *cs);
|
||||||
static int si_cs_send(struct conn_stream *cs);
|
int si_cs_send(struct conn_stream *cs);
|
||||||
|
|
||||||
/* stream-interface operations for embedded tasks */
|
/* stream-interface operations for embedded tasks */
|
||||||
struct si_ops si_embedded_ops = {
|
struct si_ops si_embedded_ops = {
|
||||||
@ -622,7 +622,7 @@ static int si_cs_process(struct conn_stream *cs)
|
|||||||
* caller to commit polling changes. The caller should check conn->flags
|
* caller to commit polling changes. The caller should check conn->flags
|
||||||
* for errors.
|
* for errors.
|
||||||
*/
|
*/
|
||||||
static int si_cs_send(struct conn_stream *cs)
|
int si_cs_send(struct conn_stream *cs)
|
||||||
{
|
{
|
||||||
struct connection *conn = cs->conn;
|
struct connection *conn = cs->conn;
|
||||||
struct stream_interface *si = cs->data;
|
struct stream_interface *si = cs->data;
|
||||||
@ -1114,7 +1114,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
|
|||||||
* into the buffer from the connection. It iterates over the mux layer's
|
* into the buffer from the connection. It iterates over the mux layer's
|
||||||
* rcv_buf function.
|
* rcv_buf function.
|
||||||
*/
|
*/
|
||||||
static int si_cs_recv(struct conn_stream *cs)
|
int si_cs_recv(struct conn_stream *cs)
|
||||||
{
|
{
|
||||||
struct connection *conn = cs->conn;
|
struct connection *conn = cs->conn;
|
||||||
struct stream_interface *si = cs->data;
|
struct stream_interface *si = cs->data;
|
||||||
@ -1128,6 +1128,8 @@ static int si_cs_recv(struct conn_stream *cs)
|
|||||||
* happens when we send too large a request to a backend server
|
* happens when we send too large a request to a backend server
|
||||||
* which rejects it before reading it all.
|
* which rejects it before reading it all.
|
||||||
*/
|
*/
|
||||||
|
if (!conn_xprt_ready(conn))
|
||||||
|
return 0;
|
||||||
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
|
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
|
||||||
return 1; // We want to make sure si_cs_wake() is called, so that process_strema is woken up, on failure
|
return 1; // We want to make sure si_cs_wake() is called, so that process_strema is woken up, on failure
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user