mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 14:21:25 +02:00
MEDIUM: mux: Teach the mux_pt how to deal with idle connections.
In order to make the mux_pt able to handle idle connections, give it its own context, where it'll stores the connection, the current conn_stream if any, and a wait_event, so that it can subscribe to I/O events. Add a new parameter to the detach() method, that gives the mux a hint if it should destroy the connection or not when detaching a conn_stream. If 1, then the mux_pt immediately destroys the connecion, if 0, then it just subscribes to any read event. If a read happens, it will call conn_sock_drain(), and if there's a connection error, it'll free the connection, after removing it from the idle list.
This commit is contained in:
parent
47e9a1ad4e
commit
b6c32ee4c2
96
src/mux_pt.c
96
src/mux_pt.c
@ -13,6 +13,46 @@
|
|||||||
#include <common/config.h>
|
#include <common/config.h>
|
||||||
#include <proto/connection.h>
|
#include <proto/connection.h>
|
||||||
#include <proto/stream.h>
|
#include <proto/stream.h>
|
||||||
|
#include <proto/task.h>
|
||||||
|
|
||||||
|
static struct pool_head *pool_head_pt_ctx;
|
||||||
|
|
||||||
|
struct mux_pt_ctx {
|
||||||
|
struct conn_stream *cs;
|
||||||
|
struct connection *conn;
|
||||||
|
struct wait_event wait_event;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void mux_pt_destroy(struct mux_pt_ctx *ctx)
|
||||||
|
{
|
||||||
|
struct connection *conn = ctx->conn;
|
||||||
|
|
||||||
|
LIST_DEL(&conn->list);
|
||||||
|
conn_stop_tracking(conn);
|
||||||
|
conn_full_close(conn);
|
||||||
|
if (conn->destroy_cb)
|
||||||
|
conn->destroy_cb(conn);
|
||||||
|
/* We don't bother unsubscribing here, as we're about to destroy
|
||||||
|
* both the connection and the mux_pt_ctx
|
||||||
|
*/
|
||||||
|
conn_free(conn);
|
||||||
|
pool_free(pool_head_pt_ctx, ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Callback, used when we get I/Os while in idle mode */
|
||||||
|
static struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned short status)
|
||||||
|
{
|
||||||
|
struct mux_pt_ctx *ctx = tctx;
|
||||||
|
|
||||||
|
conn_sock_drain(ctx->conn);
|
||||||
|
if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH))
|
||||||
|
mux_pt_destroy(ctx);
|
||||||
|
else
|
||||||
|
ctx->conn->xprt->subscribe(ctx->conn, SUB_CAN_RECV,
|
||||||
|
&ctx->wait_event);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Initialize the mux once it's attached. It is expected that conn->mux_ctx
|
/* Initialize the mux once it's attached. It is expected that conn->mux_ctx
|
||||||
* points to the existing conn_stream (for outgoing connections) or NULL (for
|
* points to the existing conn_stream (for outgoing connections) or NULL (for
|
||||||
@ -22,21 +62,38 @@
|
|||||||
static int mux_pt_init(struct connection *conn, struct proxy *prx)
|
static int mux_pt_init(struct connection *conn, struct proxy *prx)
|
||||||
{
|
{
|
||||||
struct conn_stream *cs = conn->mux_ctx;
|
struct conn_stream *cs = conn->mux_ctx;
|
||||||
|
struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);
|
||||||
|
|
||||||
|
if (!ctx)
|
||||||
|
goto fail;
|
||||||
|
|
||||||
|
ctx->wait_event.task = tasklet_new();
|
||||||
|
if (!ctx->wait_event.task)
|
||||||
|
goto fail_free_ctx;
|
||||||
|
ctx->wait_event.task->context = ctx;
|
||||||
|
ctx->wait_event.task->process = mux_pt_io_cb;
|
||||||
|
ctx->wait_event.wait_reason = 0;
|
||||||
|
ctx->conn = conn;
|
||||||
|
|
||||||
if (!cs) {
|
if (!cs) {
|
||||||
cs = cs_new(conn);
|
cs = cs_new(conn);
|
||||||
if (!cs)
|
if (!cs)
|
||||||
goto fail;
|
goto fail_free_ctx;
|
||||||
|
|
||||||
if (stream_create_from_cs(cs) < 0)
|
if (stream_create_from_cs(cs) < 0)
|
||||||
goto fail_free;
|
goto fail_free;
|
||||||
|
|
||||||
conn->mux_ctx = cs;
|
|
||||||
}
|
}
|
||||||
|
conn->mux_ctx = ctx;
|
||||||
|
ctx->cs = cs;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
fail_free:
|
fail_free:
|
||||||
cs_free(cs);
|
cs_free(cs);
|
||||||
|
fail_free_ctx:
|
||||||
|
if (ctx->wait_event.task)
|
||||||
|
tasklet_free(ctx->wait_event.task);
|
||||||
|
pool_free(pool_head_pt_ctx, ctx);
|
||||||
fail:
|
fail:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -46,13 +103,22 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx)
|
|||||||
*/
|
*/
|
||||||
static int mux_pt_wake(struct connection *conn)
|
static int mux_pt_wake(struct connection *conn)
|
||||||
{
|
{
|
||||||
struct conn_stream *cs = conn->mux_ctx;
|
struct mux_pt_ctx *ctx = conn->mux_ctx;
|
||||||
int ret;
|
struct conn_stream *cs = ctx->cs;
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
|
if (cs) {
|
||||||
|
ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
|
||||||
|
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
return ret;
|
||||||
|
} else {
|
||||||
|
conn_sock_drain(conn);
|
||||||
|
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
|
||||||
|
mux_pt_destroy(ctx);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* If we had early data, and we're done with the handshake
|
/* If we had early data, and we're done with the handshake
|
||||||
* then whe know the data are safe, and we can remove the flag.
|
* then whe know the data are safe, and we can remove the flag.
|
||||||
@ -77,7 +143,8 @@ static struct conn_stream *mux_pt_attach(struct connection *conn)
|
|||||||
*/
|
*/
|
||||||
static const struct conn_stream *mux_pt_get_first_cs(const struct connection *conn)
|
static const struct conn_stream *mux_pt_get_first_cs(const struct connection *conn)
|
||||||
{
|
{
|
||||||
struct conn_stream *cs = conn->mux_ctx;
|
struct mux_pt_ctx *ctx = conn->mux_ctx;
|
||||||
|
struct conn_stream *cs = ctx->cs;
|
||||||
|
|
||||||
return cs;
|
return cs;
|
||||||
}
|
}
|
||||||
@ -88,13 +155,12 @@ static const struct conn_stream *mux_pt_get_first_cs(const struct connection *co
|
|||||||
static void mux_pt_detach(struct conn_stream *cs)
|
static void mux_pt_detach(struct conn_stream *cs)
|
||||||
{
|
{
|
||||||
struct connection *conn = cs->conn;
|
struct connection *conn = cs->conn;
|
||||||
|
struct mux_pt_ctx *ctx = cs->conn->mux_ctx;
|
||||||
|
|
||||||
LIST_DEL(&conn->list);
|
/* Subscribe, to know if we got disconnected */
|
||||||
conn_stop_tracking(conn);
|
conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event);
|
||||||
conn_full_close(conn);
|
ctx->cs = NULL;
|
||||||
if (conn->destroy_cb)
|
mux_pt_destroy(ctx);
|
||||||
conn->destroy_cb(conn);
|
|
||||||
conn_free(conn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||||
@ -209,4 +275,6 @@ __attribute__((constructor))
|
|||||||
static void __mux_pt_init(void)
|
static void __mux_pt_init(void)
|
||||||
{
|
{
|
||||||
register_mux_proto(&mux_proto_pt);
|
register_mux_proto(&mux_proto_pt);
|
||||||
|
pool_head_pt_ctx = create_pool("mux_pt", sizeof(struct mux_pt_ctx),
|
||||||
|
MEM_F_SHARED);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user