MAJOR: quic: implement accept queue

Do not proceed to direct accept when creating a new quic_conn. Wait for
the QUIC handshake to succeeds to insert the quic_conn in the accept
queue. A tasklet is then woken up to call listener_accept to accept the
quic_conn.

The most important effect is that the connection/mux layers are not
instantiated at the same time as the quic_conn. This forces to delay
some process to be sure that the mux is allocated :
* initialization of mux transport parameters
* installation of the app-ops

Also, the mux instance is not checked now to wake up the quic_conn
tasklet. This is safe because the xprt-quic code is now ready to handle
the absence of the connection/mux layers.

Note that this commit has a deep impact as it changes significantly the
lower QUIC architecture. Most notably, it breaks the 0-RTT feature.
This commit is contained in:
Amaury Denoyelle 2022-01-19 16:01:05 +01:00
parent f68b2cb816
commit cfa2d5648f
7 changed files with 61 additions and 33 deletions

View File

@ -39,6 +39,8 @@ int quic_sock_accepting_conn(const struct receiver *rx);
struct connection *quic_sock_accept_conn(struct listener *l, int *status); struct connection *quic_sock_accept_conn(struct listener *l, int *status);
void quic_sock_fd_iocb(int fd); void quic_sock_fd_iocb(int fd);
void quic_accept_push_qc(struct quic_conn *qc);
#endif /* USE_QUIC */ #endif /* USE_QUIC */
#endif /* _HAPROXY_QUIC_SOCK_H */ #endif /* _HAPROXY_QUIC_SOCK_H */

View File

@ -65,7 +65,6 @@ struct receiver {
struct rx_settings *settings; /* points to the settings used by this receiver */ struct rx_settings *settings; /* points to the settings used by this receiver */
struct list proto_list; /* list in the protocol header */ struct list proto_list; /* list in the protocol header */
#ifdef USE_QUIC #ifdef USE_QUIC
struct mt_list pkts; /* QUIC Initial packets to accept new connections */
struct eb_root odcids; /* QUIC original destination connection IDs. */ struct eb_root odcids; /* QUIC original destination connection IDs. */
struct eb_root cids; /* QUIC connection IDs. */ struct eb_root cids; /* QUIC connection IDs. */
__decl_thread(HA_RWLOCK_T cids_lock); /* RW lock for connection IDs tree accesses */ __decl_thread(HA_RWLOCK_T cids_lock); /* RW lock for connection IDs tree accesses */

View File

@ -732,6 +732,7 @@ struct quic_conn {
struct quic_path *path; struct quic_path *path;
struct listener *li; /* only valid for frontend connections */ struct listener *li; /* only valid for frontend connections */
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
/* MUX */ /* MUX */
struct qcc *qcc; struct qcc *qcc;
struct task *timer_task; struct task *timer_task;

View File

@ -520,7 +520,6 @@ static void quic_add_listener(struct protocol *proto, struct listener *listener)
{ {
listener->flags |= LI_F_QUIC_LISTENER; listener->flags |= LI_F_QUIC_LISTENER;
MT_LIST_INIT(&listener->rx.pkts);
listener->rx.odcids = EB_ROOT_UNIQUE; listener->rx.odcids = EB_ROOT_UNIQUE;
listener->rx.cids = EB_ROOT_UNIQUE; listener->rx.cids = EB_ROOT_UNIQUE;
listener->rx.flags |= RX_F_LOCAL_ACCEPT; listener->rx.flags |= RX_F_LOCAL_ACCEPT;

View File

@ -140,30 +140,25 @@ int quic_sock_accepting_conn(const struct receiver *rx)
struct connection *quic_sock_accept_conn(struct listener *l, int *status) struct connection *quic_sock_accept_conn(struct listener *l, int *status)
{ {
struct quic_conn *qc; struct quic_conn *qc;
struct quic_rx_packet *pkt; struct li_per_thread *lthr = &l->per_thr[tid];
int ret;
qc = NULL; qc = MT_LIST_POP(&lthr->quic_accept.conns, struct quic_conn *, accept_list);
pkt = MT_LIST_POP(&l->rx.pkts, struct quic_rx_packet *, rx_list); if (!qc)
/* Should never happen. */ goto done;
if (!pkt)
if (!new_quic_cli_conn(qc, l, &qc->peer_addr))
goto err; goto err;
qc = pkt->qc;
if (!new_quic_cli_conn(qc, l, &pkt->saddr))
goto err;
ret = CO_AC_DONE;
done: done:
if (status) *status = CO_AC_DONE;
*status = ret;
return qc ? qc->conn : NULL; return qc ? qc->conn : NULL;
err: err:
ret = CO_AC_PAUSE; /* in case of error reinsert the element to process it later. */
goto done; MT_LIST_INSERT(&lthr->quic_accept.conns, &qc->accept_list);
*status = CO_AC_PAUSE;
return NULL;
} }
/* Function called on a read event from a listening socket. It tries /* Function called on a read event from a listening socket. It tries
@ -228,6 +223,32 @@ void quic_sock_fd_iocb(int fd)
/* per-thread accept queues */ /* per-thread accept queues */
struct quic_accept_queue *quic_accept_queues; struct quic_accept_queue *quic_accept_queues;
/* Install <qc> on the queue ready to be accepted. The queue task is then woken
* up.
*/
void quic_accept_push_qc(struct quic_conn *qc)
{
struct quic_accept_queue *queue = &quic_accept_queues[qc->tid];
struct li_per_thread *lthr = &qc->li->per_thr[qc->tid];
BUG_ON(MT_LIST_INLIST(&qc->accept_list));
/* 1. insert the listener in the accept queue
*
* Use TRY_APPEND as there is a possible race even with INLIST if
* multiple threads try to add the same listener instance from several
* quic_conn.
*/
if (!MT_LIST_INLIST(&(lthr->quic_accept.list)))
MT_LIST_TRY_APPEND(&queue->listeners, &(lthr->quic_accept.list));
/* 2. insert the quic_conn in the listener per-thread queue. */
MT_LIST_APPEND(&lthr->quic_accept.conns, &qc->accept_list);
/* 3. wake up the queue tasklet */
tasklet_wakeup(quic_accept_queues[qc->tid].tasklet);
}
/* Tasklet handler to accept QUIC connections. Call listener_accept on every /* Tasklet handler to accept QUIC connections. Call listener_accept on every
* listener instances registered in the accept queue. * listener instances registered in the accept queue.
*/ */

View File

@ -2523,8 +2523,6 @@ int ssl_sock_switchctx_cbk(SSL *ssl, int *al, void *arg)
if (!quic_transport_params_store(qc, 0, extension_data, if (!quic_transport_params_store(qc, 0, extension_data,
extension_data + extension_len)) extension_data + extension_len))
goto abort; goto abort;
quic_mux_transport_params_update(qc->qcc);
} }
#endif /* USE_QUIC */ #endif /* USE_QUIC */

View File

@ -44,6 +44,7 @@
#include <haproxy/quic_cc.h> #include <haproxy/quic_cc.h>
#include <haproxy/quic_frame.h> #include <haproxy/quic_frame.h>
#include <haproxy/quic_loss.h> #include <haproxy/quic_loss.h>
#include <haproxy/quic_sock.h>
#include <haproxy/cbuf.h> #include <haproxy/cbuf.h>
#include <haproxy/quic_tls.h> #include <haproxy/quic_tls.h>
#include <haproxy/sink.h> #include <haproxy/sink.h>
@ -1048,12 +1049,6 @@ int quic_set_app_ops(struct quic_conn *qc, const unsigned char *alpn, size_t alp
else else
return 0; return 0;
if (qcc_install_app_ops(qc->qcc, qc->app_ops))
return 0;
/* mux-quic can now be considered ready. */
qc->mux_state = QC_MUX_READY;
return 1; return 1;
} }
@ -1892,10 +1887,14 @@ static inline int qc_provide_cdata(struct quic_enc_level *el,
} }
TRACE_PROTO("SSL handshake OK", QUIC_EV_CONN_HDSHK, qc, &state); TRACE_PROTO("SSL handshake OK", QUIC_EV_CONN_HDSHK, qc, &state);
if (qc_is_listener(ctx->qc)) if (qc_is_listener(ctx->qc)) {
HA_ATOMIC_STORE(&qc->state, QUIC_HS_ST_CONFIRMED); HA_ATOMIC_STORE(&qc->state, QUIC_HS_ST_CONFIRMED);
else /* The connection is ready to be accepted. */
quic_accept_push_qc(qc);
}
else {
HA_ATOMIC_STORE(&qc->state, QUIC_HS_ST_COMPLETE); HA_ATOMIC_STORE(&qc->state, QUIC_HS_ST_COMPLETE);
}
} else { } else {
ssl_err = SSL_process_quic_post_handshake(ctx->ssl); ssl_err = SSL_process_quic_post_handshake(ctx->ssl);
if (ssl_err != 1) { if (ssl_err != 1) {
@ -3662,6 +3661,9 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
qc->path = &qc->paths[0]; qc->path = &qc->paths[0];
quic_path_init(qc->path, ipv4, default_quic_cc_algo, qc); quic_path_init(qc->path, ipv4, default_quic_cc_algo, qc);
/* required to use MTLIST_IN_LIST */
MT_LIST_INIT(&qc->accept_list);
TRACE_LEAVE(QUIC_EV_CONN_INIT, qc); TRACE_LEAVE(QUIC_EV_CONN_INIT, qc);
return qc; return qc;
@ -4557,9 +4559,6 @@ static ssize_t qc_lstnr_pkt_rcv(unsigned char *buf, const unsigned char *end,
if (likely(!qc_to_purge)) { if (likely(!qc_to_purge)) {
/* Enqueue this packet. */ /* Enqueue this packet. */
pkt->qc = qc; pkt->qc = qc;
MT_LIST_APPEND(&l->rx.pkts, &pkt->rx_list);
/* Try to accept a new connection. */
listener_accept(l);
} }
else { else {
quic_conn_drop(qc_to_purge); quic_conn_drop(qc_to_purge);
@ -4663,7 +4662,7 @@ static ssize_t qc_lstnr_pkt_rcv(unsigned char *buf, const unsigned char *end,
* initialized. * initialized.
*/ */
conn_ctx = HA_ATOMIC_LOAD(&qc->xprt_ctx); conn_ctx = HA_ATOMIC_LOAD(&qc->xprt_ctx);
if (conn_ctx && HA_ATOMIC_LOAD(&qc->qcc)) if (conn_ctx)
tasklet_wakeup(conn_ctx->wait_event.tasklet); tasklet_wakeup(conn_ctx->wait_event.tasklet);
TRACE_LEAVE(QUIC_EV_CONN_LPKT, qc ? qc : NULL, pkt); TRACE_LEAVE(QUIC_EV_CONN_LPKT, qc ? qc : NULL, pkt);
@ -5484,6 +5483,15 @@ static int qc_xprt_start(struct connection *conn, void *ctx)
return 0; return 0;
} }
quic_mux_transport_params_update(qc->qcc);
if (qcc_install_app_ops(qc->qcc, qc->app_ops)) {
TRACE_PROTO("Cannot install app layer", QUIC_EV_CONN_LPKT, qc);
return 0;
}
/* mux-quic can now be considered ready. */
qc->mux_state = QC_MUX_READY;
tasklet_wakeup(qctx->wait_event.tasklet); tasklet_wakeup(qctx->wait_event.tasklet);
return 1; return 1;
} }