mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-09 16:47:18 +02:00
MEDIUM: mux-quic: Add consumer-side fast-forwarding support
The QUIC multiplexer now implements callbacks to consume fast-forwarded data. It relies on the H3 stack to acquire the buffer and format the frame.
This commit is contained in:
parent
cd352c0dbe
commit
1bcc0f8892
@ -189,6 +189,8 @@ struct qcc_app_ops {
|
|||||||
int (*attach)(struct qcs *qcs, void *conn_ctx);
|
int (*attach)(struct qcs *qcs, void *conn_ctx);
|
||||||
ssize_t (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin);
|
ssize_t (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin);
|
||||||
size_t (*snd_buf)(struct qcs *qcs, struct htx *htx, size_t count);
|
size_t (*snd_buf)(struct qcs *qcs, struct htx *htx, size_t count);
|
||||||
|
size_t (*nego_ff)(struct qcs *qcs, size_t count);
|
||||||
|
size_t (*done_ff)(struct qcs *qcs);
|
||||||
int (*close)(struct qcs *qcs, enum qcc_app_ops_close_side side);
|
int (*close)(struct qcs *qcs, enum qcc_app_ops_close_side side);
|
||||||
void (*detach)(struct qcs *qcs);
|
void (*detach)(struct qcs *qcs);
|
||||||
int (*finalize)(void *ctx);
|
int (*finalize)(void *ctx);
|
||||||
|
65
src/h3.c
65
src/h3.c
@ -1836,6 +1836,69 @@ static size_t h3_snd_buf(struct qcs *qcs, struct htx *htx, size_t count)
|
|||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static size_t h3_nego_ff(struct qcs *qcs, size_t count)
|
||||||
|
{
|
||||||
|
struct buffer *res;
|
||||||
|
int hsize;
|
||||||
|
size_t sz, ret = 0;
|
||||||
|
|
||||||
|
h3_debug_printf(stderr, "%s\n", __func__);
|
||||||
|
|
||||||
|
/* FIXME: no check on ALLOC ? */
|
||||||
|
res = mux_get_buf(qcs);
|
||||||
|
|
||||||
|
/* h3 DATA headers : 1-byte frame type + varint frame length */
|
||||||
|
hsize = 1 + QUIC_VARINT_MAX_SIZE;
|
||||||
|
while (1) {
|
||||||
|
if (b_contig_space(res) >= hsize || !b_space_wraps(res))
|
||||||
|
break;
|
||||||
|
b_slow_realign(res, trash.area, b_data(res));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Not enough room for headers and at least one data byte, block the
|
||||||
|
* stream. It is expected that the stream connector layer will subscribe
|
||||||
|
* on SEND.
|
||||||
|
*/
|
||||||
|
if (b_contig_space(res) <= hsize) {
|
||||||
|
qcs->flags |= QC_SF_BLK_MROOM;
|
||||||
|
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Cannot forward more than available room in output buffer */
|
||||||
|
sz = b_contig_space(res) - hsize;
|
||||||
|
if (count > sz)
|
||||||
|
count = sz;
|
||||||
|
|
||||||
|
qcs->sd->iobuf.buf = res;
|
||||||
|
qcs->sd->iobuf.offset = hsize;
|
||||||
|
qcs->sd->iobuf.data = 0;
|
||||||
|
|
||||||
|
ret = count;
|
||||||
|
end:
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static size_t h3_done_ff(struct qcs *qcs)
|
||||||
|
{
|
||||||
|
size_t total = qcs->sd->iobuf.data;
|
||||||
|
|
||||||
|
h3_debug_printf(stderr, "%s\n", __func__);
|
||||||
|
|
||||||
|
if (qcs->sd->iobuf.data) {
|
||||||
|
b_sub(qcs->sd->iobuf.buf, qcs->sd->iobuf.data);
|
||||||
|
b_putchr(qcs->sd->iobuf.buf, 0x00); /* h3 frame type = DATA */
|
||||||
|
b_quic_enc_int(qcs->sd->iobuf.buf, qcs->sd->iobuf.data, QUIC_VARINT_MAX_SIZE); /* h3 frame length */
|
||||||
|
b_add(qcs->sd->iobuf.buf, qcs->sd->iobuf.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
qcs->sd->iobuf.buf = NULL;
|
||||||
|
qcs->sd->iobuf.offset = 0;
|
||||||
|
qcs->sd->iobuf.data = 0;
|
||||||
|
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
/* Notify about a closure on <qcs> stream requested by the remote peer.
|
/* Notify about a closure on <qcs> stream requested by the remote peer.
|
||||||
*
|
*
|
||||||
* Stream channel <side> is explained relative to our endpoint : WR for
|
* Stream channel <side> is explained relative to our endpoint : WR for
|
||||||
@ -2132,6 +2195,8 @@ const struct qcc_app_ops h3_ops = {
|
|||||||
.attach = h3_attach,
|
.attach = h3_attach,
|
||||||
.decode_qcs = h3_decode_qcs,
|
.decode_qcs = h3_decode_qcs,
|
||||||
.snd_buf = h3_snd_buf,
|
.snd_buf = h3_snd_buf,
|
||||||
|
.nego_ff = h3_nego_ff,
|
||||||
|
.done_ff = h3_done_ff,
|
||||||
.close = h3_close,
|
.close = h3_close,
|
||||||
.detach = h3_detach,
|
.detach = h3_detach,
|
||||||
.finalize = h3_finalize,
|
.finalize = h3_finalize,
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
#include <haproxy/stconn.h>
|
#include <haproxy/stconn.h>
|
||||||
#include <haproxy/time.h>
|
#include <haproxy/time.h>
|
||||||
#include <haproxy/trace.h>
|
#include <haproxy/trace.h>
|
||||||
|
#include <haproxy/xref.h>
|
||||||
|
|
||||||
DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
|
DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
|
||||||
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
|
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
|
||||||
@ -2807,6 +2808,85 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
|
||||||
|
{
|
||||||
|
struct qcs *qcs = __sc_mux_strm(sc);
|
||||||
|
size_t ret = 0;
|
||||||
|
|
||||||
|
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
|
||||||
|
|
||||||
|
/* stream layer has been detached so no transfer must occur after. */
|
||||||
|
BUG_ON_HOT(qcs->flags & QC_SF_DETACH);
|
||||||
|
|
||||||
|
if (!qcs->qcc->app_ops->nego_ff || !qcs->qcc->app_ops->done_ff) {
|
||||||
|
/* Fast forwading is not supported by the QUIC application layer */
|
||||||
|
qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Alawys disable splicing */
|
||||||
|
qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
|
||||||
|
|
||||||
|
ret = qcs->qcc->app_ops->nego_ff(qcs, count);
|
||||||
|
if (!ret)
|
||||||
|
goto end;
|
||||||
|
|
||||||
|
/* forward remaining input data */
|
||||||
|
if (b_data(input)) {
|
||||||
|
size_t xfer = ret;
|
||||||
|
|
||||||
|
if (xfer > b_data(input))
|
||||||
|
xfer = b_data(input);
|
||||||
|
b_add(qcs->sd->iobuf.buf, qcs->sd->iobuf.offset);
|
||||||
|
qcs->sd->iobuf.data = b_xfer(qcs->sd->iobuf.buf, input, xfer);
|
||||||
|
b_sub(qcs->sd->iobuf.buf, qcs->sd->iobuf.offset);
|
||||||
|
|
||||||
|
/* Cannot forward more data, wait for room */
|
||||||
|
if (b_data(input))
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
ret -= qcs->sd->iobuf.data;
|
||||||
|
|
||||||
|
end:
|
||||||
|
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static size_t qmux_done_ff(struct stconn *sc)
|
||||||
|
{
|
||||||
|
struct qcs *qcs = __sc_mux_strm(sc);
|
||||||
|
struct qcc *qcc = qcs->qcc;
|
||||||
|
struct sedesc *sd = qcs->sd;
|
||||||
|
size_t total = 0;
|
||||||
|
|
||||||
|
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
|
||||||
|
|
||||||
|
if (sd->iobuf.flags & IOBUF_FL_EOI)
|
||||||
|
qcs->flags |= QC_SF_FIN_STREAM;
|
||||||
|
|
||||||
|
if (!(qcs->flags & QC_SF_FIN_STREAM) && !sd->iobuf.data)
|
||||||
|
goto end;
|
||||||
|
|
||||||
|
total = qcs->qcc->app_ops->done_ff(qcs);
|
||||||
|
|
||||||
|
qcc_send_stream(qcs, 0);
|
||||||
|
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
|
||||||
|
tasklet_wakeup(qcc->wait_event.tasklet);
|
||||||
|
|
||||||
|
end:
|
||||||
|
if (!b_data(&qcs->tx.buf))
|
||||||
|
b_free(&qcs->tx.buf);
|
||||||
|
|
||||||
|
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int qmux_resume_ff(struct stconn *sc, unsigned int flags)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* Called from the upper layer, to subscribe <es> to events <event_type>. The
|
/* Called from the upper layer, to subscribe <es> to events <event_type>. The
|
||||||
* event subscriber <es> is not allowed to change from a previous call as long
|
* event subscriber <es> is not allowed to change from a previous call as long
|
||||||
* as at least one event is still subscribed. The <event_type> must only be a
|
* as at least one event is still subscribed. The <event_type> must only be a
|
||||||
@ -2927,6 +3007,9 @@ static const struct mux_ops qmux_ops = {
|
|||||||
.detach = qmux_strm_detach,
|
.detach = qmux_strm_detach,
|
||||||
.rcv_buf = qmux_strm_rcv_buf,
|
.rcv_buf = qmux_strm_rcv_buf,
|
||||||
.snd_buf = qmux_strm_snd_buf,
|
.snd_buf = qmux_strm_snd_buf,
|
||||||
|
.nego_fastfwd = qmux_nego_ff,
|
||||||
|
.done_fastfwd = qmux_done_ff,
|
||||||
|
.resume_fastfwd = qmux_resume_ff,
|
||||||
.subscribe = qmux_strm_subscribe,
|
.subscribe = qmux_strm_subscribe,
|
||||||
.unsubscribe = qmux_strm_unsubscribe,
|
.unsubscribe = qmux_strm_unsubscribe,
|
||||||
.wake = qmux_wake,
|
.wake = qmux_wake,
|
||||||
|
Loading…
Reference in New Issue
Block a user