This commit is contained in:
Amaury Denoyelle 2024-10-31 10:32:44 +01:00
parent 0443ee67cc
commit ee1c7ad0ff
5 changed files with 82 additions and 24 deletions

View File

@ -7,7 +7,10 @@
/* Pacing context used to throttle QUIC datagram emission on a path.
 * NOTE(review): this span is rendered diff residue — +/- markers were lost,
 * so the removed field and its replacements appear interleaved below. */
struct quic_pacer {
struct list frms; /* frames queued, waiting for paced emission */
const struct quic_cc_path *path; /* path supplying cwnd/mtu/srtt inputs */
ullong next; /* pre-change field: next emission time, presumably in ns (old code used now_mono_time()) — removed by this commit */
unsigned int curr; /* added: now_ms tick of the current accounting window */
unsigned int next; /* added: tick after which emission is allowed (checked with tick_is_expired) */
int sent; /* added: datagrams accounted during the <curr> tick */
};
#endif /* _HAPROXY_QUIC_PACING_T_H */

View File

@ -11,7 +11,10 @@ static inline void quic_pacing_init(struct quic_pacer *pacer,
{
/* Initialize the pacer (body only — the signature is truncated into the
 * hunk header above). Diff residue: "next = 0" is the removed line; the
 * three now_ms-based assignments are its replacement. */
LIST_INIT(&pacer->frms);
pacer->path = path;
pacer->next = 0; /* removed: 0 meant "always expired" in the old ns-based check */
pacer->curr = now_ms; /* added: open the accounting window at the current tick */
pacer->next = now_ms; /* added: emission allowed immediately */
pacer->sent = 0; /* added: nothing sent yet in this window */
}
static inline void quic_pacing_reset(struct quic_pacer *pacer)
@ -30,9 +33,10 @@ static inline struct list *quic_pacing_frms(struct quic_pacer *pacer)
return &pacer->frms;
}
/* Diff residue: both the removed quic_pacing_ns_pkt() and its replacement
 * quic_pacing_pkt_ms() are shown here without +/- markers. */
static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer)
static inline int quic_pacing_pkt_ms(const struct quic_pacer *pacer)
{
/* old formula: nanoseconds per packet = srtt(ms) * 1e6 / packets-per-cwnd */
return pacer->path->loss.srtt * 1000000 / (pacer->path->cwnd / pacer->path->mtu + 1);
/* new formula: datagrams allowed per millisecond — cwnd worth of packets
 * spread over one srtt; the "+ 1" terms guard against division by zero
 * and guarantee a minimum budget of 1 datagram/ms.
 * NOTE(review): the grouping "(cwnd / (mtu + 1))" differs from the old
 * "(cwnd / mtu + 1)" — confirm the new parenthesization is intentional. */
return (pacer->path->cwnd / (pacer->path->mtu + 1)) /
(pacer->path->loss.srtt + 1) + 1;
}
int quic_pacing_expired(const struct quic_pacer *pacer);

View File

@ -267,12 +267,14 @@ static inline int qcc_is_dead(const struct qcc *qcc)
/* Return true if the mux timeout should be armed. */
static inline int qcc_may_expire(struct qcc *qcc)
{
return !qcc->nb_sc; /* removed: previously expirable only with no attached stream-connectors */
//return !qcc->nb_sc;
return 1; /* added: WIP/debug — expiry is now always permitted */
}
/* Refresh the timeout on <qcc> if needed depending on its state. */
static void qcc_refresh_timeout(struct qcc *qcc)
{
#if 0
const struct proxy *px = qcc->proxy;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
@ -386,18 +388,25 @@ static void qcc_refresh_timeout(struct qcc *qcc)
leave:
TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
#endif
}
/* Wake up the <qcc> I/O tasklet for a regular (non-paced) emission.
 * NOTE(review): diff residue — presumably the task->expire/task_queue pair
 * is newly added by this commit to disarm the pacing timer task; confirm
 * against the full post-image. */
void qcc_wakeup(struct qcc *qcc)
{
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); /* clear the pacing flag */
tasklet_wakeup(qcc->wait_event.tasklet);
qcc->task->expire = TICK_ETERNITY; /* pacing timer not needed for a direct wakeup */
task_queue(qcc->task);
}
/* Schedule the next paced emission for <qcc>.
 * Diff residue: the tasklet + TASK_F_USR1 mechanism is the pre-change
 * version (shown both live and commented out); the replacement queues the
 * dedicated timer task at the pacer's computed <next> tick instead. */
static void qcc_wakeup_pacing(struct qcc *qcc)
{
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1); /* removed by this commit */
tasklet_wakeup(qcc->wait_event.tasklet); /* removed by this commit */
//HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
//tasklet_wakeup(qcc->wait_event.tasklet);
qcc->task->expire = qcc->tx.pacer.next;
BUG_ON(tick_is_expired(qcc->task->expire, now_ms)); /* <next> must lie in the future */
task_queue(qcc->task);
}
/* Mark a stream as open if it was idle. This can be used on every
@ -2774,7 +2783,7 @@ static void qcc_release(struct qcc *qcc)
TRACE_LEAVE(QMUX_EV_QCC_END);
}
/* Try to emit the frames queued in the connection pacer.
 * Diff residue: both the old void signature and the new int-returning one
 * are shown. The new version returns 1 when pacing postponed the emission
 * (so the caller can requeue its timer on pacer->next) and 0 otherwise. */
static void qcc_purge_sending(struct qcc *qcc)
static int qcc_purge_sending(struct qcc *qcc)
{
struct quic_conn *qc = qcc->conn->handle.qc;
struct quic_pacer *pacer = &qcc->tx.pacer;
@ -2783,9 +2792,10 @@ static void qcc_purge_sending(struct qcc *qcc)
ret = quic_pacing_send(pacer, qc);
if (ret == QUIC_TX_ERR_AGAIN) {
/* pacing throttled the send: frames must still be queued */
BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
qcc_wakeup_pacing(qcc); /* presumably removed in favor of "return 1" — TODO confirm against post-image */
return 1;
}
else if (ret == QUIC_TX_ERR_FATAL) { /* removed: "else" dropped since the AGAIN branch now returns */
if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
qcc_subscribe_send(qcc); /* wait for the transport layer to become writable */
@ -2794,6 +2804,8 @@ static void qcc_purge_sending(struct qcc *qcc)
if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
qcc_subscribe_send(qcc);
}
return 0;
}
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
@ -2844,12 +2856,14 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
}
if (!qcc_may_expire(qcc)) {
ABORT_NOW();
TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn);
t->expire = TICK_ETERNITY;
goto requeue;
}
}
#if 0
task_destroy(t);
if (!qcc) {
@ -2870,12 +2884,23 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
qcc_shutdown(qcc);
qcc_release(qcc);
}
#endif
if (qcc_purge_sending(qcc)) {
t->expire = qcc->tx.pacer.next;
goto requeue;
}
else {
t->expire = TICK_ETERNITY;
goto requeue;
}
out:
TRACE_LEAVE(QMUX_EV_QCC_WAKE);
return NULL;
requeue:
BUG_ON(tick_is_expired(t->expire, now_ms));
TRACE_LEAVE(QMUX_EV_QCC_WAKE);
return t;
}
@ -2984,7 +3009,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
}
qcc->task->process = qcc_timeout_task;
qcc->task->context = qcc;
qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
//qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
qcc->task->expire = TICK_ETERNITY;
qcc_reset_idle_start(qcc);
LIST_INIT(&qcc->opening_list);

View File

@ -6,7 +6,7 @@ struct quic_conn;
/* Report whether the pacing delay has elapsed, i.e. emission may resume.
 * Diff residue: the mono-time expression is the removed (ns-clock) version;
 * the commit switches the pacer over to now_ms ticks. */
int quic_pacing_expired(const struct quic_pacer *pacer)
{
return !pacer->next || pacer->next <= now_mono_time(); /* removed: ns clock, 0 = unarmed */
return tick_is_expired(pacer->next, now_ms); /* added: ms tick clock */
}
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
@ -25,5 +25,18 @@ enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc
/* Account <sent> emitted datagrams and compute the next allowed emission
 * tick. Diff residue: the now_mono_time() assignment is the removed
 * ns-based version; everything after it is the new ms-tick accounting. */
void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
{
pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent; /* removed: ns scheme */
const int pkt_ms = quic_pacing_pkt_ms(pacer); /* per-ms datagram budget */
if (pacer->curr == now_ms) {
pacer->sent += sent; /* same tick: accumulate into the current window */
}
else {
pacer->curr = now_ms; /* tick changed: restart the accounting window */
pacer->sent = sent;
}
if (pacer->sent >= pkt_ms) {
/* budget exceeded: postpone the next emission proportionally */
pacer->next = now_ms + (pacer->sent / pkt_ms);
/* NOTE(review): debug fprintf left in — remove before merging */
fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms);
}
}

View File

@ -495,13 +495,20 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
}
if (pacer) {
const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
//const ullong ns_pkts = quic_pacing_ns_pkt(pacer);
//max_dgram = global.tune.quic_frontend_max_tx_burst * 1000000 / (ns_pkts + 1) + 1;
const int pkt_ms = quic_pacing_pkt_ms(pacer);
max_dgram = pkt_ms;
if (global.tune.quic_frontend_max_tx_burst)
max_dgram *= global.tune.quic_frontend_max_tx_burst;
fprintf(stderr, "max_dgram = %d (%lu/%d)\n", max_dgram, qc->path->cwnd, qc->path->loss.srtt);
}
TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
qel_register_send(&send_list, qc->ael, frms);
sent = qc_send(qc, 0, &send_list, max_dgram);
BUG_ON(max_dgram && sent > max_dgram);
if (sent <= 0) {
ret = QUIC_TX_ERR_FATAL;
}
@ -552,6 +559,7 @@ static inline void qc_select_tls_ver(struct quic_conn *qc,
static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
struct list *qels, int max_dgrams)
{
//int max_dgrams_copy = max_dgrams;
int ret, cc, padding;
struct quic_tx_packet *first_pkt, *prv_pkt;
unsigned char *end, *pos;
@ -609,13 +617,11 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
TRACE_PROTO("TX prep pkts", QUIC_EV_CONN_PHPKTS, qc, qel);
/* Start to decrement <max_dgrams> after the first packet built. */
if (!dglen && pos != (unsigned char *)b_head(buf)) {
if (max_dgrams && !--max_dgrams) {
BUG_ON(LIST_ISEMPTY(frms));
TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
goto out;
}
TRACE_PRINTF(TRACE_LEVEL_ERROR, QUIC_EV_CONN_PHPKTS, qc, 0, 0, 0, "%d/%d", dgram_cnt, max_dgrams);
if (max_dgrams && dgram_cnt == max_dgrams) {
BUG_ON(LIST_ISEMPTY(frms));
TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
goto out;
}
if (!first_pkt)
@ -678,6 +684,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
wrlen >= QUIC_INITIAL_PACKET_MINLEN)) {
qc_txb_store(buf, wrlen, first_pkt);
++dgram_cnt;
BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
}
TRACE_PROTO("could not prepare anymore packet", QUIC_EV_CONN_PHPKTS, qc, qel);
break;
@ -748,6 +755,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
dglen = 0;
++dgram_cnt;
BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
/* man 7 udp UDP_SEGMENT
* The segment size must be chosen such that at
@ -763,6 +771,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
padding = 0;
prv_pkt = NULL;
++dgram_cnt;
BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
gso_dgram_cnt = 0;
}
@ -778,7 +787,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
out:
if (first_pkt) {
qc_txb_store(buf, wrlen, first_pkt);
++dgram_cnt;
//++dgram_cnt;
}
if (cc && total) {
@ -787,8 +796,10 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
qc->tx.cc_dgram_len = dglen;
}
BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
ret = dgram_cnt;
leave:
TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QUIC_EV_CONN_PHPKTS, qc, 0, 0, 0, "ret=%d", ret);
TRACE_LEAVE(QUIC_EV_CONN_PHPKTS, qc);
return ret;
}
@ -849,7 +860,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
BUG_ON_HOT(b_data(buf));
b_reset(buf);
prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams);
prep_pkts = qc_prep_pkts(qc, buf, send_list, max_dgrams ? max_dgrams - ret : 0);
if (b_data(buf) && !qc_send_ppkts(buf, qc->xprt_ctx)) {
ret = -1;
@ -864,6 +875,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
}
ret += prep_pkts;
BUG_ON(max_dgrams && ret > max_dgrams);
if (max_dgrams && ret == max_dgrams && !LIST_ISEMPTY(send_list)) {
TRACE_DEVEL("stopping for artificial pacing", QUIC_EV_CONN_TXPKT, qc);
break;