MAJOR: mux-quic: support pacing emission

This commit is contained in:
Amaury Denoyelle 2024-10-25 16:55:43 +02:00
parent 64472afb99
commit 2ba0856739
3 changed files with 81 additions and 14 deletions

View File

@ -37,6 +37,8 @@ static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer)
int quic_pacing_expired(const struct quic_pacer *pacer); int quic_pacing_expired(const struct quic_pacer *pacer);
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc);
void quic_pacing_sent_done(struct quic_pacer *pacer, int sent); void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
#endif /* _HAPROXY_QUIC_PACING_H */ #endif /* _HAPROXY_QUIC_PACING_H */

View File

@ -390,6 +390,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
void qcc_wakeup(struct qcc *qcc) void qcc_wakeup(struct qcc *qcc)
{ {
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet);
}
static void qcc_wakeup_pacing(struct qcc *qcc)
{
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet); tasklet_wakeup(qcc->wait_event.tasklet);
} }
@ -2078,18 +2085,22 @@ static int qcc_subscribe_send(struct qcc *qcc)
* *
* Returns 0 if all data sent with success else non-zero. * Returns 0 if all data sent with success else non-zero.
*/ */
static int qcc_send_frames(struct qcc *qcc, struct list *frms) static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
{ {
enum quic_tx_err ret; enum quic_tx_err ret;
struct quic_pacer *pacer = NULL;
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
if (LIST_ISEMPTY(frms)) { if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn); TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
return 1; return -1;
} }
ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL); if (stream)
pacer = &qcc->tx.pacer;
ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
if (ret == QUIC_TX_ERR_FATAL) { if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc); qcc_subscribe_send(qcc);
@ -2099,18 +2110,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
/* If there is frames left at this stage, transport layer is blocked. /* If there is frames left at this stage, transport layer is blocked.
* Subscribe on it to retry later. * Subscribe on it to retry later.
*/ */
if (!LIST_ISEMPTY(frms)) { if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) {
TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn); TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc); qcc_subscribe_send(qcc);
goto err; goto err;
} }
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
return 0; return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;
err: err:
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn); TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
return 1; return -1;
} }
/* Emit a RESET_STREAM on <qcs>. /* Emit a RESET_STREAM on <qcs>.
@ -2135,7 +2146,7 @@ static int qcs_send_reset(struct qcs *qcs)
frm->reset_stream.final_size = qcs->tx.fc.off_real; frm->reset_stream.final_size = qcs->tx.fc.off_real;
LIST_APPEND(&frms, &frm->list); LIST_APPEND(&frms, &frm->list);
if (qcc_send_frames(qcs->qcc, &frms)) { if (qcc_send_frames(qcs->qcc, &frms, 0)) {
if (!LIST_ISEMPTY(&frms)) if (!LIST_ISEMPTY(&frms))
qc_frm_free(qcs->qcc->conn->handle.qc, &frm); qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@ -2186,7 +2197,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
frm->stop_sending.app_error_code = qcs->err; frm->stop_sending.app_error_code = qcs->err;
LIST_APPEND(&frms, &frm->list); LIST_APPEND(&frms, &frm->list);
if (qcc_send_frames(qcs->qcc, &frms)) { if (qcc_send_frames(qcs->qcc, &frms, 0)) {
if (!LIST_ISEMPTY(&frms)) if (!LIST_ISEMPTY(&frms))
qc_frm_free(qcc->conn->handle.qc, &frm); qc_frm_free(qcc->conn->handle.qc, &frm);
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@ -2254,11 +2265,12 @@ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn)
static int qcc_io_send(struct qcc *qcc) static int qcc_io_send(struct qcc *qcc)
{ {
/* Temporary list for QCS on error. */ /* Temporary list for QCS on error. */
struct list *frms = quic_pacing_frms(&qcc->tx.pacer); struct quic_pacer *pacer = &qcc->tx.pacer;
struct list *frms = quic_pacing_frms(pacer);
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
int ret, total = 0, resent; int ret = 0, total = 0, resent;
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
@ -2268,6 +2280,8 @@ static int qcc_io_send(struct qcc *qcc)
* apply for STREAM frames. * apply for STREAM frames.
*/ */
quic_pacing_reset(pacer);
/* Check for transport error. */ /* Check for transport error. */
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
@ -2292,7 +2306,7 @@ static int qcc_io_send(struct qcc *qcc)
} }
if (!LIST_ISEMPTY(&qcc->lfctl.frms)) { if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
if (qcc_send_frames(qcc, &qcc->lfctl.frms)) { if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
goto out; goto out;
} }
@ -2368,10 +2382,15 @@ static int qcc_io_send(struct qcc *qcc)
} }
} }
if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
qcc_wakeup_pacing(qcc);
return 1;
}
/* Retry sending until no frame to send, data rejected or connection /* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached. * flow-control limit reached.
*/ */
while ((ret = qcc_send_frames(qcc, frms)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
window_conn = qfctl_rcap(&qcc->tx.fc); window_conn = qfctl_rcap(&qcc->tx.fc);
resent = 0; resent = 0;
@ -2402,8 +2421,13 @@ static int qcc_io_send(struct qcc *qcc)
} }
sent_done: sent_done:
/* Deallocate frames that the transport layer has rejected. */ if (ret == 1) {
quic_pacing_reset(&qcc->tx.pacer); qcc_wakeup_pacing(qcc);
}
else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
/* Deallocate frames that the transport layer has rejected. */
quic_pacing_reset(pacer);
}
/* Re-insert on-error QCS at the end of the send-list. */ /* Re-insert on-error QCS at the end of the send-list. */
if (!LIST_ISEMPTY(&qcs_failed)) { if (!LIST_ISEMPTY(&qcs_failed)) {
@ -2750,12 +2774,39 @@ static void qcc_release(struct qcc *qcc)
TRACE_LEAVE(QMUX_EV_QCC_END); TRACE_LEAVE(QMUX_EV_QCC_END);
} }
static void qcc_purge_sending(struct qcc *qcc)
{
struct quic_conn *qc = qcc->conn->handle.qc;
struct quic_pacer *pacer = &qcc->tx.pacer;
enum quic_tx_err ret;
ret = quic_pacing_send(pacer, qc);
if (ret == QUIC_TX_ERR_AGAIN) {
BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
qcc_wakeup_pacing(qcc);
}
else if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
qcc_subscribe_send(qcc);
}
else {
if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
qcc_subscribe_send(qcc);
}
}
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
{ {
struct qcc *qcc = ctx; struct qcc *qcc = ctx;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
if (status & TASK_F_USR1) {
qcc_purge_sending(qcc);
return NULL;
}
if (!(qcc->wait_event.events & SUB_RETRY_SEND)) if (!(qcc->wait_event.events & SUB_RETRY_SEND))
qcc_io_send(qcc); qcc_io_send(qcc);

View File

@ -9,6 +9,20 @@ int quic_pacing_expired(const struct quic_pacer *pacer)
return !pacer->next || pacer->next <= now_mono_time(); return !pacer->next || pacer->next <= now_mono_time();
} }
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
{
enum quic_tx_err ret;
if (!quic_pacing_expired(pacer))
return QUIC_TX_ERR_AGAIN;
BUG_ON(LIST_ISEMPTY(&pacer->frms));
ret = qc_send_mux(qc, &pacer->frms, pacer);
/* TODO handle QUIC_TX_ERR_FATAL */
return ret;
}
void quic_pacing_sent_done(struct quic_pacer *pacer, int sent) void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
{ {
pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent; pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent;