diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index ac623be2a..741585746 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -34,6 +34,8 @@ void qcs_notify_send(struct qcs *qcs); void qcs_on_data_sent(struct qcs *qcs, uint64_t data, uint64_t offset); void qcc_notify_buf(struct qcc *qcc, uint64_t free_size); +void _qmux_ctrl_send(struct qcs *qcs, uint64_t data, uint64_t offset); + struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs); struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small); struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs); @@ -50,6 +52,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max); int qcc_recv_max_streams(struct qcc *qcc, uint64_t max, int bidi); int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size); int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err); +int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream); static inline int qmux_stream_rx_bufsz(void) { diff --git a/include/haproxy/mux_quic_qos.h b/include/haproxy/mux_quic_qos.h index 79dcdf943..627bcf797 100644 --- a/include/haproxy/mux_quic_qos.h +++ b/include/haproxy/mux_quic_qos.h @@ -5,4 +5,8 @@ int qcc_qos_recv(struct qcc *qcc); +int qcc_qos_send_frames(struct qcc *qcc, struct list *frms, int stream); + +int qcc_qos_send_tp(struct qcc *qcc); + #endif /* _HAPROXY_MUX_QUIC_QOS_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 120505150..8408634d0 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -613,6 +613,9 @@ static uint64_t qcs_prep_bytes(const struct qcs *qcs) } } +/* Callback for notification about emission of a STREAM frame of length + * starting at . + */ void qcs_on_data_sent(struct qcs *qcs, uint64_t data, uint64_t offset) { struct qcc *qcc = qcs->qcc; @@ -2575,11 +2578,6 @@ static int _qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); - if (LIST_ISEMPTY(frms)) { - TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn); - return -1; - } - if (stream && qcc_is_pacing_active(qcc->conn)) pacer = &qcc->tx.pacer; @@ -2613,97 +2611,15 @@ static int _qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) * Returns 0 if all data sent with success. On fatal error, a negative error * code is returned. A positive 1 is used if emission should be paced. */ -static int _qcc_qos_send_frames(struct qcc *qcc, struct list *frms, int stream) +int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) { - struct connection *conn = qcc->conn; - struct quic_frame *frm, *frm_old; - unsigned char *pos, *old, *end; - size_t ret; - - TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); - list_for_each_entry_safe(frm, frm_old, frms, list) { - loop: - struct quic_frame *split_frm = NULL, *old_frm; - - b_reset(&trash); - old = pos = (unsigned char *)b_orig(&trash); - end = (unsigned char *)b_wrap(&trash); - - TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QMUX_EV_QCC_SEND, qcc->conn, 0, 0, 0, - "frm type %02llx", (ullong)frm->type); - - if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) { - size_t flen, split_size; - - flen = quic_strm_frm_fillbuf(end - pos, frm, &split_size); - if (!flen) - continue; - - if (split_size) { - split_frm = quic_strm_frm_split(frm, split_size); - if (!split_frm) { - ABORT_NOW(); - continue; - } - - old_frm = frm; - frm = split_frm; - } - - } - - qc_build_frm(&pos, end, frm, NULL, NULL); - BUG_ON(pos - old > global.tune.bufsize); - BUG_ON(pos == old); - b_add(&trash, pos - old); - - ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &trash, b_data(&trash), NULL, 0, 0); - if (!ret) { - TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn); - if (split_frm) - LIST_INSERT(frms, &split_frm->list); - break; - } - - if (ret != b_data(&trash)) { - /* TODO */ - ABORT_NOW(); - } - - if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) { - qcs_on_data_sent(frm->stream.stream, - frm->stream.len, frm->stream.offset); - } - - LIST_DEL_INIT(&frm->list); - if (split_frm) { - frm = old_frm; - goto loop; - } + if (LIST_ISEMPTY(frms)) { + TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn); + return -1; } - if (conn->flags & CO_FL_ERROR) { - /* TODO */ - //ABORT_NOW(); - } - else if (!LIST_ISEMPTY(frms) && !(qcc->wait_event.events & SUB_RETRY_SEND)) { - conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event); - } - - TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); - return 0; -} - -/* Wrapper for send on transport layer. Send a list of frames for the - * connection . - * - * Returns 0 if all data sent with success. On fatal error, a negative error - * code is returned. A positive 1 is used if emission should be paced. - */ -static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) -{ return qmux_is_quic(qcc) ? _qcc_send_frames(qcc, frms, stream) : - _qcc_qos_send_frames(qcc, frms, stream); + qcc_qos_send_frames(qcc, frms, stream); } /* Emit a RESET_STREAM on . @@ -3096,7 +3012,7 @@ static int qcc_io_send(struct qcc *qcc) TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); - if (qcc_is_pacing_active(qcc->conn)) { + if (qmux_is_quic(qcc) && qcc_is_pacing_active(qcc->conn)) { /* Always reset pacing_task timer to prevent unnecessary execution. */ qcc->pacing_task->expire = TICK_ETERNITY; } @@ -3107,6 +3023,14 @@ static int qcc_io_send(struct qcc *qcc) * apply for STREAM frames. */ + if (!qmux_is_quic(qcc)) { + if (!(qcc->flags & QC_CF_QSTP_SENT)) { + if (qcc_qos_send_tp(qcc)) + return 0; + qcc->flags |= QC_CF_QSTP_SENT; + } + } + /* Check for transport error. */ if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); @@ -3154,30 +3078,35 @@ static int qcc_io_send(struct qcc *qcc) goto out; } - if (!LIST_ISEMPTY(frms) && qcc_is_pacing_active(qcc->conn)) { - if (!quic_pacing_reload(&qcc->tx.pacer)) { - qcc_wakeup_pacing(qcc); - total = 0; - goto out; + if (qmux_is_quic(qcc)) { + if (!LIST_ISEMPTY(frms) && qcc_is_pacing_active(qcc->conn)) { + if (!quic_pacing_reload(&qcc->tx.pacer)) { + qcc_wakeup_pacing(qcc); + total = 0; + goto out; + } } } - /* Retry sending until no frame to send, data rejected or connection - * flow-control limit reached. + /* Retry sending until no frame to send, data rejected or + * connection flow-control limit reached. */ while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { window_conn = qfctl_rcap(&qcc->tx.fc); resent = 0; - /* Reloop over . Useful for streams which have - * fulfilled their qc_stream_desc buf and have now release it. + /* Reloop over . Useful for streams + * which have fulfilled their qc_stream_desc buf and + * have now release it. */ list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_list, el_send) { - /* Only streams blocked on flow-control or waiting on a - * new qc_stream_desc should be present in send_list as - * long as transport layer can handle all data. + /* Only streams blocked on flow-control or + * waiting on a new qc_stream_desc should be + * present in send_list as long as transport + * layer can handle all data. */ - BUG_ON(qcs->stream->buf && !qfctl_rblocked(&qcs->tx.fc)); + BUG_ON((!qmux_is_quic(qcc) || qcs->stream->buf) && + !qfctl_rblocked(&qcs->tx.fc)); /* Total sent bytes must not exceed connection window. */ BUG_ON(resent > window_conn); @@ -3197,7 +3126,7 @@ static int qcc_io_send(struct qcc *qcc) if (ret == 1) { /* qcc_send_frames cannot return 1 if pacing not used. */ - BUG_ON(!qcc_is_pacing_active(qcc->conn)); + BUG_ON(!qmux_is_quic(qcc) || !qcc_is_pacing_active(qcc->conn)); qcc_wakeup_pacing(qcc); } @@ -3447,7 +3376,7 @@ static int qcc_io_process(struct qcc *qcc) /* If using listener socket, soft-stop is not supported. The * connection must be closed immediately. */ - if (!qc_test_fd(qcc->conn->handle.qc)) { + if (qmux_is_quic(qcc) && !qc_test_fd(qcc->conn->handle.qc)) { TRACE_DEVEL("proxy disabled with listener socket, closing connection", QMUX_EV_QCC_WAKE, qcc->conn); qcc->conn->flags |= (CO_FL_SOCK_RD_SH|CO_FL_SOCK_WR_SH); qcc_io_send(qcc); @@ -3614,6 +3543,9 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int state) conn_in_list = 0; } + if (!qmux_is_quic(qcc)) + qcc_io_recv(qcc); + if (!(qcc->wait_event.events & SUB_RETRY_SEND)) qcc_io_send(qcc); @@ -3627,7 +3559,7 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int state) qcc_refresh_timeout(qcc); /* Trigger pacing task is emission should be retried after some delay. */ - if (qcc_is_pacing_active(conn)) { + if (qmux_is_quic(qcc) && qcc_is_pacing_active(conn)) { if (tick_isset(qcc->pacing_task->expire)) task_queue(qcc->pacing_task); } @@ -3775,8 +3707,6 @@ static void _qcc_init(struct qcc *qcc) LIST_INIT(&qcc->tx.frms); } -struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status); - static int qmux_init(struct connection *conn, struct proxy *prx, struct session *sess, struct buffer *input) { @@ -3800,6 +3730,9 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->glitches = 0; qcc->err = quic_err_transport(QC_ERR_NO_ERROR); + if (strcmp(conn->mux->name, "QOS") == 0) + qcc->flags |= QC_CF_QOS; + if (qmux_is_quic(qcc)) { /* Server parameters, params used for RX flow control. */ lparams = &conn->handle.qc->rx.params; @@ -3881,7 +3814,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, LIST_INIT(&qcc->buf_wait_list); LIST_INIT(&qcc->purg_list); - qcc->wait_event.tasklet->process = qmux_is_quic(qcc) ? qcc_io_cb : qcc_qos_io_cb; + qcc->wait_event.tasklet->process = qcc_io_cb; qcc->wait_event.tasklet->context = qcc; qcc->wait_event.tasklet->state |= TASK_F_WANTS_TIME; qcc->wait_event.events = 0; @@ -3926,6 +3859,13 @@ static int qmux_init(struct connection *conn, struct proxy *prx, goto err; } } + else { + /* TODO hardcoded HTTP/3 ops */ + if (qcc_install_app_ops(qcc, &h3_ops)) { + TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn); + goto err; + } + } if (qcc->app_ops == &h3_ops && !conn_is_back(conn)) proxy_inc_fe_cum_sess_ver_ctr(sess->listener, prx, 3); @@ -3937,18 +3877,20 @@ static int qmux_init(struct connection *conn, struct proxy *prx, /* init read cycle */ tasklet_wakeup(qcc->wait_event.tasklet); - /* MUX is initialized before QUIC handshake completion if early data - * received. Flag connection to delay stream processing if - * wait-for-handshake is active. - */ - if (conn->handle.qc->state < QUIC_HS_ST_COMPLETE) { - if (!(conn->flags & CO_FL_EARLY_SSL_HS)) { - TRACE_STATE("flag connection with early data", QMUX_EV_QCC_WAKE, conn); - conn->flags |= CO_FL_EARLY_SSL_HS; - /* subscribe for handshake completion */ - conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, - &qcc->wait_event); - qcc->flags |= QC_CF_WAIT_HS; + if (qmux_is_quic(qcc)) { + /* MUX is initialized before QUIC handshake completion if early data + * received. Flag connection to delay stream processing if + * wait-for-handshake is active. + */ + if (conn->handle.qc->state < QUIC_HS_ST_COMPLETE) { + if (!(conn->flags & CO_FL_EARLY_SSL_HS)) { + TRACE_STATE("flag connection with early data", QMUX_EV_QCC_WAKE, conn); + conn->flags |= CO_FL_EARLY_SSL_HS; + /* subscribe for handshake completion */ + conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, + &qcc->wait_event); + qcc->flags |= QC_CF_WAIT_HS; + } } } } @@ -4719,154 +4661,6 @@ static struct mux_proto_list mux_proto_quic = INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_quic); -static int qcc_qos_send_tp(struct qcc *qcc) -{ - struct quic_frame *frm; - struct list list = LIST_HEAD_INIT(list); - - TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); - - frm = qc_frm_alloc(QUIC_FT_QS_TP); - if (!frm) { - TRACE_ERROR("frame alloc failure", QMUX_EV_QCC_SEND, qcc->conn); - goto err; - } - - LIST_APPEND(&list, &frm->list); - if (qcc_send_frames(qcc, &list, 0)) { - TRACE_DEVEL("QoS frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); - goto err; - } - - TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); - return 0; - - err: - TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn); - return -1; -} - -static int qcc_qos_io_send(struct qcc *qcc) -{ - struct list *frms = &qcc->tx.frms; - /* Temporary list for QCS on error. */ - struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); - struct qcs *qcs, *qcs_tmp; - uint64_t window_conn __maybe_unused = qfctl_rcap(&qcc->tx.fc); - int ret __maybe_unused = 0, total = 0, resent __maybe_unused; - - TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); - - if (!(qcc->flags & QC_CF_QSTP_SENT)) { - if (qcc_qos_send_tp(qcc)) - return 0; - qcc->flags |= QC_CF_QSTP_SENT; - } - - /* Check for transport error. */ - if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { - TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); - goto out; - } - - /* Check for locally detected connection error. */ - if (qcc->flags & QC_CF_ERRL) { - /* Prepare a CONNECTION_CLOSE if not already done. */ - if (!(qcc->flags & QC_CF_ERRL_DONE)) { - TRACE_DATA("report a connection error", QMUX_EV_QCC_SEND|QMUX_EV_QCC_ERR, qcc->conn); - quic_set_connection_close(qcc->conn->handle.qc, qcc->err); - qcc->flags |= QC_CF_ERRL_DONE; - } - goto out; - } - - if (qcc->app_st < QCC_APP_ST_INIT) { - if (qcc_app_init(qcc)) - goto out; - } - - if (qcc->conn->flags & CO_FL_SOCK_WR_SH) { - qcc->conn->flags |= CO_FL_ERROR; - TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); - goto out; - } - - if (!LIST_ISEMPTY(&qcc->lfctl.frms)) { - if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) { - TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); - goto out; - } - } - - if (qcc_emit_rs_ss(qcc)) { - TRACE_DEVEL("emission interrupted on STOP_SENDING/RESET_STREAM send error", QMUX_EV_QCC_SEND, qcc->conn); - goto out; - } - - /* Encode new STREAM frames if list has been previously cleared. */ - if (LIST_ISEMPTY(frms) && !LIST_ISEMPTY(&qcc->send_list)) { - total = qcc_build_frms(qcc, &qcs_failed); - if (LIST_ISEMPTY(frms)) - goto out; - } - - ret = qcc_send_frames(qcc, frms, 1); - - out: - /* Re-insert on-error QCS at the end of the send-list. */ - if (!LIST_ISEMPTY(&qcs_failed)) { - list_for_each_entry_safe(qcs, qcs_tmp, &qcs_failed, el_send) { - LIST_DEL_INIT(&qcs->el_send); - LIST_APPEND(&qcc->send_list, &qcs->el_send); - } - - if (!qfctl_rblocked(&qcc->tx.fc)) - tasklet_wakeup(qcc->wait_event.tasklet); - } - - if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) { - TRACE_ERROR("error reported by transport layer", - QMUX_EV_QCC_SEND, qcc->conn); - qcc->flags |= QC_CF_ERR_CONN; - } - - TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); - return total; -} - -struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status) -{ - struct qcc *qcc = ctx; - - TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); - - qcc_io_recv(qcc); - - if (!(qcc->wait_event.events & SUB_RETRY_SEND)) - qcc_qos_io_send(qcc); - - qcc_io_recv(qcc); - - if (qcc_io_process(qcc)) { - TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn); - goto release; - } - - qcc_refresh_timeout(qcc); - - TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); - - return t; - - release: - qcc_shutdown(qcc); - qcc_release(qcc); - - TRACE_LEAVE(QMUX_EV_QCC_WAKE); - - return NULL; -} - static const struct mux_ops qmux_qos_ops = { .init = qmux_init, .destroy = qmux_destroy, diff --git a/src/mux_quic_qos.c b/src/mux_quic_qos.c index c86579811..a214092ef 100644 --- a/src/mux_quic_qos.c +++ b/src/mux_quic_qos.c @@ -78,3 +78,117 @@ int qcc_qos_recv(struct qcc *qcc) err: return -1; } + +/* Wrapper for send on transport layer. Send a list of frames for the + * connection . + * + * Returns 0 if all data sent with success. On fatal error, a negative error + * code is returned. A positive 1 is used if emission should be paced. + */ +int qcc_qos_send_frames(struct qcc *qcc, struct list *frms, int stream) +{ + struct connection *conn = qcc->conn; + struct quic_frame *frm, *frm_old; + unsigned char *pos, *old, *end; + size_t ret; + + TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); + list_for_each_entry_safe(frm, frm_old, frms, list) { + loop: + struct quic_frame *split_frm = NULL, *old_frm; + + b_reset(&trash); + old = pos = (unsigned char *)b_orig(&trash); + end = (unsigned char *)b_wrap(&trash); + + TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QMUX_EV_QCC_SEND, qcc->conn, 0, 0, 0, + "frm type %02llx", (ullong)frm->type); + + if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) { + size_t flen, split_size; + + flen = quic_strm_frm_fillbuf(end - pos, frm, &split_size); + if (!flen) + continue; + + if (split_size) { + split_frm = quic_strm_frm_split(frm, split_size); + if (!split_frm) { + ABORT_NOW(); + continue; + } + + old_frm = frm; + frm = split_frm; + } + + } + + qc_build_frm(&pos, end, frm, NULL, NULL); + BUG_ON(pos - old > global.tune.bufsize); + BUG_ON(pos == old); + b_add(&trash, pos - old); + + ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &trash, b_data(&trash), NULL, 0, 0); + if (!ret) { + TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn); + if (split_frm) + LIST_INSERT(frms, &split_frm->list); + break; + } + + if (ret != b_data(&trash)) { + /* TODO */ + ABORT_NOW(); + } + + if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) { + qcs_on_data_sent(frm->stream.stream, + frm->stream.len, frm->stream.offset); + } + + LIST_DEL_INIT(&frm->list); + if (split_frm) { + frm = old_frm; + goto loop; + } + } + + if (conn->flags & CO_FL_ERROR) { + /* TODO */ + //ABORT_NOW(); + } + else if (!LIST_ISEMPTY(frms) && !(qcc->wait_event.events & SUB_RETRY_SEND)) { + conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event); + } + + TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); + return 0; +} + +int qcc_qos_send_tp(struct qcc *qcc) +{ + struct quic_frame *frm; + struct list list = LIST_HEAD_INIT(list); + + TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); + + frm = qc_frm_alloc(QUIC_FT_QS_TP); + if (!frm) { + TRACE_ERROR("frame alloc failure", QMUX_EV_QCC_SEND, qcc->conn); + goto err; + } + + LIST_APPEND(&list, &frm->list); + if (qcc_send_frames(qcc, &list, 0)) { + TRACE_DEVEL("QoS frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); + goto err; + } + + TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); + return 0; + + err: + TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn); + return -1; +}