diff --git a/include/types/connection.h b/include/types/connection.h index 421df3c02..1fa0b7381 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -310,7 +310,6 @@ struct xprt_ops { */ struct mux_ops { int (*init)(struct connection *conn); /* early initialization */ - void (*recv)(struct connection *conn); /* mux-layer recv callback */ int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */ void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */ size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */ @@ -336,7 +335,6 @@ struct mux_ops { * data movement. It may abort a connection by returning < 0. */ struct data_cb { - void (*recv)(struct conn_stream *cs); /* data-layer recv callback */ int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */ int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */ char name[8]; /* data layer name, zero-terminated */ diff --git a/src/checks.c b/src/checks.c index b64d32e20..2e2145967 100644 --- a/src/checks.c +++ b/src/checks.c @@ -70,6 +70,7 @@ static char * tcpcheck_get_step_comment(struct check *, int); static int tcpcheck_main(struct check *); static void __event_srv_chk_w(struct conn_stream *cs); static int wake_srv_chk(struct conn_stream *cs); +static void __event_srv_chk_r(struct conn_stream *cs); static struct pool_head *pool_head_email_alert = NULL; static struct pool_head *pool_head_tcpcheck_rule = NULL; @@ -709,9 +710,15 @@ static void chk_report_conn_err(struct check *check, int errno_bck, int expired) static struct task *event_srv_chk_io(struct task *t, void *ctx, unsigned short state) { struct conn_stream *cs = ctx; + struct check *check = cs->data; if (!(cs->wait_list.wait_reason & SUB_CAN_SEND)) wake_srv_chk(cs); + if (!(cs->wait_list.wait_reason & SUB_CAN_RECV)) { + HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock); + __event_srv_chk_r(cs); + HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock); + } return NULL; } @@ -803,9 +810,11 @@ static void __event_srv_chk_w(struct conn_stream *cs) * etc. * * Please do NOT place any return statement in this function and only leave - * via the out_unlock label. + * via the out label. + * + * This must be called with the server lock held. */ -static void event_srv_chk_r(struct conn_stream *cs) +static void __event_srv_chk_r(struct conn_stream *cs) { struct connection *conn = cs->conn; struct check *check = cs->data; @@ -815,17 +824,17 @@ static void event_srv_chk_r(struct conn_stream *cs) int done; unsigned short msglen; - HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock); - if (unlikely(check->result == CHK_RES_FAILED)) goto out_wakeup; - if (conn->flags & CO_FL_HANDSHAKE) - goto out_unlock; + if (conn->flags & CO_FL_HANDSHAKE) { + cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list); + goto out; + } /* wake() will take care of calling tcpcheck_main() */ if (check->type == PR_O2_TCPCHK_CHK) - goto out_unlock; + goto out; /* Warning! Linux returns EAGAIN on SO_ERROR if data are still available * but the connection was closed on the remote end. Fortunately, recv still @@ -1372,13 +1381,13 @@ static void event_srv_chk_r(struct conn_stream *cs) conn->flags |= CO_FL_ERROR; task_wakeup(t, TASK_WOKEN_IO); - out_unlock: - HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock); +out: return; wait_more_data: __cs_want_recv(cs); - goto out_unlock; + cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list); + goto out; } /* @@ -1443,7 +1452,6 @@ static int wake_srv_chk(struct conn_stream *cs) } struct data_cb check_conn_cb = { - .recv = event_srv_chk_r, .wake = wake_srv_chk, .name = "CHCK", }; @@ -2172,8 +2180,10 @@ static struct task *process_chk_conn(struct task *t, void *context, unsigned sho t->expire = tick_first(t->expire, t_con); } - if (check->type) + if (check->type) { cs_want_recv(cs); /* prepare for reading a possible reply */ + __event_srv_chk_r(cs); + } task_set_affinity(t, tid_bit); goto reschedule; @@ -2928,8 +2938,10 @@ static int tcpcheck_main(struct check *check) goto out_end_tcpcheck; } } - else + else { + conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list); break; + } } /* mark the step as started */ @@ -3091,8 +3103,10 @@ static int tcpcheck_main(struct check *check) __cs_want_send(cs); if (&check->current_step->list != head && - check->current_step->action == TCPCHK_ACT_EXPECT) + check->current_step->action == TCPCHK_ACT_EXPECT) { __cs_want_recv(cs); + __event_srv_chk_r(cs); + } goto out; out_end_tcpcheck: diff --git a/src/connection.c b/src/connection.c index 005e0e741..ad0386352 100644 --- a/src/connection.c +++ b/src/connection.c @@ -64,7 +64,7 @@ void conn_fd_handler(int fd) { struct connection *conn = fdtab[fd].owner; unsigned int flags; - int can_send = 0; + int io_available = 0; if (unlikely(!conn)) { activity[tid].conn_dead++; @@ -128,7 +128,8 @@ void conn_fd_handler(int fd) * both of which will be detected below. */ flags = 0; - can_send = LIST_ISEMPTY(&conn->send_wait_list); + io_available = (LIST_ISEMPTY(&conn->send_wait_list) && + LIST_ISEMPTY(&conn->sendrecv_wait_list));; while (!LIST_ISEMPTY(&conn->send_wait_list)) { struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n, struct wait_list *, list); @@ -138,7 +139,7 @@ void conn_fd_handler(int fd) tasklet_wakeup(sw->task); } while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) { - struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n, + struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n, struct wait_list *, list); LIST_DEL(&sw->list); LIST_INIT(&sw->list); @@ -159,7 +160,26 @@ void conn_fd_handler(int fd) * both of which will be detected below. */ flags = 0; - conn->mux->recv(conn); + io_available |= (LIST_ISEMPTY(&conn->recv_wait_list) && + LIST_ISEMPTY(&conn->sendrecv_wait_list)); + while (!LIST_ISEMPTY(&conn->recv_wait_list)) { + struct wait_list *sw = LIST_ELEM(conn->recv_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + sw->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(sw->task); + } + while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) { + struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&conn->send_wait_list, &sw->list); + sw->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(sw->task); + } + } /* It may happen during the data phase that a handshake is @@ -206,7 +226,7 @@ void conn_fd_handler(int fd) * Note that the wake callback is allowed to release the connection and * the fd (and return < 0 in this case). */ - if ((can_send || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) || + if ((io_available || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) || ((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED && (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED))) && conn->mux->wake(conn) < 0) diff --git a/src/mux_h2.c b/src/mux_h2.c index 3c873e9ed..8922f5c17 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -121,8 +121,6 @@ struct h2c { struct list fctl_list; /* list of streams blocked by connection's fctl */ struct buffer_wait buf_wait; /* wait list for buffer allocations */ struct list send_wait_list; /* list of tasks to wake when we're ready to send */ - struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */ - struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */ struct wait_list wait_list; /* We're in a wait list, to send */ }; @@ -186,6 +184,7 @@ struct h2s { enum h2_err errcode; /* H2 err code (H2_ERR_*) */ enum h2_ss st; struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */ + struct wait_list *recv_wait_list; /* Somebody subscribed to be waken up on recv */ }; /* descriptor for an h2 frame header */ @@ -222,6 +221,7 @@ static const struct h2s *h2_idle_stream = &(const struct h2s){ static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state); static void h2_send(struct h2c *h2c); +static void h2_recv(struct h2c *h2c); static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short state); static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id); static int h2_frt_decode_headers(struct h2s *h2s); @@ -280,8 +280,10 @@ static int h2_buf_available(void *target) if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc_margin(&h2c->dbuf, 0)) { h2c->flags &= ~H2_CF_DEM_DALLOC; - if (h2_recv_allowed(h2c)) + if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); + h2_recv(h2c); + } return 1; } @@ -292,8 +294,10 @@ static int h2_buf_available(void *target) if (h2c->flags & H2_CF_DEM_MROOM) { h2c->flags &= ~H2_CF_DEM_MROOM; - if (h2_recv_allowed(h2c)) + if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); + h2_recv(h2c); + } } return 1; } @@ -302,8 +306,10 @@ static int h2_buf_available(void *target) (h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s->cs && b_alloc_margin(&h2s->rxbuf, 0)) { h2c->flags &= ~H2_CF_DEM_SALLOC; - if (h2_recv_allowed(h2c)) + if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); + h2_recv(h2c); + } return 1; } @@ -408,11 +414,10 @@ static int h2c_frt_init(struct connection *conn) task_queue(t); conn_xprt_want_recv(conn); LIST_INIT(&h2c->send_wait_list); - LIST_INIT(&h2c->recv_wait_list); - LIST_INIT(&h2c->sendrecv_wait_list); LIST_INIT(&h2c->wait_list.list); - /* mux->wake will be called soon to complete the operation */ + /* Try to read, if nothing is available yet we'll just subscribe */ + h2_recv(h2c); return 0; fail: if (t) @@ -2228,17 +2233,16 @@ static int h2_process_mux(struct h2c *h2c) } -/*********************************************************/ -/* functions below are I/O callbacks from the connection */ -/*********************************************************/ - -/* callback called on recv event by the connection handler */ -static void h2_recv(struct connection *conn) +/* Attempt to read data, and subscribe if none available */ +static void h2_recv(struct h2c *h2c) { - struct h2c *h2c = conn->mux_ctx; + struct connection *conn = h2c->conn; struct buffer *buf; int max; + if (h2c->wait_list.wait_reason & SUB_CAN_RECV) + return; + if (!h2_recv_allowed(h2c)) return; @@ -2253,6 +2257,7 @@ static void h2_recv(struct connection *conn) conn->xprt->rcv_buf(conn, buf, max, 0); if (!b_data(buf)) { + conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_list); h2_release_buf(h2c, &h2c->dbuf); return; } @@ -2337,17 +2342,6 @@ static void h2_send(struct h2c *h2c) sw->wait_reason &= ~SUB_CAN_SEND; tasklet_wakeup(sw->task); } - while (!(LIST_ISEMPTY(&h2c->sendrecv_wait_list))) { - struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n, - struct wait_list *, list); - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - LIST_ADDQ(&h2c->recv_wait_list, &sw->list); - sw->wait_reason &= ~SUB_CAN_SEND; - tasklet_wakeup(sw->task); - } - - } /* We're done, no more to send */ if (!b_data(&h2c->mbuf)) @@ -2364,6 +2358,8 @@ static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short status) if (!(h2c->wait_list.wait_reason & SUB_CAN_SEND)) h2_send(h2c); + if (!(h2c->wait_list.wait_reason & SUB_CAN_RECV)) + h2_recv(h2c); return NULL; } @@ -2377,6 +2373,9 @@ static int h2_wake(struct connection *conn) struct session *sess = conn->owner; h2_send(h2c); + if (h2_recv_allowed(h2c)) + h2_recv(h2c); + if (b_data(&h2c->dbuf) && !(h2c->flags & H2_CF_DEM_BLOCK_ANY)) { h2_process_demux(h2c); @@ -2436,11 +2435,11 @@ static int h2_wake(struct connection *conn) h2_release_buf(h2c, &h2c->dbuf); /* stop being notified of incoming data if we can't process them */ - if (!h2_recv_allowed(h2c)) { + if (!h2_recv_allowed(h2c)) __conn_xprt_stop_recv(conn); - } else { __conn_xprt_want_recv(conn); + h2_recv(h2c); } /* adjust output polling */ @@ -2554,6 +2553,7 @@ static void h2_update_poll(struct conn_stream *cs) h2s->h2c->flags &= ~H2_CF_DEM_SFULL; if (h2s->h2c->dsi == h2s->id) { conn_xprt_want_recv(cs->conn); + h2_recv(h2s->h2c); conn_xprt_want_send(cs->conn); } } @@ -2605,6 +2605,7 @@ static void h2_detach(struct conn_stream *cs) h2c->flags &= ~H2_CF_DEM_TOOMANY; if (h2_recv_allowed(h2c)) { __conn_xprt_want_recv(h2c->conn); + h2_recv(h2c); conn_xprt_want_send(h2c->conn); } } @@ -2625,6 +2626,7 @@ static void h2_detach(struct conn_stream *cs) h2c->flags &= ~H2_CF_DEM_BLOCK_ANY; h2c->flags &= ~H2_CF_MUX_BLOCK_ANY; conn_xprt_want_recv(cs->conn); + h2_recv(h2c); conn_xprt_want_send(cs->conn); } @@ -3477,30 +3479,14 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) sw = param; if (!(sw->wait_reason & SUB_CAN_RECV)) { sw->wait_reason |= SUB_CAN_RECV; - /* If we're already subscribed for send(), move it - * to the send+recv list - */ - if (sw->wait_reason & SUB_CAN_SEND) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list); - } else - LIST_ADDQ(&h2c->recv_wait_list, &sw->list); + h2s->recv_wait_list = sw; } return 0; case SUB_CAN_SEND: sw = param; if (!(sw->wait_reason & SUB_CAN_SEND)) { sw->wait_reason |= SUB_CAN_SEND; - /* If we're already subscribed for recv(), move it - * to the send+recv list - */ - if (sw->wait_reason & SUB_CAN_RECV) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list); - } else - LIST_ADDQ(&h2c->send_wait_list, &sw->list); + LIST_ADDQ(&h2c->send_wait_list, &sw->list); } return 0; default: @@ -3710,7 +3696,6 @@ static int h2_parse_max_concurrent_streams(char **args, int section_type, struct /* The mux operations */ const struct mux_ops h2_ops = { .init = h2_init, - .recv = h2_recv, .wake = h2_wake, .update_poll = h2_update_poll, .snd_buf = h2_snd_buf, diff --git a/src/mux_pt.c b/src/mux_pt.c index 71f26e753..466ac21b9 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -85,21 +85,6 @@ static void mux_pt_update_poll(struct conn_stream *cs) conn_cond_update_xprt_polling(conn); } -/* callback to be used by default for the pass-through mux. It simply calls the - * data layer recv() callback much must be set. - */ -static void mux_pt_recv(struct connection *conn) -{ - struct conn_stream *cs = conn->mux_ctx; - - if (conn->flags & CO_FL_ERROR) - cs->flags |= CS_FL_ERROR; - if (conn_xprt_read0_pending(conn)) - cs->flags |= CS_FL_EOS; - cs->data_cb->recv(cs); - cs_update_mux_polling(cs); -} - /* * Attach a new stream to a connection * (Used for outgoing connections) @@ -200,7 +185,6 @@ static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe) /* The mux operations */ const struct mux_ops mux_pt_ops = { .init = mux_pt_init, - .recv = mux_pt_recv, .wake = mux_pt_wake, .update_poll = mux_pt_update_poll, .rcv_buf = mux_pt_rcv_buf, diff --git a/src/stream_interface.c b/src/stream_interface.c index cfa613a3c..46e57d453 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -51,10 +51,9 @@ static void stream_int_shutr_applet(struct stream_interface *si); static void stream_int_shutw_applet(struct stream_interface *si); static void stream_int_chk_rcv_applet(struct stream_interface *si); static void stream_int_chk_snd_applet(struct stream_interface *si); -static void si_cs_recv_cb(struct conn_stream *cs); +static void si_cs_recv(struct conn_stream *cs); static int si_cs_wake_cb(struct conn_stream *cs); static int si_idle_conn_wake_cb(struct conn_stream *cs); -static void si_idle_conn_null_cb(struct conn_stream *cs); static struct task * si_cs_send(struct conn_stream *cs); /* stream-interface operations for embedded tasks */ @@ -84,13 +83,11 @@ struct si_ops si_applet_ops = { }; struct data_cb si_conn_cb = { - .recv = si_cs_recv_cb, .wake = si_cs_wake_cb, .name = "STRM", }; struct data_cb si_idle_conn_cb = { - .recv = si_idle_conn_null_cb, .wake = si_idle_conn_wake_cb, .name = "IDLE", }; @@ -417,15 +414,6 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) } -/* Tiny I/O callback called on recv/send I/O events on idle connections. - * It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb() - * is notified and can kill the connection. - */ -static void si_idle_conn_null_cb(struct conn_stream *cs) -{ - conn_sock_drain(cs->conn); -} - /* Callback to be used by connection I/O handlers when some activity is detected * on an idle server connection. Its main purpose is to kill the connection once * a close was detected on it. It returns 0 if it did nothing serious, or -1 if @@ -439,6 +427,8 @@ static int si_idle_conn_wake_cb(struct conn_stream *cs) if (!conn_ctrl_ready(conn)) return 0; + conn_sock_drain(conn); + if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) { /* warning, we can't do anything on after this call ! */ si_release_endpoint(si); @@ -582,8 +572,8 @@ static int si_cs_wake_cb(struct conn_stream *cs) * for recv() (received only an empty response). */ if (!(cs->flags & CS_FL_EOS) && - (cs->flags & (CS_FL_DATA_RD_ENA|CS_FL_REOS|CS_FL_RCV_MORE)) > CS_FL_DATA_RD_ENA) - si_cs_recv_cb(cs); + (cs->flags & (CS_FL_DATA_RD_ENA))) + si_cs_recv(cs); /* If we have data to send, try it now */ if (!channel_is_empty(oc) && objt_cs(si->end)) @@ -753,7 +743,7 @@ wake_others: tasklet_wakeup(sw->task); } while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) { - struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n, + struct wait_list *sw = LIST_ELEM(cs->sendrecv_wait_list.n, struct wait_list *, list); LIST_DEL(&sw->list); LIST_INIT(&sw->list); @@ -1148,7 +1138,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) * into the buffer from the connection. It iterates over the mux layer's * rcv_buf function. */ -static void si_cs_recv_cb(struct conn_stream *cs) +static void si_cs_recv(struct conn_stream *cs) { struct connection *conn = cs->conn; struct stream_interface *si = cs->data; @@ -1364,6 +1354,26 @@ static void si_cs_recv_cb(struct conn_stream *cs) } ic->last_read = now_ms; } + if (cur_read > 0) { + while (!LIST_ISEMPTY(&cs->recv_wait_list)) { + struct wait_list *sw = LIST_ELEM(cs->recv_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + sw->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(sw->task); + } + while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) { + struct wait_list *sw = LIST_ELEM(cs->sendrecv_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&cs->send_wait_list, &sw->list); + sw->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(sw->task); + } + + } end_recv: if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)