MEDIUM: connection: merge the send_wait and recv_wait entries

In practice all callers use the same wait_event notification for any I/O
so instead of keeping specific code to handle them separately, let's merge
them and it will allow us to create new events later.
This commit is contained in:
Willy Tarreau 2020-01-10 07:06:05 +01:00
parent 062df2c23a
commit 7872d1fc15
4 changed files with 49 additions and 55 deletions

View File

@ -403,8 +403,7 @@ static inline void conn_init(struct connection *conn)
conn->proxy_netns = NULL;
LIST_INIT(&conn->list);
LIST_INIT(&conn->session_list);
conn->send_wait = NULL;
conn->recv_wait = NULL;
conn->subs = NULL;
conn->idle_time = 0;
conn->src = NULL;
conn->dst = NULL;
@ -530,15 +529,10 @@ static inline const struct conn_stream *cs_get_first(const struct connection *co
static inline void conn_force_unsubscribe(struct connection *conn)
{
if (conn->recv_wait) {
conn->recv_wait->events &= ~SUB_RETRY_RECV;
conn->recv_wait = NULL;
}
if (conn->send_wait) {
conn->send_wait->events &= ~SUB_RETRY_SEND;
conn->send_wait = NULL;
}
if (!conn->subs)
return;
conn->subs->events = 0;
conn->subs = NULL;
}
/* Releases a connection previously allocated by conn_new() */

View File

@ -64,6 +64,11 @@ enum sub_event_type {
SUB_RETRY_SEND = 0x00000002, /* Schedule the tasklet when we can attempt to send again */
};
/* Describes a set of subscriptions. Multiple events may be registered at the
* same time. The callee should assume everything not pending for completion is
* implicitly possible. It's illegal to change the tasklet if events are still
* registered.
*/
struct wait_event {
struct tasklet *tasklet;
int events; /* set of enum sub_event_type above */
@ -452,8 +457,7 @@ struct connection {
enum obj_type *target; /* the target to connect to (server, proxy, applet, ...) */
/* second cache line */
struct wait_event *send_wait; /* Task to wake when we're ready to send */
struct wait_event *recv_wait; /* Task to wake when we're ready to recv */
struct wait_event *subs; /* Task to wake when awaited events are ready */
struct list list; /* attach point to various connection lists (idle, ...) */
struct list session_list; /* List of attached connections to a session */
union conn_handle handle; /* connection handle at the socket layer */

View File

@ -76,10 +76,11 @@ void conn_fd_handler(int fd)
* both of which will be detected below.
*/
flags = 0;
if (conn->send_wait != NULL) {
conn->send_wait->events &= ~SUB_RETRY_SEND;
tasklet_wakeup(conn->send_wait->tasklet);
conn->send_wait = NULL;
if (conn->subs && conn->subs->events & SUB_RETRY_SEND) {
tasklet_wakeup(conn->subs->tasklet);
conn->subs->events &= ~SUB_RETRY_SEND;
if (!conn->subs->events)
conn->subs = NULL;
} else
io_available = 1;
__conn_xprt_stop_send(conn);
@ -95,10 +96,11 @@ void conn_fd_handler(int fd)
* both of which will be detected below.
*/
flags = 0;
if (conn->recv_wait) {
conn->recv_wait->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(conn->recv_wait->tasklet);
conn->recv_wait = NULL;
if (conn->subs && conn->subs->events & SUB_RETRY_RECV) {
tasklet_wakeup(conn->subs->tasklet);
conn->subs->events &= ~SUB_RETRY_RECV;
if (!conn->subs->events)
conn->subs = NULL;
} else
io_available = 1;
__conn_xprt_stop_recv(conn);
@ -322,48 +324,42 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags)
int conn_unsubscribe(struct connection *conn, void *xprt_ctx, int event_type, void *param)
{
struct wait_event *sw;
struct wait_event *sw = param;
if (event_type & SUB_RETRY_RECV) {
sw = param;
BUG_ON(conn->recv_wait != sw);
conn->recv_wait = NULL;
sw->events &= ~SUB_RETRY_RECV;
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
BUG_ON(conn->subs && conn->subs != sw);
sw->events &= ~event_type;
if (!sw->events)
conn->subs = NULL;
if (event_type & SUB_RETRY_RECV)
__conn_xprt_stop_recv(conn);
}
if (event_type & SUB_RETRY_SEND) {
sw = param;
BUG_ON(conn->send_wait != sw);
conn->send_wait = NULL;
sw->events &= ~SUB_RETRY_SEND;
if (event_type & SUB_RETRY_SEND)
__conn_xprt_stop_send(conn);
}
conn_update_xprt_polling(conn);
return 0;
}
int conn_subscribe(struct connection *conn, void *xprt_ctx, int event_type, void *param)
{
struct wait_event *sw;
struct wait_event *sw = param;
if (event_type & SUB_RETRY_RECV) {
sw = param;
BUG_ON(conn->recv_wait != NULL || (sw->events & SUB_RETRY_RECV));
sw->events |= SUB_RETRY_RECV;
conn->recv_wait = sw;
event_type &= ~SUB_RETRY_RECV;
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
BUG_ON(conn->subs && conn->subs->events & event_type);
BUG_ON(conn->subs && conn->subs != sw);
conn->subs = sw;
sw->events |= event_type;
if (event_type & SUB_RETRY_RECV)
__conn_xprt_want_recv(conn);
}
if (event_type & SUB_RETRY_SEND) {
sw = param;
BUG_ON(conn->send_wait != NULL || (sw->events & SUB_RETRY_SEND));
sw->events |= SUB_RETRY_SEND;
conn->send_wait = sw;
event_type &= ~SUB_RETRY_SEND;
if (event_type & SUB_RETRY_SEND)
__conn_xprt_want_send(conn);
}
if (event_type != 0)
return (-1);
conn_update_xprt_polling(conn);
return 0;
}

View File

@ -59,10 +59,10 @@ static struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned short stat
* subscribed to receive events, and otherwise call the wake
* method, to make sure the event is noticed.
*/
if (ctx->conn->recv_wait) {
ctx->conn->recv_wait->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(ctx->conn->recv_wait->tasklet);
ctx->conn->recv_wait = NULL;
if (ctx->conn->subs) {
ctx->conn->subs->events = 0;
tasklet_wakeup(ctx->conn->subs->tasklet);
ctx->conn->subs = NULL;
} else if (ctx->cs->data_cb->wake)
ctx->cs->data_cb->wake(ctx->cs);
return NULL;