From b6c32ee4c24917d1a2d9a00292aaf24416fadd87 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Mon, 5 Nov 2018 18:28:43 +0100 Subject: [PATCH] MEDIUM: mux: Teach the mux_pt how to deal with idle connections. In order to make the mux_pt able to handle idle connections, give it its own context, where it'll stores the connection, the current conn_stream if any, and a wait_event, so that it can subscribe to I/O events. Add a new parameter to the detach() method, that gives the mux a hint if it should destroy the connection or not when detaching a conn_stream. If 1, then the mux_pt immediately destroys the connecion, if 0, then it just subscribes to any read event. If a read happens, it will call conn_sock_drain(), and if there's a connection error, it'll free the connection, after removing it from the idle list. --- src/mux_pt.c | 96 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 82 insertions(+), 14 deletions(-) diff --git a/src/mux_pt.c b/src/mux_pt.c index dd052c5fd..25b1cfe0d 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -13,6 +13,46 @@ #include #include #include +#include + +static struct pool_head *pool_head_pt_ctx; + +struct mux_pt_ctx { + struct conn_stream *cs; + struct connection *conn; + struct wait_event wait_event; +}; + +static void mux_pt_destroy(struct mux_pt_ctx *ctx) +{ + struct connection *conn = ctx->conn; + + LIST_DEL(&conn->list); + conn_stop_tracking(conn); + conn_full_close(conn); + if (conn->destroy_cb) + conn->destroy_cb(conn); + /* We don't bother unsubscribing here, as we're about to destroy + * both the connection and the mux_pt_ctx + */ + conn_free(conn); + pool_free(pool_head_pt_ctx, ctx); +} + +/* Callback, used when we get I/Os while in idle mode */ +static struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned short status) +{ + struct mux_pt_ctx *ctx = tctx; + + conn_sock_drain(ctx->conn); + if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) + mux_pt_destroy(ctx); + else + ctx->conn->xprt->subscribe(ctx->conn, SUB_CAN_RECV, + &ctx->wait_event); + + return NULL; +} /* Initialize the mux once it's attached. It is expected that conn->mux_ctx * points to the existing conn_stream (for outgoing connections) or NULL (for @@ -22,21 +62,38 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx) { struct conn_stream *cs = conn->mux_ctx; + struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx); + + if (!ctx) + goto fail; + + ctx->wait_event.task = tasklet_new(); + if (!ctx->wait_event.task) + goto fail_free_ctx; + ctx->wait_event.task->context = ctx; + ctx->wait_event.task->process = mux_pt_io_cb; + ctx->wait_event.wait_reason = 0; + ctx->conn = conn; if (!cs) { cs = cs_new(conn); if (!cs) - goto fail; + goto fail_free_ctx; if (stream_create_from_cs(cs) < 0) goto fail_free; - conn->mux_ctx = cs; } + conn->mux_ctx = ctx; + ctx->cs = cs; return 0; fail_free: cs_free(cs); +fail_free_ctx: + if (ctx->wait_event.task) + tasklet_free(ctx->wait_event.task); + pool_free(pool_head_pt_ctx, ctx); fail: return -1; } @@ -46,13 +103,22 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx) */ static int mux_pt_wake(struct connection *conn) { - struct conn_stream *cs = conn->mux_ctx; - int ret; + struct mux_pt_ctx *ctx = conn->mux_ctx; + struct conn_stream *cs = ctx->cs; + int ret = 0; - ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0; + if (cs) { + ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0; - if (ret < 0) - return ret; + if (ret < 0) + return ret; + } else { + conn_sock_drain(conn); + if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) { + mux_pt_destroy(ctx); + return -1; + } + } /* If we had early data, and we're done with the handshake * then whe know the data are safe, and we can remove the flag. @@ -77,7 +143,8 @@ static struct conn_stream *mux_pt_attach(struct connection *conn) */ static const struct conn_stream *mux_pt_get_first_cs(const struct connection *conn) { - struct conn_stream *cs = conn->mux_ctx; + struct mux_pt_ctx *ctx = conn->mux_ctx; + struct conn_stream *cs = ctx->cs; return cs; } @@ -88,13 +155,12 @@ static const struct conn_stream *mux_pt_get_first_cs(const struct connection *co static void mux_pt_detach(struct conn_stream *cs) { struct connection *conn = cs->conn; + struct mux_pt_ctx *ctx = cs->conn->mux_ctx; - LIST_DEL(&conn->list); - conn_stop_tracking(conn); - conn_full_close(conn); - if (conn->destroy_cb) - conn->destroy_cb(conn); - conn_free(conn); + /* Subscribe, to know if we got disconnected */ + conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event); + ctx->cs = NULL; + mux_pt_destroy(ctx); } static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode) @@ -209,4 +275,6 @@ __attribute__((constructor)) static void __mux_pt_init(void) { register_mux_proto(&mux_proto_pt); + pool_head_pt_ctx = create_pool("mux_pt", sizeof(struct mux_pt_ctx), + MEM_F_SHARED); }