From 3d685fcb7dc4082ea68608b03d0e5eee400c15c8 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Tue, 17 Jun 2025 16:43:20 +0200 Subject: [PATCH] MINOR: xprt: Add recvmsg() and sendmsg() parameters to rcv_buf() and snd_buf(). In rcv_buf() and snd_buf(), use sendmsg/recvmsg instead of send and recv, and add two new optional parameters to provide msg_control and msg_controllen. Those are unused for now, but will be used later for kTLS. --- include/haproxy/connection-t.h | 4 ++-- src/connection.c | 2 +- src/mux_fcgi.c | 6 +++--- src/mux_h1.c | 6 +++--- src/mux_h2.c | 6 +++--- src/mux_pt.c | 4 ++-- src/mux_spop.c | 6 +++--- src/raw_sock.c | 31 ++++++++++++++++++++++++++----- src/ssl_sock.c | 12 ++++++++---- src/xprt_handshake.c | 4 ++-- 10 files changed, 53 insertions(+), 28 deletions(-) diff --git a/include/haproxy/connection-t.h b/include/haproxy/connection-t.h index 571943131..69a41d74e 100644 --- a/include/haproxy/connection-t.h +++ b/include/haproxy/connection-t.h @@ -439,8 +439,8 @@ union conn_handle { * and the other ones are used to setup and release the transport layer. */ struct xprt_ops { - size_t (*rcv_buf)(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, int flags); /* recv callback */ - size_t (*snd_buf)(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, int flags); /* send callback */ + size_t (*rcv_buf)(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, void *msg_control, size_t *msg_controllen, int flags); /* recv callback */ + size_t (*snd_buf)(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, void *msg_control, size_t msg_controllen, int flags); /* send callback */ int (*rcv_pipe)(struct connection *conn, void *xprt_ctx, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */ int (*snd_pipe)(struct connection *conn, void *xprt_ctx, struct pipe *pipe, unsigned int count); /* send-to-pipe callback */ void (*shutr)(struct connection *conn, void *xprt_ctx, int); /* shutr function */ diff --git a/src/connection.c b/src/connection.c index 18b4a4111..9c5a0fe7a 100644 --- a/src/connection.c +++ b/src/connection.c @@ -908,7 +908,7 @@ int conn_ctrl_send(struct connection *conn, const void *buf, int len, int flags) /* snd_buf() already takes care of updating conn->flags and handling * the FD polling status. */ - ret = xprt->snd_buf(conn, NULL, &buffer, buffer.data, flags); + ret = xprt->snd_buf(conn, NULL, &buffer, buffer.data, NULL, 0, flags); if (conn->flags & CO_FL_ERROR) ret = -1; return ret; diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index b217ec8f5..0b143b893 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -2844,7 +2844,7 @@ static int fcgi_recv(struct fcgi_conn *fconn) else max = b_room(buf); - ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, 0) : 0; + ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, NULL, NULL, 0) : 0; if (max && !ret && fcgi_recv_allowed(fconn)) { TRACE_DATA("failed to receive data, subscribing", FCGI_EV_FCONN_RECV, conn); @@ -2948,7 +2948,7 @@ static int fcgi_send(struct fcgi_conn *fconn) if (b_data(buf)) { int ret; - ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), flags); + ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), NULL, 0, flags); if (!ret) { done = 1; break; @@ -3341,7 +3341,7 @@ do_leave: for (buf = br_head(fconn->mbuf); b_size(buf); buf = br_del_head(fconn->mbuf)) { if (b_data(buf)) { int ret = fconn->conn->xprt->snd_buf(fconn->conn, fconn->conn->xprt_ctx, - buf, b_data(buf), 0); + buf, b_data(buf), NULL, 0, 0); if (!ret) break; b_del(buf, ret); diff --git a/src/mux_h1.c b/src/mux_h1.c index 67c55fd3b..204964613 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -3931,7 +3931,7 @@ static int h1_recv(struct h1c *h1c) */ h1c->ibuf.head = sizeof(struct htx); } - ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &h1c->ibuf, max, flags); + ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &h1c->ibuf, max, NULL, 0, flags); HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, ret); } @@ -3993,7 +3993,7 @@ static int h1_send(struct h1c *h1c) if (h1c->flags & H1C_F_CO_STREAMER) flags |= CO_SFL_STREAMER; - ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &h1c->obuf, b_data(&h1c->obuf), flags); + ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &h1c->obuf, b_data(&h1c->obuf), NULL, 0, flags); if (ret > 0) { TRACE_DATA("data sent", H1_EV_H1C_SEND, h1c->conn, 0, 0, (size_t[]){ret}); if (h1c->flags & H1C_F_OUT_FULL) { @@ -5097,7 +5097,7 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) #endif if (!sdo->iobuf.pipe) { b_add(sdo->iobuf.buf, sdo->iobuf.offset); - ret = h1c->conn->xprt->rcv_buf(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.buf, try, flags); + ret = h1c->conn->xprt->rcv_buf(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.buf, try, NULL, NULL, flags); if (ret < try) { TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn); h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); diff --git a/src/mux_h2.c b/src/mux_h2.c index f16e93cdb..65eb133e5 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -4754,7 +4754,7 @@ static int h2_recv(struct h2c *h2c) else max = b_room(buf); - ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, 0) : 0; + ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, NULL, NULL, 0) : 0; if (max && !ret && h2_recv_allowed(h2c)) { TRACE_DATA("failed to receive data, subscribing", H2_EV_H2C_RECV, h2c->conn); @@ -4862,7 +4862,7 @@ static int h2_send(struct h2c *h2c) for (buf = br_head(h2c->mbuf); b_size(buf); buf = br_del_head(h2c->mbuf)) { if (b_data(buf)) { - int ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), + int ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), NULL, 0, flags | (to_send > 1 ? CO_SFL_MSG_MORE : 0)); if (!ret) { done = 1; @@ -5282,7 +5282,7 @@ do_leave: for (buf = br_head(h2c->mbuf); b_size(buf); buf = br_del_head(h2c->mbuf)) { if (b_data(buf)) { - int ret = h2c->conn->xprt->snd_buf(h2c->conn, h2c->conn->xprt_ctx, buf, b_data(buf), 0); + int ret = h2c->conn->xprt->snd_buf(h2c->conn, h2c->conn->xprt_ctx, buf, b_data(buf), NULL, 0, 0); if (!ret) break; b_del(buf, ret); diff --git a/src/mux_pt.c b/src/mux_pt.c index 2c96a2f80..5a26a153e 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -525,7 +525,7 @@ static size_t mux_pt_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count goto end; } b_realign_if_empty(buf); - ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, count, flags); + ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, count, NULL, NULL, flags); if (conn->flags & CO_FL_ERROR) { mux_pt_report_term_evt(ctx, muxc_tevt_type_rcv_err); se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); @@ -554,7 +554,7 @@ static size_t mux_pt_snd_buf(struct stconn *sc, struct buffer *buf, size_t count TRACE_ENTER(PT_EV_TX_DATA, conn, sc, buf, (size_t[]){count}); - ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, count, flags); + ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, count, NULL, 0, flags); if (ret > 0) b_del(buf, ret); diff --git a/src/mux_spop.c b/src/mux_spop.c index 020d121d5..10355f148 100644 --- a/src/mux_spop.c +++ b/src/mux_spop.c @@ -2368,7 +2368,7 @@ static int spop_recv(struct spop_conn *spop_conn) } max = b_room(buf); - ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, 0) : 0; + ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, NULL, NULL, 0) : 0; if (max && !ret && spop_recv_allowed(spop_conn)) { TRACE_DATA("failed to receive data, subscribing", SPOP_EV_SPOP_CONN_RECV, conn); @@ -2469,7 +2469,7 @@ static int spop_send(struct spop_conn *spop_conn) if (b_data(buf)) { int ret; - ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), flags); + ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), NULL, 0, flags); if (!ret) { done = 1; break; @@ -2808,7 +2808,7 @@ do_leave: for (buf = br_head(spop_conn->mbuf); b_size(buf); buf = br_del_head(spop_conn->mbuf)) { if (b_data(buf)) { int ret = spop_conn->conn->xprt->snd_buf(spop_conn->conn, spop_conn->conn->xprt_ctx, - buf, b_data(buf), 0); + buf, b_data(buf), NULL, 0, 0); if (!ret) break; b_del(buf, ret); diff --git a/src/raw_sock.c b/src/raw_sock.c index 9264e3130..a769c20f4 100644 --- a/src/raw_sock.c +++ b/src/raw_sock.c @@ -235,7 +235,7 @@ int raw_sock_from_pipe(struct connection *conn, void *xprt_ctx, struct pipe *pip * errno is cleared before starting so that the caller knows that if it spots an * error without errno, it's pending and can be retrieved via getsockopt(SO_ERROR). */ -static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, int flags) +static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, void *msg_control, size_t *msg_controllen, int flags) { ssize_t ret; size_t try, done = 0; @@ -273,6 +273,9 @@ static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu * EINTR too. */ while (count > 0) { + struct msghdr msg; + struct iovec iov; + try = b_contig_space(buf); if (!try) break; @@ -280,7 +283,17 @@ static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu if (try > count) try = count; - ret = recv(conn->handle.fd, b_tail(buf), try, 0); + memset(&msg, 0, sizeof(msg)); + msg.msg_control = msg_control; + if (msg_controllen) + msg.msg_controllen = *msg_controllen; + iov.iov_base = b_tail(buf); + iov.iov_len = try; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + ret = recvmsg(conn->handle.fd, &msg, 0); + if (ret > 0 && msg_controllen != NULL) + *msg_controllen = msg.msg_controllen; if (ret > 0) { b_add(buf, ret); @@ -367,7 +380,7 @@ static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu * is responsible for this. It's up to the caller to update the buffer's contents * based on the return value. */ -static size_t raw_sock_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, int flags) +static size_t raw_sock_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, void *msg_control, size_t msg_controllen, int flags) { ssize_t ret; size_t try, done; @@ -405,15 +418,23 @@ static size_t raw_sock_from_buf(struct connection *conn, void *xprt_ctx, const s * in which case we accept to do it once again. */ while (count) { + struct msghdr msg; + struct iovec iov; try = b_contig_data(buf, done); if (try > count) try = count; send_flag = MSG_DONTWAIT | MSG_NOSIGNAL; + memset(&msg, 0, sizeof(msg)); + msg.msg_control = msg_control; + msg.msg_controllen = msg_controllen; + iov.iov_base = b_peek(buf, done); + iov.iov_len = try; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; if (try < count || flags & CO_SFL_MSG_MORE) send_flag |= MSG_MORE; - - ret = send(conn->handle.fd, b_peek(buf, done), try, send_flag); + ret = sendmsg(conn->handle.fd, &msg, send_flag); if (ret > 0) { count -= ret; diff --git a/src/ssl_sock.c b/src/ssl_sock.c index e06239efb..ad8259070 100644 --- a/src/ssl_sock.c +++ b/src/ssl_sock.c @@ -269,7 +269,7 @@ static int ha_ssl_write(BIO *h, const char *buf, int num) tmpbuf.data = num; tmpbuf.head = 0; flags = (ctx->xprt_st & SSL_SOCK_SEND_MORE) ? CO_SFL_MSG_MORE : 0; - ret = ctx->xprt->snd_buf(ctx->conn, ctx->xprt_ctx, &tmpbuf, num, flags); + ret = ctx->xprt->snd_buf(ctx->conn, ctx->xprt_ctx, &tmpbuf, num, NULL, 0, flags); BIO_clear_retry_flags(h); if (ret == 0 && !(ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH))) { BIO_set_retry_write(h); @@ -301,7 +301,7 @@ static int ha_ssl_read(BIO *h, char *buf, int size) tmpbuf.area = buf; tmpbuf.data = 0; tmpbuf.head = 0; - ret = ctx->xprt->rcv_buf(ctx->conn, ctx->xprt_ctx, &tmpbuf, size, 0); + ret = ctx->xprt->rcv_buf(ctx->conn, ctx->xprt_ctx, &tmpbuf, size, NULL, NULL, 0); BIO_clear_retry_flags(h); if (ret == 0 && !(ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH))) { BIO_set_retry_read(h); @@ -5904,7 +5904,7 @@ leave: * avoiding the call if inappropriate. The function does not call the * connection's polling update function, so the caller is responsible for this. */ -static size_t ssl_sock_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, int flags) +static size_t ssl_sock_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, void *msg_control, size_t *msg_controllen, int flags) { struct ssl_sock_ctx *ctx = xprt_ctx; ssize_t ret; @@ -5915,6 +5915,8 @@ static size_t ssl_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu if (!ctx) goto out_error; + BUG_ON_HOT(msg_control != NULL); + #ifdef SSL_READ_EARLY_DATA_SUCCESS if (b_data(&ctx->early_buf)) { try = b_contig_space(buf); @@ -6054,7 +6056,7 @@ static size_t ssl_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu * caller to take care of this. It's up to the caller to update the buffer's * contents based on the return value. */ -static size_t ssl_sock_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, int flags) +static size_t ssl_sock_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, void *msg_control, size_t msg_controllen, int flags) { struct ssl_sock_ctx *ctx = xprt_ctx; ssize_t ret; @@ -6067,6 +6069,8 @@ static size_t ssl_sock_from_buf(struct connection *conn, void *xprt_ctx, const s if (!ctx) goto out_error; + BUG_ON_HOT(msg_control != NULL); + if (conn->flags & (CO_FL_WAIT_XPRT | CO_FL_SSL_WAIT_HS | CO_FL_EARLY_SSL_HS)) { /* a handshake was requested */ TRACE_LEAVE(SSL_EV_CONN_SEND, conn); diff --git a/src/xprt_handshake.c b/src/xprt_handshake.c index 9e7bdccca..9e81aa4a4 100644 --- a/src/xprt_handshake.c +++ b/src/xprt_handshake.c @@ -25,12 +25,12 @@ DECLARE_STATIC_TYPED_POOL(xprt_handshake_ctx_pool, "xprt_handshake_ctx", struct /* This XPRT doesn't take care of sending or receiving data, once its handshake * is done, it just removes itself */ -static size_t xprt_handshake_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, int flags) +static size_t xprt_handshake_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, void *msg_control, size_t msg_controllen, int flags) { return 0; } -static size_t xprt_handshake_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, int flags) +static size_t xprt_handshake_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, void *msg_control, size_t *msg_controllen, int flags) { return 0; }