mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-05-04 12:41:00 +02:00
MEDIUM: mux-pt: Add fast-forwarding support
The PT multiplexer now implements callbacks function to produce and consume fast-forwarded data. Only splicing is support because the mux-pt does not use its own buffers.
This commit is contained in:
parent
169df3b3a8
commit
ec22d3102d
202
src/mux_pt.c
202
src/mux_pt.c
@ -13,11 +13,12 @@
|
||||
#include <haproxy/api.h>
|
||||
#include <haproxy/buf.h>
|
||||
#include <haproxy/connection.h>
|
||||
#include <haproxy/pipe-t.h>
|
||||
#include <haproxy/pipe.h>
|
||||
#include <haproxy/stconn.h>
|
||||
#include <haproxy/stream.h>
|
||||
#include <haproxy/task.h>
|
||||
#include <haproxy/trace.h>
|
||||
#include <haproxy/xref.h>
|
||||
|
||||
struct mux_pt_ctx {
|
||||
struct sedesc *sd;
|
||||
@ -322,6 +323,8 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
|
||||
}
|
||||
conn->ctx = ctx;
|
||||
se_fl_set(ctx->sd, SE_FL_RCV_MORE);
|
||||
if (global.tune.options & GTUNE_USE_SPLICE)
|
||||
se_fl_set(ctx->sd, SE_FL_MAY_FASTFWD);
|
||||
|
||||
TRACE_LEAVE(PT_EV_CONN_NEW, conn);
|
||||
return 0;
|
||||
@ -562,6 +565,195 @@ static size_t mux_pt_snd_buf(struct stconn *sc, struct buffer *buf, size_t count
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline struct sedesc *mux_pt_opposite_sd(struct mux_pt_ctx *ctx)
|
||||
{
|
||||
struct xref *peer;
|
||||
struct sedesc *sdo;
|
||||
|
||||
peer = xref_get_peer_and_lock(&ctx->sd->xref);
|
||||
if (!peer)
|
||||
return NULL;
|
||||
|
||||
sdo = container_of(peer, struct sedesc, xref);
|
||||
xref_unlock(&ctx->sd->xref, peer);
|
||||
return sdo;
|
||||
}
|
||||
|
||||
static size_t mux_pt_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
|
||||
{
|
||||
struct connection *conn = __sc_conn(sc);
|
||||
struct mux_pt_ctx *ctx = conn->ctx;
|
||||
size_t ret = 0;
|
||||
|
||||
TRACE_ENTER(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){count});
|
||||
|
||||
/* Use kernel splicing if it is supported by the sender and if there
|
||||
* are no input data _AND_ no output data.
|
||||
*
|
||||
* TODO: It may be good to add a flag to send obuf data first if any,
|
||||
* and then data in pipe, or the opposite. For now, it is not
|
||||
* supported to mix data.
|
||||
*/
|
||||
if (!b_data(input) && may_splice) {
|
||||
if (conn->xprt->snd_pipe && (ctx->sd->iobuf.pipe || (pipes_used < global.maxpipes && (ctx->sd->iobuf.pipe = get_pipe())))) {
|
||||
ctx->sd->iobuf.offset = 0;
|
||||
ctx->sd->iobuf.data = 0;
|
||||
ret = count;
|
||||
goto out;
|
||||
}
|
||||
ctx->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
|
||||
TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", PT_EV_TX_DATA, conn, sc);
|
||||
}
|
||||
|
||||
/* No buffer case */
|
||||
|
||||
out:
|
||||
TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){ret});
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void mux_pt_done_ff(struct stconn *sc)
|
||||
{
|
||||
struct connection *conn = __sc_conn(sc);
|
||||
struct mux_pt_ctx *ctx = conn->ctx;
|
||||
struct sedesc *sd = ctx->sd;
|
||||
size_t total = 0;
|
||||
|
||||
TRACE_ENTER(PT_EV_TX_DATA, conn, sc);
|
||||
|
||||
if (sd->iobuf.pipe) {
|
||||
total = conn->xprt->snd_pipe(conn, conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
|
||||
if (!sd->iobuf.pipe->data) {
|
||||
put_pipe(sd->iobuf.pipe);
|
||||
sd->iobuf.pipe = NULL;
|
||||
}
|
||||
}
|
||||
else {
|
||||
BUG_ON(sd->iobuf.buf);
|
||||
}
|
||||
|
||||
out:
|
||||
if (conn->flags & CO_FL_ERROR) {
|
||||
if (conn_xprt_read0_pending(conn))
|
||||
se_fl_set(ctx->sd, SE_FL_EOS);
|
||||
se_fl_set_error(ctx->sd);
|
||||
TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
|
||||
}
|
||||
|
||||
TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
|
||||
}
|
||||
|
||||
static int mux_pt_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
|
||||
{
|
||||
struct connection *conn = __sc_conn(sc);
|
||||
struct mux_pt_ctx *ctx = conn->ctx;
|
||||
struct sedesc *sdo = NULL;
|
||||
size_t total = 0, try = 0;
|
||||
int ret = 0;
|
||||
|
||||
TRACE_ENTER(PT_EV_RX_DATA, conn, sc, 0, (size_t[]){count});
|
||||
|
||||
se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
|
||||
conn->flags &= ~CO_FL_WAIT_ROOM;
|
||||
sdo = mux_pt_opposite_sd(ctx);
|
||||
if (!sdo) {
|
||||
TRACE_STATE("Opposite endpoint not available yet", PT_EV_RX_DATA, conn, sc);
|
||||
goto out;
|
||||
}
|
||||
|
||||
try = se_init_ff(sdo, &BUF_NULL, count, conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING));
|
||||
if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
|
||||
/* Fast forwading is not supported by the consumer */
|
||||
se_fl_clr(ctx->sd, SE_FL_MAY_FASTFWD);
|
||||
TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", PT_EV_RX_DATA, conn, sc);
|
||||
goto end;
|
||||
}
|
||||
if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
|
||||
se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
|
||||
TRACE_STATE("waiting for more room", PT_EV_RX_DATA|PT_EV_STRM_ERR, conn, sc);
|
||||
goto out;
|
||||
}
|
||||
|
||||
total += sdo->iobuf.data;
|
||||
|
||||
if (sdo->iobuf.pipe) {
|
||||
/* Here, not data was xferred */
|
||||
ret = conn->xprt->rcv_pipe(conn, conn->xprt_ctx, sdo->iobuf.pipe, try);
|
||||
if (ret < 0) {
|
||||
TRACE_ERROR("Error when trying to fast-forward data, disable it and abort",
|
||||
PT_EV_RX_DATA|PT_EV_STRM_ERR|PT_EV_CONN_ERR, conn, sc);
|
||||
se_fl_clr(ctx->sd, SE_FL_MAY_FASTFWD);
|
||||
BUG_ON(sdo->iobuf.pipe->data);
|
||||
put_pipe(sdo->iobuf.pipe);
|
||||
sdo->iobuf.pipe = NULL;
|
||||
goto end;
|
||||
}
|
||||
total += ret;
|
||||
}
|
||||
else {
|
||||
BUG_ON(sdo->iobuf.buf);
|
||||
ret = -1; /* abort splicing for now and fallback to buffer mode */
|
||||
goto end;
|
||||
}
|
||||
|
||||
ret = total;
|
||||
se_done_ff(sdo);
|
||||
|
||||
if (sdo->iobuf.pipe) {
|
||||
se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
|
||||
}
|
||||
|
||||
TRACE_DEVEL("Data fast-forwarded", PT_EV_RX_DATA, conn, sc, 0, (size_t[]){ret});
|
||||
|
||||
|
||||
out:
|
||||
if (conn->flags & CO_FL_ERROR) {
|
||||
if (conn_xprt_read0_pending(conn))
|
||||
se_fl_set(ctx->sd, SE_FL_EOS);
|
||||
se_fl_set(ctx->sd, SE_FL_ERROR);
|
||||
TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, sc);
|
||||
}
|
||||
else if (conn_xprt_read0_pending(conn)) {
|
||||
se_fl_set(ctx->sd, (SE_FL_EOS|SE_FL_EOI));
|
||||
TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, sc);
|
||||
}
|
||||
end:
|
||||
TRACE_LEAVE(PT_EV_RX_DATA, conn, sc, 0, (size_t[]){ret});
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int mux_pt_resume_fastfwd(struct stconn *sc, unsigned int flags)
|
||||
{
|
||||
struct connection *conn = __sc_conn(sc);
|
||||
struct mux_pt_ctx *ctx = conn->ctx;
|
||||
struct sedesc *sd = ctx->sd;
|
||||
size_t total = 0;
|
||||
|
||||
TRACE_ENTER(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){flags});
|
||||
|
||||
if (sd->iobuf.pipe) {
|
||||
total = conn->xprt->snd_pipe(conn, conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
|
||||
if (!sd->iobuf.pipe->data) {
|
||||
put_pipe(sd->iobuf.pipe);
|
||||
sd->iobuf.pipe = NULL;
|
||||
}
|
||||
}
|
||||
else {
|
||||
BUG_ON(sd->iobuf.buf);
|
||||
}
|
||||
|
||||
out:
|
||||
if (conn->flags & CO_FL_ERROR) {
|
||||
if (conn_xprt_read0_pending(conn))
|
||||
se_fl_set(ctx->sd, SE_FL_EOS);
|
||||
se_fl_set_error(ctx->sd);
|
||||
TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
|
||||
}
|
||||
|
||||
TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
|
||||
return total;
|
||||
}
|
||||
|
||||
/* Called from the upper layer, to subscribe <es> to events <event_type>. The
|
||||
* event subscriber <es> is not allowed to change from a previous call as long
|
||||
* as at least one event is still subscribed. The <event_type> must only be a
|
||||
@ -608,6 +800,10 @@ const struct mux_ops mux_tcp_ops = {
|
||||
.wake = mux_pt_wake,
|
||||
.rcv_buf = mux_pt_rcv_buf,
|
||||
.snd_buf = mux_pt_snd_buf,
|
||||
.init_fastfwd = mux_pt_init_ff,
|
||||
.done_fastfwd = mux_pt_done_ff,
|
||||
.fastfwd = mux_pt_fastfwd,
|
||||
.resume_fastfwd = mux_pt_resume_fastfwd,
|
||||
.subscribe = mux_pt_subscribe,
|
||||
.unsubscribe = mux_pt_unsubscribe,
|
||||
.attach = mux_pt_attach,
|
||||
@ -629,6 +825,10 @@ const struct mux_ops mux_pt_ops = {
|
||||
.wake = mux_pt_wake,
|
||||
.rcv_buf = mux_pt_rcv_buf,
|
||||
.snd_buf = mux_pt_snd_buf,
|
||||
.init_fastfwd = mux_pt_init_ff,
|
||||
.done_fastfwd = mux_pt_done_ff,
|
||||
.fastfwd = mux_pt_fastfwd,
|
||||
.resume_fastfwd = mux_pt_resume_fastfwd,
|
||||
.subscribe = mux_pt_subscribe,
|
||||
.unsubscribe = mux_pt_unsubscribe,
|
||||
.attach = mux_pt_attach,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user