MEDIUM: mux-quic: reduce pacing CPU usage with passive wait

The pacing algorithm has been revamped in the previous commit to implement a
credit-based solution. This is a far more adaptive solution: in
particular, it allows emission to catch up when the pause between paced
emissions was longer than expected.
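
As a rough illustration of the credit mechanism, here is a toy sketch in
plain C. It is NOT the actual quic_pacer implementation; all names below
are made up for the example:

#include <stdint.h>

struct toy_pacer {
	uint64_t last_refill_ns; /* timestamp of the previous refill */
	uint64_t inter_ns;       /* interval between two datagrams (> 0) */
	int credit;              /* datagrams allowed to be sent now */
	int burst;               /* cap on accumulated credit */
};

/* Refill credit from elapsed time. Returns the number of datagrams that
 * may be emitted immediately; 0 means emission must wait. Because credit
 * grows with elapsed time, a longer-than-expected pause simply grants
 * more credit, letting emission catch up. */
static int toy_pacer_reload(struct toy_pacer *p, uint64_t now_ns)
{
	const int gained = (int)((now_ns - p->last_refill_ns) / p->inter_ns);

	if (gained) {
		p->credit += gained;
		if (p->credit > p->burst)
			p->credit = p->burst;
		/* account only for whole intervals, keep the remainder */
		p->last_refill_ns += (uint64_t)gained * p->inter_ns;
	}
	return p->credit;
}

/* Consume credit after <sent> datagrams were actually emitted. */
static void toy_pacer_sent_done(struct toy_pacer *p, int sent)
{
	p->credit -= sent;
	if (p->credit < 0)
		p->credit = 0;
}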

This allows QMUX to remove the active loop based on tasklet wake-up.
Instead, a new task is used when emission should be paced. The main
advantage is that CPU usage is drastically reduced.

The new pacing task timer is reset each time qcc_io_send() is invoked. The
timer is set only if the pacing engine reports that emission must be
interrupted. In this case, the timer is set via qcc_wakeup_pacing() to the
delay reported by the congestion algorithm, or to 1ms if that delay is too
short. At the end of qcc_io_cb(), the pacing task is queued if its timer
has been set.
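
For illustration, the expiry computation used by qcc_wakeup_pacing()
(visible in the diff below) is a ceiling conversion from nanoseconds to
milliseconds with a 1ms floor. A standalone demo of that arithmetic; the
sample values are illustrative only, not measured ones:

#include <stdio.h>

#define MAX(a, b) ((a) > (b) ? (a) : (b))

int main(void)
{
	const int inters_ns[] = { 40000, 999999, 1000000, 2500000 };
	size_t i;

	for (i = 0; i < sizeof(inters_ns) / sizeof(*inters_ns); i++) {
		const int inter = inters_ns[i];
		/* same formula as qcc_wakeup_pacing() in the diff below */
		const int expire_ms = MAX((inter + 999999) / 1000000, 1);
		printf("inter=%dns -> expire=%dms\n", inter, expire_ms);
	}
	return 0;
}

All four sample intervals of 1ms or less yield the minimal 1ms timer,
while 2500000ns rounds up to 3ms.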

Pacing task execution is simple enough: it immediately wakes up the QCC
I/O handler.

Note that to have decent performance, a large enough burst must be defined
in the quic-cc-algo configuration. However, this value is common to every
client of the listener, which may cause too many losses under certain
network conditions. This will be addressed in a future patch.
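
For context, a hypothetical configuration sketch follows. The exact
argument layout and values are assumptions made for illustration (the
authoritative syntax is the quic-cc-algo documentation, excerpted in the
doc hunk below); it only shows that burst is passed via the parenthesised
argument list:

# Hypothetical sketch only, argument order is an assumption.
global
    expose-experimental-directives

frontend quic_fe
    # cubic with pacing enabled through a burst argument
    bind quic4@:443 ssl crt /path/to/cert.pem alpn h3 quic-cc-algo cubic(16m,60)
    default_backend be_app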

This should be backported up to 3.1.
Amaury Denoyelle 2025-01-22 17:31:10 +01:00
parent 4489a61585
commit 8098be1fdc
5 changed files with 71 additions and 34 deletions


@@ -17269,14 +17269,13 @@ quic-cc-algo { cubic | newreno | bbr | nocc }[(<args,...>)]
 It is possible to enable pacing if the algorithm is compatible. This is done
 by specifying an optional burst argument as described in the next paragraph.
 The purpose of pacing is to smooth emission of data to reduce network losses.
-In some scenario, it can significantly improve network throughput by avoiding
-retransmissions. However, it can also increase CPU usage if haproxy is forced
-to wait too long between each emission. Pacing support is still experimental,
-as such it requires "expose-experimental-directives". The BBR congestion
-control algorithm depends on the pacing support which is in this case
-implicitly enabled by choosing the "bbr" algorithm. Note that haproxy's BBR
-implementation is still considered as experimental and cannot be enabled
-without "expose-experimental-directives".
+In most scenario, it can significantly improve network throughput by avoiding
+retransmissions. Pacing support is still experimental, as such it requires
+"expose-experimental-directives". The BBR congestion control algorithm
+depends on the pacing support which is in this case implicitly enabled by
+choosing the "bbr" algorithm. Note that haproxy's BBR implementation is still
+considered as experimental and cannot be enabled without
+"expose-experimental-directives".
 
 For further customization, a list of parameters can be specified after the
 algorithm token. It must be written between parenthesis, separated by a


@@ -89,6 +89,7 @@ struct qcc {
 	struct list purg_list; /* list of qcs which can be purged */
 
 	struct wait_event wait_event; /* To be used if we're waiting for I/Os */
+	struct task *pacing_task; /* task used to wait when emission is interrupted due to pacing */
 
 	struct proxy *proxy;


@@ -14,8 +14,6 @@ static inline void quic_pacing_init(struct quic_pacer *pacer,
 	pacer->credit = cc->algo->pacing_burst(cc);
 }
 
-int quic_pacing_expired(const struct quic_pacer *pacer);
-
 void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
 int quic_pacing_reload(struct quic_pacer *pacer);


@@ -2469,12 +2469,23 @@ static int qcc_build_frms(struct qcc *qcc, struct list *qcs_failed)
 	return total;
 }
 
+/* Schedule <qcc> after emission was interrupted on pacing. */
+static void qcc_wakeup_pacing(struct qcc *qcc)
+{
+	/* Sleep to be able to reemit at least a single packet */
+	const int inter = qcc->tx.pacer.cc->algo->pacing_inter(qcc->tx.pacer.cc);
+	/* Convert nano to milliseconds rounded up, with 1ms as minimal value. */
+	const int expire = MAX((inter + 999999) / 1000000, 1);
+	qcc->pacing_task->expire = tick_add_ifset(now_ms, MS_TO_TICKS(expire));
+	++qcc->tx.paced_sent_ctr;
+}
+
 /* Proceed to sending. Loop through all available streams for the <qcc>
  * instance and try to send as much as possible.
  *
  * Returns the total of bytes sent to the transport layer.
  */
-static int qcc_io_send(struct qcc *qcc, int after_pacing)
+static int qcc_io_send(struct qcc *qcc)
 {
 	struct list *frms = &qcc->tx.frms;
 	/* Temporary list for QCS on error. */
@@ -2485,6 +2496,11 @@ static int qcc_io_send(struct qcc *qcc, int after_pacing)
 
 	TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
 
+	if (qcc_is_pacing_active(qcc->conn)) {
+		/* Always reset pacing_task timer to prevent unnecessary execution. */
+		qcc->pacing_task->expire = TICK_ETERNITY;
+	}
+
 	/* TODO if socket in transient error, sending should be temporarily
 	 * disabled for all frames. However, checking for send subscription is
 	 * not valid as this may be caused by a congestion error which only
@@ -2535,9 +2551,7 @@ static int qcc_io_send(struct qcc *qcc, int after_pacing)
 
 	if (!LIST_ISEMPTY(frms) && qcc_is_pacing_active(qcc->conn)) {
 		if (!quic_pacing_reload(&qcc->tx.pacer)) {
-			if (!after_pacing)
-				++qcc->tx.paced_sent_ctr;
-			tasklet_wakeup(qcc->wait_event.tasklet, TASK_F_UEVT1);
+			qcc_wakeup_pacing(qcc);
 			total = 0;
 			goto out;
 		}
@@ -2579,8 +2593,7 @@ static int qcc_io_send(struct qcc *qcc, int after_pacing)
 		if (ret == 1) {
 			/* qcc_send_frames cannot return 1 if pacing not used. */
 			BUG_ON(!qcc_is_pacing_active(qcc->conn));
-			tasklet_wakeup(qcc->wait_event.tasklet, TASK_F_UEVT1);
-			++qcc->tx.paced_sent_ctr;
+			qcc_wakeup_pacing(qcc);
 		}
 
  out:
@@ -2708,7 +2721,7 @@ static void qcc_shutdown(struct qcc *qcc)
 	TRACE_STATE("perform graceful shutdown", QMUX_EV_QCC_END, qcc->conn);
 	if (qcc->app_ops && qcc->app_ops->shutdown) {
 		qcc->app_ops->shutdown(qcc->ctx);
-		qcc_io_send(qcc, 0);
+		qcc_io_send(qcc);
 	}
 	else {
 		qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
@@ -2780,7 +2793,7 @@ static int qcc_io_process(struct qcc *qcc)
 	if (!qc_test_fd(qcc->conn->handle.qc)) {
 		TRACE_DEVEL("proxy disabled with listener socket, closing connection", QMUX_EV_QCC_WAKE, qcc->conn);
 		qcc->conn->flags |= (CO_FL_SOCK_RD_SH|CO_FL_SOCK_WR_SH);
-		qcc_io_send(qcc, 0);
+		qcc_io_send(qcc);
 		goto out;
 	}
@@ -2826,6 +2839,8 @@ static void qcc_release(struct qcc *qcc)
 
 	TRACE_ENTER(QMUX_EV_QCC_END, conn);
 
+	task_destroy(qcc->pacing_task);
+
 	if (qcc->task) {
 		task_destroy(qcc->task);
 		qcc->task = NULL;
@@ -2893,17 +2908,10 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 {
 	struct qcc *qcc = ctx;
 
-	/* Check if woken up only for pacing but not yet expired. */
-	if ((status & (TASK_F_UEVT1|TASK_WOKEN_ANY)) == TASK_F_UEVT1 &&
-	    !quic_pacing_expired(&qcc->tx.pacer)) {
-		/* hide any trace as no progress should be performed on this invokation. */
-		trace_disable();
-	}
-
 	TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
 	if (!(qcc->wait_event.events & SUB_RETRY_SEND))
-		qcc_io_send(qcc, status & TASK_F_UEVT1);
+		qcc_io_send(qcc);
 
 	qcc_io_recv(qcc);
@@ -2914,8 +2922,13 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 
 	qcc_refresh_timeout(qcc);
 
+	/* Trigger pacing task if emission should be retried after some delay. */
+	if (qcc_is_pacing_active(qcc->conn)) {
+		if (tick_isset(qcc->pacing_task->expire))
+			task_queue(qcc->pacing_task);
+	}
+
 	TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
-	trace_resume();
 
 	return NULL;
@@ -2924,11 +2937,31 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 	qcc_release(qcc);
 	TRACE_LEAVE(QMUX_EV_QCC_WAKE);
-	trace_resume();
 	return NULL;
 }
 
+static struct task *qcc_pacing_task(struct task *t, void *ctx, unsigned int state)
+{
+	struct qcc *qcc = ctx;
+	int expired = tick_is_expired(t->expire, now_ms);
+
+	TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+
+	if (!expired) {
+		if (!tick_isset(t->expire))
+			TRACE_DEVEL("cancelled pacing task", QMUX_EV_QCC_WAKE, qcc->conn);
+		goto requeue;
+	}
+
+	/* Reschedule I/O immediately. */
+	tasklet_wakeup_after(NULL, qcc->wait_event.tasklet);
+
+ requeue:
+	TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
+	return t;
+}
+
 static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int state)
 {
 	struct qcc *qcc = ctx;
@@ -2983,6 +3016,7 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
 static void _qcc_init(struct qcc *qcc)
 {
 	qcc->conn = NULL;
+	qcc->pacing_task = NULL;
 	qcc->task = NULL;
 	qcc->wait_event.tasklet = NULL;
 	qcc->app_ops = NULL;
@@ -3036,6 +3070,17 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
 	if (qcc_is_pacing_active(conn)) {
 		quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc);
 		qcc->tx.paced_sent_ctr = 0;
+
+		/* Initialize pacing_task. */
+		qcc->pacing_task = task_new_here();
+		if (!qcc->pacing_task) {
+			TRACE_ERROR("pacing task alloc failure", QMUX_EV_QCC_NEW);
+			goto err;
+		}
+		qcc->pacing_task->process = qcc_pacing_task;
+		qcc->pacing_task->context = qcc;
+		qcc->pacing_task->expire = TICK_ETERNITY;
+		qcc->pacing_task->state |= TASK_F_WANTS_TIME;
 	}
 
 	if (conn_is_back(conn)) {


@@ -3,12 +3,6 @@
 #include <haproxy/quic_tx.h>
 #include <haproxy/task.h>
 
-/* Returns true if <pacer> timer is expired and emission can be retried. */
-int quic_pacing_expired(const struct quic_pacer *pacer)
-{
-	return pacer->credit;
-}
-
 /* Notify <pacer> about an emission of <sent> count of datagrams. */
 void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
 {