TMP merge common code step 2

This commit is contained in:
Amaury Denoyelle 2025-12-10 15:26:53 +01:00
parent 670e8b2f9a
commit 909158c7e0
4 changed files with 187 additions and 272 deletions

View File

@ -34,6 +34,8 @@ void qcs_notify_send(struct qcs *qcs);
void qcs_on_data_sent(struct qcs *qcs, uint64_t data, uint64_t offset);
void qcc_notify_buf(struct qcc *qcc, uint64_t free_size);
void _qmux_ctrl_send(struct qcs *qcs, uint64_t data, uint64_t offset);
struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small);
struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs);
@ -50,6 +52,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max);
int qcc_recv_max_streams(struct qcc *qcc, uint64_t max, int bidi);
int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size);
int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err);
int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream);
static inline int qmux_stream_rx_bufsz(void)
{

View File

@ -5,4 +5,8 @@
int qcc_qos_recv(struct qcc *qcc);
int qcc_qos_send_frames(struct qcc *qcc, struct list *frms, int stream);
int qcc_qos_send_tp(struct qcc *qcc);
#endif /* _HAPROXY_MUX_QUIC_QOS_H */

View File

@ -613,6 +613,9 @@ static uint64_t qcs_prep_bytes(const struct qcs *qcs)
}
}
/* Callback for notification about emission of a STREAM frame of <data> length
* starting at <offset>.
*/
void qcs_on_data_sent(struct qcs *qcs, uint64_t data, uint64_t offset)
{
struct qcc *qcc = qcs->qcc;
@ -2575,11 +2578,6 @@ static int _qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
return -1;
}
if (stream && qcc_is_pacing_active(qcc->conn))
pacer = &qcc->tx.pacer;
@ -2613,97 +2611,15 @@ static int _qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
* Returns 0 if all data sent with success. On fatal error, a negative error
* code is returned. A positive 1 is used if emission should be paced.
*/
static int _qcc_qos_send_frames(struct qcc *qcc, struct list *frms, int stream)
int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
{
struct connection *conn = qcc->conn;
struct quic_frame *frm, *frm_old;
unsigned char *pos, *old, *end;
size_t ret;
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
list_for_each_entry_safe(frm, frm_old, frms, list) {
loop:
struct quic_frame *split_frm = NULL, *old_frm;
b_reset(&trash);
old = pos = (unsigned char *)b_orig(&trash);
end = (unsigned char *)b_wrap(&trash);
TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QMUX_EV_QCC_SEND, qcc->conn, 0, 0, 0,
"frm type %02llx", (ullong)frm->type);
if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) {
size_t flen, split_size;
flen = quic_strm_frm_fillbuf(end - pos, frm, &split_size);
if (!flen)
continue;
if (split_size) {
split_frm = quic_strm_frm_split(frm, split_size);
if (!split_frm) {
ABORT_NOW();
continue;
}
old_frm = frm;
frm = split_frm;
}
}
qc_build_frm(&pos, end, frm, NULL, NULL);
BUG_ON(pos - old > global.tune.bufsize);
BUG_ON(pos == old);
b_add(&trash, pos - old);
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &trash, b_data(&trash), NULL, 0, 0);
if (!ret) {
TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn);
if (split_frm)
LIST_INSERT(frms, &split_frm->list);
break;
}
if (ret != b_data(&trash)) {
/* TODO */
ABORT_NOW();
}
if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) {
qcs_on_data_sent(frm->stream.stream,
frm->stream.len, frm->stream.offset);
}
LIST_DEL_INIT(&frm->list);
if (split_frm) {
frm = old_frm;
goto loop;
}
if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
return -1;
}
if (conn->flags & CO_FL_ERROR) {
/* TODO */
//ABORT_NOW();
}
else if (!LIST_ISEMPTY(frms) && !(qcc->wait_event.events & SUB_RETRY_SEND)) {
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event);
}
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
return 0;
}
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
* connection <qcc>.
*
* Returns 0 if all data sent with success. On fatal error, a negative error
* code is returned. A positive 1 is used if emission should be paced.
*/
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
{
return qmux_is_quic(qcc) ? _qcc_send_frames(qcc, frms, stream) :
_qcc_qos_send_frames(qcc, frms, stream);
qcc_qos_send_frames(qcc, frms, stream);
}
/* Emit a RESET_STREAM on <qcs>.
@ -3096,7 +3012,7 @@ static int qcc_io_send(struct qcc *qcc)
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
if (qcc_is_pacing_active(qcc->conn)) {
if (qmux_is_quic(qcc) && qcc_is_pacing_active(qcc->conn)) {
/* Always reset pacing_task timer to prevent unnecessary execution. */
qcc->pacing_task->expire = TICK_ETERNITY;
}
@ -3107,6 +3023,14 @@ static int qcc_io_send(struct qcc *qcc)
* apply for STREAM frames.
*/
if (!qmux_is_quic(qcc)) {
if (!(qcc->flags & QC_CF_QSTP_SENT)) {
if (qcc_qos_send_tp(qcc))
return 0;
qcc->flags |= QC_CF_QSTP_SENT;
}
}
/* Check for transport error. */
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
@ -3154,30 +3078,35 @@ static int qcc_io_send(struct qcc *qcc)
goto out;
}
if (!LIST_ISEMPTY(frms) && qcc_is_pacing_active(qcc->conn)) {
if (!quic_pacing_reload(&qcc->tx.pacer)) {
qcc_wakeup_pacing(qcc);
total = 0;
goto out;
if (qmux_is_quic(qcc)) {
if (!LIST_ISEMPTY(frms) && qcc_is_pacing_active(qcc->conn)) {
if (!quic_pacing_reload(&qcc->tx.pacer)) {
qcc_wakeup_pacing(qcc);
total = 0;
goto out;
}
}
}
/* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached.
/* Retry sending until no frame to send, data rejected or
* connection flow-control limit reached.
*/
while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
window_conn = qfctl_rcap(&qcc->tx.fc);
resent = 0;
/* Reloop over <qcc.send_list>. Useful for streams which have
* fulfilled their qc_stream_desc buf and have now released it.
/* Reloop over <qcc.send_list>. Useful for streams
* which have fulfilled their qc_stream_desc buf and
* have now released it.
*/
list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_list, el_send) {
/* Only streams blocked on flow-control or waiting on a
* new qc_stream_desc should be present in send_list as
* long as transport layer can handle all data.
/* Only streams blocked on flow-control or
* waiting on a new qc_stream_desc should be
* present in send_list as long as transport
* layer can handle all data.
*/
BUG_ON(qcs->stream->buf && !qfctl_rblocked(&qcs->tx.fc));
BUG_ON((!qmux_is_quic(qcc) || qcs->stream->buf) &&
!qfctl_rblocked(&qcs->tx.fc));
/* Total sent bytes must not exceed connection window. */
BUG_ON(resent > window_conn);
@ -3197,7 +3126,7 @@ static int qcc_io_send(struct qcc *qcc)
if (ret == 1) {
/* qcc_send_frames cannot return 1 if pacing not used. */
BUG_ON(!qcc_is_pacing_active(qcc->conn));
BUG_ON(!qmux_is_quic(qcc) || !qcc_is_pacing_active(qcc->conn));
qcc_wakeup_pacing(qcc);
}
@ -3447,7 +3376,7 @@ static int qcc_io_process(struct qcc *qcc)
/* If using listener socket, soft-stop is not supported. The
* connection must be closed immediately.
*/
if (!qc_test_fd(qcc->conn->handle.qc)) {
if (qmux_is_quic(qcc) && !qc_test_fd(qcc->conn->handle.qc)) {
TRACE_DEVEL("proxy disabled with listener socket, closing connection", QMUX_EV_QCC_WAKE, qcc->conn);
qcc->conn->flags |= (CO_FL_SOCK_RD_SH|CO_FL_SOCK_WR_SH);
qcc_io_send(qcc);
@ -3614,6 +3543,9 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int state)
conn_in_list = 0;
}
if (!qmux_is_quic(qcc))
qcc_io_recv(qcc);
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
qcc_io_send(qcc);
@ -3627,7 +3559,7 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int state)
qcc_refresh_timeout(qcc);
/* Trigger pacing task if emission should be retried after some delay. */
if (qcc_is_pacing_active(conn)) {
if (qmux_is_quic(qcc) && qcc_is_pacing_active(conn)) {
if (tick_isset(qcc->pacing_task->expire))
task_queue(qcc->pacing_task);
}
@ -3775,8 +3707,6 @@ static void _qcc_init(struct qcc *qcc)
LIST_INIT(&qcc->tx.frms);
}
struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status);
static int qmux_init(struct connection *conn, struct proxy *prx,
struct session *sess, struct buffer *input)
{
@ -3800,6 +3730,9 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc->glitches = 0;
qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
if (strcmp(conn->mux->name, "QOS") == 0)
qcc->flags |= QC_CF_QOS;
if (qmux_is_quic(qcc)) {
/* Server parameters, params used for RX flow control. */
lparams = &conn->handle.qc->rx.params;
@ -3881,7 +3814,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
LIST_INIT(&qcc->buf_wait_list);
LIST_INIT(&qcc->purg_list);
qcc->wait_event.tasklet->process = qmux_is_quic(qcc) ? qcc_io_cb : qcc_qos_io_cb;
qcc->wait_event.tasklet->process = qcc_io_cb;
qcc->wait_event.tasklet->context = qcc;
qcc->wait_event.tasklet->state |= TASK_F_WANTS_TIME;
qcc->wait_event.events = 0;
@ -3926,6 +3859,13 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
goto err;
}
}
else {
/* TODO hardcoded HTTP/3 ops */
if (qcc_install_app_ops(qcc, &h3_ops)) {
TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn);
goto err;
}
}
if (qcc->app_ops == &h3_ops && !conn_is_back(conn))
proxy_inc_fe_cum_sess_ver_ctr(sess->listener, prx, 3);
@ -3937,18 +3877,20 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
/* init read cycle */
tasklet_wakeup(qcc->wait_event.tasklet);
/* MUX is initialized before QUIC handshake completion if early data
* received. Flag connection to delay stream processing if
* wait-for-handshake is active.
*/
if (conn->handle.qc->state < QUIC_HS_ST_COMPLETE) {
if (!(conn->flags & CO_FL_EARLY_SSL_HS)) {
TRACE_STATE("flag connection with early data", QMUX_EV_QCC_WAKE, conn);
conn->flags |= CO_FL_EARLY_SSL_HS;
/* subscribe for handshake completion */
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV,
&qcc->wait_event);
qcc->flags |= QC_CF_WAIT_HS;
if (qmux_is_quic(qcc)) {
/* MUX is initialized before QUIC handshake completion if early data
* received. Flag connection to delay stream processing if
* wait-for-handshake is active.
*/
if (conn->handle.qc->state < QUIC_HS_ST_COMPLETE) {
if (!(conn->flags & CO_FL_EARLY_SSL_HS)) {
TRACE_STATE("flag connection with early data", QMUX_EV_QCC_WAKE, conn);
conn->flags |= CO_FL_EARLY_SSL_HS;
/* subscribe for handshake completion */
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV,
&qcc->wait_event);
qcc->flags |= QC_CF_WAIT_HS;
}
}
}
}
@ -4719,154 +4661,6 @@ static struct mux_proto_list mux_proto_quic =
INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_quic);
/* Emit a QS_TP (transport parameters) frame on the QoS connection <qcc>.
 *
 * A single QUIC_FT_QS_TP frame is allocated, chained on a local list and
 * handed to qcc_send_frames() as a non-STREAM emission (stream arg 0).
 *
 * Returns 0 on success, -1 on allocation failure or if the transport layer
 * rejected the frame.
 *
 * NOTE(review): on the qcc_send_frames() error path the frame may still be
 * attached to the local <list> which goes out of scope — looks like a
 * potential frame leak; confirm frame ownership on send failure.
 */
static int qcc_qos_send_tp(struct qcc *qcc)
{
	struct quic_frame *frm;
	struct list list = LIST_HEAD_INIT(list);
	TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
	frm = qc_frm_alloc(QUIC_FT_QS_TP);
	if (!frm) {
		TRACE_ERROR("frame alloc failure", QMUX_EV_QCC_SEND, qcc->conn);
		goto err;
	}
	/* Chain the frame on a local list and push it to the transport. */
	LIST_APPEND(&list, &frm->list);
	if (qcc_send_frames(qcc, &list, 0)) {
		TRACE_DEVEL("QoS frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
		goto err;
	}
	TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
	return 0;
 err:
	TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
	return -1;
}
/* Sending loop for a QoS connection <qcc>.
 *
 * Performs, in order: transport-parameters emission (once, guarded by
 * QC_CF_QSTP_SENT), error/shutdown checks, application layer init, emission
 * of pending flow-control frames, RESET_STREAM/STOP_SENDING emission, then
 * encoding and emission of new STREAM frames from <qcc->send_list>.
 *
 * Returns <total>, the value reported by qcc_build_frms() for newly encoded
 * STREAM data, or 0 if nothing new was encoded or emission was aborted early.
 */
static int qcc_qos_io_send(struct qcc *qcc)
{
	struct list *frms = &qcc->tx.frms;
	/* Temporary list for QCS on error. */
	struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
	struct qcs *qcs, *qcs_tmp;
	uint64_t window_conn __maybe_unused = qfctl_rcap(&qcc->tx.fc);
	int ret __maybe_unused = 0, total = 0, resent __maybe_unused;
	TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
	/* Emit the transport parameters frame exactly once per connection.
	 * Abort the whole send cycle if it could not be emitted.
	 */
	if (!(qcc->flags & QC_CF_QSTP_SENT)) {
		if (qcc_qos_send_tp(qcc))
			return 0;
		qcc->flags |= QC_CF_QSTP_SENT;
	}
	/* Check for transport error. */
	if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
		TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
		goto out;
	}
	/* Check for locally detected connection error. */
	if (qcc->flags & QC_CF_ERRL) {
		/* Prepare a CONNECTION_CLOSE if not already done. */
		if (!(qcc->flags & QC_CF_ERRL_DONE)) {
			TRACE_DATA("report a connection error", QMUX_EV_QCC_SEND|QMUX_EV_QCC_ERR, qcc->conn);
			quic_set_connection_close(qcc->conn->handle.qc, qcc->err);
			qcc->flags |= QC_CF_ERRL_DONE;
		}
		goto out;
	}
	/* Lazily initialize the application layer on first send. */
	if (qcc->app_st < QCC_APP_ST_INIT) {
		if (qcc_app_init(qcc))
			goto out;
	}
	/* A closed write side is converted into a connection error. */
	if (qcc->conn->flags & CO_FL_SOCK_WR_SH) {
		qcc->conn->flags |= CO_FL_ERROR;
		TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
		goto out;
	}
	/* Emit pending local flow-control frames before any STREAM data. */
	if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
		if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
			TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
			goto out;
		}
	}
	if (qcc_emit_rs_ss(qcc)) {
		TRACE_DEVEL("emission interrupted on STOP_SENDING/RESET_STREAM send error", QMUX_EV_QCC_SEND, qcc->conn);
		goto out;
	}
	/* Encode new STREAM frames if list has been previously cleared. */
	if (LIST_ISEMPTY(frms) && !LIST_ISEMPTY(&qcc->send_list)) {
		total = qcc_build_frms(qcc, &qcs_failed);
		if (LIST_ISEMPTY(frms))
			goto out;
	}
	ret = qcc_send_frames(qcc, frms, 1);
 out:
	/* Re-insert on-error QCS at the end of the send-list. */
	if (!LIST_ISEMPTY(&qcs_failed)) {
		list_for_each_entry_safe(qcs, qcs_tmp, &qcs_failed, el_send) {
			LIST_DEL_INIT(&qcs->el_send);
			LIST_APPEND(&qcc->send_list, &qcs->el_send);
		}
		/* Retry immediately if the connection window is not exhausted. */
		if (!qfctl_rblocked(&qcc->tx.fc))
			tasklet_wakeup(qcc->wait_event.tasklet);
	}
	/* Latch a transport-layer error into the connection flags once. */
	if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) {
		TRACE_ERROR("error reported by transport layer",
		            QMUX_EV_QCC_SEND, qcc->conn);
		qcc->flags |= QC_CF_ERR_CONN;
	}
	TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
	return total;
}
/* I/O tasklet callback for a QoS connection. <ctx> is the qcc.
 *
 * Runs one receive/send cycle, then qcc_io_process(); if the latter reports
 * the connection as dead, the mux is shut down and released and NULL is
 * returned so the task is not requeued. Otherwise the timeout is refreshed
 * and <t> is returned.
 *
 * NOTE(review): qcc_io_recv() is invoked both before and after the send
 * phase — confirm the second call is intentional and not a merge leftover.
 */
struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status)
{
	struct qcc *qcc = ctx;
	TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
	qcc_io_recv(qcc);
	/* Only send if not already subscribed for a send retry. */
	if (!(qcc->wait_event.events & SUB_RETRY_SEND))
		qcc_qos_io_send(qcc);
	qcc_io_recv(qcc);
	if (qcc_io_process(qcc)) {
		TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn);
		goto release;
	}
	qcc_refresh_timeout(qcc);
	TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
	return t;
 release:
	qcc_shutdown(qcc);
	qcc_release(qcc);
	TRACE_LEAVE(QMUX_EV_QCC_WAKE);
	return NULL;
}
static const struct mux_ops qmux_qos_ops = {
.init = qmux_init,
.destroy = qmux_destroy,

View File

@ -78,3 +78,117 @@ int qcc_qos_recv(struct qcc *qcc)
err:
return -1;
}
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
* connection <qcc>.
*
* Returns 0 if all data sent with success. On fatal error, a negative error
* code is returned. A positive 1 is used if emission should be paced.
*/
/* Send the frame list <frms> of connection <qcc> through the transport
 * xprt snd_buf callback, one frame per <trash> buffer fill.
 *
 * STREAM frames larger than the remaining buffer room are split via
 * quic_strm_frm_split(); the split remainder is emitted on the next loop
 * iteration (goto loop). Frames emitted in full are detached from <frms>.
 * If snd_buf returns 0 the loop stops and remaining frames stay queued;
 * a SUB_RETRY_SEND subscription is then armed unless the connection is
 * already on error.
 *
 * Always returns 0 in the current state (partial writes and transport
 * errors are TODOs handled by ABORT_NOW()/comments).
 */
int qcc_qos_send_frames(struct qcc *qcc, struct list *frms, int stream)
{
	struct connection *conn = qcc->conn;
	struct quic_frame *frm, *frm_old;
	unsigned char *pos, *old, *end;
	size_t ret;
	TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
	list_for_each_entry_safe(frm, frm_old, frms, list) {
 loop:
		struct quic_frame *split_frm = NULL, *old_frm;
		/* Encode each frame from the start of a freshly reset trash buffer. */
		b_reset(&trash);
		old = pos = (unsigned char *)b_orig(&trash);
		end = (unsigned char *)b_wrap(&trash);
		TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QMUX_EV_QCC_SEND, qcc->conn, 0, 0, 0,
		             "frm type %02llx", (ullong)frm->type);
		if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) {
			size_t flen, split_size;
			/* Compute how much STREAM payload fits; <split_size> is the
			 * part that does not fit and must be carved off.
			 */
			flen = quic_strm_frm_fillbuf(end - pos, frm, &split_size);
			if (!flen)
				continue;
			if (split_size) {
				split_frm = quic_strm_frm_split(frm, split_size);
				if (!split_frm) {
					ABORT_NOW();
					continue;
				}
				/* Emit the fitting part first; remember the remainder. */
				old_frm = frm;
				frm = split_frm;
			}
		}
		qc_build_frm(&pos, end, frm, NULL, NULL);
		BUG_ON(pos - old > global.tune.bufsize);
		BUG_ON(pos == old);
		b_add(&trash, pos - old);
		ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &trash, b_data(&trash), NULL, 0, 0);
		if (!ret) {
			/* Transport refused the data: requeue the split part and stop. */
			TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn);
			if (split_frm)
				LIST_INSERT(frms, &split_frm->list);
			break;
		}
		if (ret != b_data(&trash)) {
			/* TODO */
			ABORT_NOW();
		}
		/* Notify stream layer about emitted STREAM data for ACK accounting. */
		if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) {
			qcs_on_data_sent(frm->stream.stream,
			                 frm->stream.len, frm->stream.offset);
		}
		LIST_DEL_INIT(&frm->list);
		/* Loop again on the remainder of a split frame. */
		if (split_frm) {
			frm = old_frm;
			goto loop;
		}
	}
	if (conn->flags & CO_FL_ERROR) {
		/* TODO */
		//ABORT_NOW();
	}
	else if (!LIST_ISEMPTY(frms) && !(qcc->wait_event.events & SUB_RETRY_SEND)) {
		/* Frames remain: subscribe to be woken when sending is possible again. */
		conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event);
	}
	TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
	return 0;
}
int qcc_qos_send_tp(struct qcc *qcc)
{
struct quic_frame *frm;
struct list list = LIST_HEAD_INIT(list);
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
frm = qc_frm_alloc(QUIC_FT_QS_TP);
if (!frm) {
TRACE_ERROR("frame alloc failure", QMUX_EV_QCC_SEND, qcc->conn);
goto err;
}
LIST_APPEND(&list, &frm->list);
if (qcc_send_frames(qcc, &list, 0)) {
TRACE_DEVEL("QoS frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
goto err;
}
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
return 0;
err:
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
return -1;
}