diff --git a/include/haproxy/protocol-t.h b/include/haproxy/protocol-t.h index 0c5bd9e1c..a37c9256a 100644 --- a/include/haproxy/protocol-t.h +++ b/include/haproxy/protocol-t.h @@ -119,7 +119,14 @@ struct protocol { void (*ignore_events)(struct connection *conn, int event_type); /* unsubscribe from socket events */ int (*get_src)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's source address; -1=fail */ int (*get_dst)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's dest address; -1=fail */ - int (*set_affinity)(struct connection *conn, int new_tid); + + /* functions related to thread affinity update */ + /* prepare rebind connection on a new thread, may fail */ + int (*set_affinity1)(struct connection *conn, int new_tid); + /* complete connection thread rebinding, no error possible */ + void (*set_affinity2)(struct connection *conn); + /* cancel connection thread rebinding */ + void (*reset_affinity)(struct connection *conn); /* functions acting on the receiver */ int (*rx_suspend)(struct receiver *rx); /* temporarily suspend this receiver for a soft restart */ diff --git a/include/haproxy/quic_conn.h b/include/haproxy/quic_conn.h index a3de4ab11..c7005c06c 100644 --- a/include/haproxy/quic_conn.h +++ b/include/haproxy/quic_conn.h @@ -177,7 +177,7 @@ void qc_kill_conn(struct quic_conn *qc); int qc_parse_hd_form(struct quic_rx_packet *pkt, unsigned char **buf, const unsigned char *end); -int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li); +int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li); void qc_finalize_affinity_rebind(struct quic_conn *qc); int qc_handle_conn_migration(struct quic_conn *qc, const struct sockaddr_storage *peer_addr, diff --git a/src/listener.c b/src/listener.c index a3485580b..5d07a6650 100644 --- a/src/listener.c +++ b/src/listener.c @@ -105,11 +105,14 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring) } -/* tries to push a new accepted connection into ring . Returns - * non-zero if it succeeds, or zero if the ring is full. Supports multiple - * producers. +/* Tries to push a new accepted connection into ring . + * is called if not NULL just prior to the push operation. + * + * Returns non-zero if it succeeds, or zero if the ring is full. Supports + * multiple producers. */ -int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn) +int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn, + void (*accept_push_cb)(struct connection *)) { unsigned int pos, next; uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */ @@ -124,6 +127,9 @@ int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn next |= (idx & 0xffff0000U); } while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax())); + if (accept_push_cb) + accept_push_cb(conn); + ring->entry[pos] = conn; __ha_barrier_store(); return 1; @@ -1013,6 +1019,7 @@ static inline int listener_uses_maxconn(const struct listener *l) */ void listener_accept(struct listener *l) { + void (*li_set_affinity2)(struct connection *); struct connection *cli_conn; struct proxy *p; unsigned int max_accept; @@ -1023,6 +1030,7 @@ void listener_accept(struct listener *l) int ret; p = l->bind_conf->frontend; + li_set_affinity2 = l->rx.proto ? l->rx.proto->set_affinity2 : NULL; /* if l->bind_conf->maxaccept is -1, then max_accept is UINT_MAX. It is * not really illimited, but it is probably enough. @@ -1461,8 +1469,8 @@ void listener_accept(struct listener *l) * reservation in the target ring. */ - if (l->rx.proto && l->rx.proto->set_affinity) { - if (l->rx.proto->set_affinity(cli_conn, t)) { + if (l->rx.proto && l->rx.proto->set_affinity1) { + if (l->rx.proto->set_affinity1(cli_conn, t)) { /* Failed migration, stay on the same thread. */ goto local_accept; } @@ -1475,15 +1483,19 @@ void listener_accept(struct listener *l) * when processing this loop. */ ring = &accept_queue_rings[t]; - if (accept_queue_push_mp(ring, cli_conn)) { + if (accept_queue_push_mp(ring, cli_conn, li_set_affinity2)) { _HA_ATOMIC_INC(&activity[t].accq_pushed); tasklet_wakeup(ring->tasklet); + continue; } /* If the ring is full we do a synchronous accept on * the local thread here. */ _HA_ATOMIC_INC(&activity[t].accq_full); + + if (l->rx.proto && l->rx.proto->reset_affinity) + l->rx.proto->reset_affinity(cli_conn); } #endif // USE_THREAD diff --git a/src/proto_quic.c b/src/proto_quic.c index d03123e1e..4b8f0c20f 100644 --- a/src/proto_quic.c +++ b/src/proto_quic.c @@ -61,7 +61,7 @@ static int quic_bind_listener(struct listener *listener, char *errmsg, int errle static int quic_connect_server(struct connection *conn, int flags); static void quic_enable_listener(struct listener *listener); static void quic_disable_listener(struct listener *listener); -static int quic_set_affinity(struct connection *conn, int new_tid); +static int quic_set_affinity1(struct connection *conn, int new_tid); /* Note: must not be declared as its list will be overwritten */ struct protocol proto_quic4 = { @@ -80,7 +80,7 @@ struct protocol proto_quic4 = { .get_src = quic_sock_get_src, .get_dst = quic_sock_get_dst, .connect = quic_connect_server, - .set_affinity = quic_set_affinity, + .set_affinity1 = quic_set_affinity1, /* binding layer */ .rx_suspend = udp_suspend_receiver, @@ -124,7 +124,7 @@ struct protocol proto_quic6 = { .get_src = quic_sock_get_src, .get_dst = quic_sock_get_dst, .connect = quic_connect_server, - .set_affinity = quic_set_affinity, + .set_affinity1 = quic_set_affinity1, /* binding layer */ .rx_suspend = udp_suspend_receiver, @@ -668,10 +668,10 @@ static void quic_disable_listener(struct listener *l) * target is a listener, and the caller is responsible for guaranteeing that * the listener assigned to the connection is bound to the requested thread. */ -static int quic_set_affinity(struct connection *conn, int new_tid) +static int quic_set_affinity1(struct connection *conn, int new_tid) { struct quic_conn *qc = conn->handle.qc; - return qc_set_tid_affinity(qc, new_tid, objt_listener(conn->target)); + return qc_set_tid_affinity1(qc, new_tid, objt_listener(conn->target)); } static int quic_alloc_dghdlrs(void) diff --git a/src/proto_rhttp.c b/src/proto_rhttp.c index a6fc95574..abd28d0cf 100644 --- a/src/proto_rhttp.c +++ b/src/proto_rhttp.c @@ -39,7 +39,7 @@ struct protocol proto_rhttp = { .unbind = rhttp_unbind_receiver, .resume = default_resume_listener, .accept_conn = rhttp_accept_conn, - .set_affinity = rhttp_set_affinity, + .set_affinity1 = rhttp_set_affinity, /* address family */ .fam = &proto_fam_rhttp, diff --git a/src/quic_conn.c b/src/quic_conn.c index 46c74d943..24e52d036 100644 --- a/src/quic_conn.c +++ b/src/quic_conn.c @@ -1730,7 +1730,7 @@ void qc_notify_err(struct quic_conn *qc) * * Returns 0 on success else non-zero. */ -int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li) +int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li) { struct task *t1 = NULL, *t2 = NULL; struct tasklet *t3 = NULL;