MINOR: xprt: Add recvmsg() and sendmsg() parameters to rcv_buf() and snd_buf().

In rcv_buf() and snd_buf(), use sendmsg/recvmsg instead of send and
recv, and add two new optional parameters to provide msg_control and
msg_controllen.
Those are unused for now, but will be used later for kTLS.
This commit is contained in:
Olivier Houchard 2025-06-17 16:43:20 +02:00 committed by Olivier Houchard
parent 67cb6aab90
commit 3d685fcb7d
10 changed files with 53 additions and 28 deletions

View File

@ -439,8 +439,8 @@ union conn_handle {
* and the other ones are used to setup and release the transport layer. * and the other ones are used to setup and release the transport layer.
*/ */
struct xprt_ops { struct xprt_ops {
size_t (*rcv_buf)(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, int flags); /* recv callback */ size_t (*rcv_buf)(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, void *msg_control, size_t *msg_controllen, int flags); /* recv callback */
size_t (*snd_buf)(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, int flags); /* send callback */ size_t (*snd_buf)(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, void *msg_control, size_t msg_controllen, int flags); /* send callback */
int (*rcv_pipe)(struct connection *conn, void *xprt_ctx, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */ int (*rcv_pipe)(struct connection *conn, void *xprt_ctx, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */
int (*snd_pipe)(struct connection *conn, void *xprt_ctx, struct pipe *pipe, unsigned int count); /* send-to-pipe callback */ int (*snd_pipe)(struct connection *conn, void *xprt_ctx, struct pipe *pipe, unsigned int count); /* send-to-pipe callback */
void (*shutr)(struct connection *conn, void *xprt_ctx, int); /* shutr function */ void (*shutr)(struct connection *conn, void *xprt_ctx, int); /* shutr function */

View File

@ -908,7 +908,7 @@ int conn_ctrl_send(struct connection *conn, const void *buf, int len, int flags)
/* snd_buf() already takes care of updating conn->flags and handling /* snd_buf() already takes care of updating conn->flags and handling
* the FD polling status. * the FD polling status.
*/ */
ret = xprt->snd_buf(conn, NULL, &buffer, buffer.data, flags); ret = xprt->snd_buf(conn, NULL, &buffer, buffer.data, NULL, 0, flags);
if (conn->flags & CO_FL_ERROR) if (conn->flags & CO_FL_ERROR)
ret = -1; ret = -1;
return ret; return ret;

View File

@ -2844,7 +2844,7 @@ static int fcgi_recv(struct fcgi_conn *fconn)
else else
max = b_room(buf); max = b_room(buf);
ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, 0) : 0; ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, NULL, NULL, 0) : 0;
if (max && !ret && fcgi_recv_allowed(fconn)) { if (max && !ret && fcgi_recv_allowed(fconn)) {
TRACE_DATA("failed to receive data, subscribing", FCGI_EV_FCONN_RECV, conn); TRACE_DATA("failed to receive data, subscribing", FCGI_EV_FCONN_RECV, conn);
@ -2948,7 +2948,7 @@ static int fcgi_send(struct fcgi_conn *fconn)
if (b_data(buf)) { if (b_data(buf)) {
int ret; int ret;
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), flags); ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), NULL, 0, flags);
if (!ret) { if (!ret) {
done = 1; done = 1;
break; break;
@ -3341,7 +3341,7 @@ do_leave:
for (buf = br_head(fconn->mbuf); b_size(buf); buf = br_del_head(fconn->mbuf)) { for (buf = br_head(fconn->mbuf); b_size(buf); buf = br_del_head(fconn->mbuf)) {
if (b_data(buf)) { if (b_data(buf)) {
int ret = fconn->conn->xprt->snd_buf(fconn->conn, fconn->conn->xprt_ctx, int ret = fconn->conn->xprt->snd_buf(fconn->conn, fconn->conn->xprt_ctx,
buf, b_data(buf), 0); buf, b_data(buf), NULL, 0, 0);
if (!ret) if (!ret)
break; break;
b_del(buf, ret); b_del(buf, ret);

View File

@ -3931,7 +3931,7 @@ static int h1_recv(struct h1c *h1c)
*/ */
h1c->ibuf.head = sizeof(struct htx); h1c->ibuf.head = sizeof(struct htx);
} }
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &h1c->ibuf, max, flags); ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &h1c->ibuf, max, NULL, 0, flags);
HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, ret); HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, ret);
} }
@ -3993,7 +3993,7 @@ static int h1_send(struct h1c *h1c)
if (h1c->flags & H1C_F_CO_STREAMER) if (h1c->flags & H1C_F_CO_STREAMER)
flags |= CO_SFL_STREAMER; flags |= CO_SFL_STREAMER;
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &h1c->obuf, b_data(&h1c->obuf), flags); ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &h1c->obuf, b_data(&h1c->obuf), NULL, 0, flags);
if (ret > 0) { if (ret > 0) {
TRACE_DATA("data sent", H1_EV_H1C_SEND, h1c->conn, 0, 0, (size_t[]){ret}); TRACE_DATA("data sent", H1_EV_H1C_SEND, h1c->conn, 0, 0, (size_t[]){ret});
if (h1c->flags & H1C_F_OUT_FULL) { if (h1c->flags & H1C_F_OUT_FULL) {
@ -5097,7 +5097,7 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
#endif #endif
if (!sdo->iobuf.pipe) { if (!sdo->iobuf.pipe) {
b_add(sdo->iobuf.buf, sdo->iobuf.offset); b_add(sdo->iobuf.buf, sdo->iobuf.offset);
ret = h1c->conn->xprt->rcv_buf(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.buf, try, flags); ret = h1c->conn->xprt->rcv_buf(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.buf, try, NULL, NULL, flags);
if (ret < try) { if (ret < try) {
TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn); TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn);
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);

View File

@ -4754,7 +4754,7 @@ static int h2_recv(struct h2c *h2c)
else else
max = b_room(buf); max = b_room(buf);
ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, 0) : 0; ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, NULL, NULL, 0) : 0;
if (max && !ret && h2_recv_allowed(h2c)) { if (max && !ret && h2_recv_allowed(h2c)) {
TRACE_DATA("failed to receive data, subscribing", H2_EV_H2C_RECV, h2c->conn); TRACE_DATA("failed to receive data, subscribing", H2_EV_H2C_RECV, h2c->conn);
@ -4862,7 +4862,7 @@ static int h2_send(struct h2c *h2c)
for (buf = br_head(h2c->mbuf); b_size(buf); buf = br_del_head(h2c->mbuf)) { for (buf = br_head(h2c->mbuf); b_size(buf); buf = br_del_head(h2c->mbuf)) {
if (b_data(buf)) { if (b_data(buf)) {
int ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), int ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), NULL, 0,
flags | (to_send > 1 ? CO_SFL_MSG_MORE : 0)); flags | (to_send > 1 ? CO_SFL_MSG_MORE : 0));
if (!ret) { if (!ret) {
done = 1; done = 1;
@ -5282,7 +5282,7 @@ do_leave:
for (buf = br_head(h2c->mbuf); b_size(buf); buf = br_del_head(h2c->mbuf)) { for (buf = br_head(h2c->mbuf); b_size(buf); buf = br_del_head(h2c->mbuf)) {
if (b_data(buf)) { if (b_data(buf)) {
int ret = h2c->conn->xprt->snd_buf(h2c->conn, h2c->conn->xprt_ctx, buf, b_data(buf), 0); int ret = h2c->conn->xprt->snd_buf(h2c->conn, h2c->conn->xprt_ctx, buf, b_data(buf), NULL, 0, 0);
if (!ret) if (!ret)
break; break;
b_del(buf, ret); b_del(buf, ret);

View File

@ -525,7 +525,7 @@ static size_t mux_pt_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count
goto end; goto end;
} }
b_realign_if_empty(buf); b_realign_if_empty(buf);
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, count, flags); ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, count, NULL, NULL, flags);
if (conn->flags & CO_FL_ERROR) { if (conn->flags & CO_FL_ERROR) {
mux_pt_report_term_evt(ctx, muxc_tevt_type_rcv_err); mux_pt_report_term_evt(ctx, muxc_tevt_type_rcv_err);
se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
@ -554,7 +554,7 @@ static size_t mux_pt_snd_buf(struct stconn *sc, struct buffer *buf, size_t count
TRACE_ENTER(PT_EV_TX_DATA, conn, sc, buf, (size_t[]){count}); TRACE_ENTER(PT_EV_TX_DATA, conn, sc, buf, (size_t[]){count});
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, count, flags); ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, count, NULL, 0, flags);
if (ret > 0) if (ret > 0)
b_del(buf, ret); b_del(buf, ret);

View File

@ -2368,7 +2368,7 @@ static int spop_recv(struct spop_conn *spop_conn)
} }
max = b_room(buf); max = b_room(buf);
ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, 0) : 0; ret = max ? conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, max, NULL, NULL, 0) : 0;
if (max && !ret && spop_recv_allowed(spop_conn)) { if (max && !ret && spop_recv_allowed(spop_conn)) {
TRACE_DATA("failed to receive data, subscribing", SPOP_EV_SPOP_CONN_RECV, conn); TRACE_DATA("failed to receive data, subscribing", SPOP_EV_SPOP_CONN_RECV, conn);
@ -2469,7 +2469,7 @@ static int spop_send(struct spop_conn *spop_conn)
if (b_data(buf)) { if (b_data(buf)) {
int ret; int ret;
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), flags); ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), NULL, 0, flags);
if (!ret) { if (!ret) {
done = 1; done = 1;
break; break;
@ -2808,7 +2808,7 @@ do_leave:
for (buf = br_head(spop_conn->mbuf); b_size(buf); buf = br_del_head(spop_conn->mbuf)) { for (buf = br_head(spop_conn->mbuf); b_size(buf); buf = br_del_head(spop_conn->mbuf)) {
if (b_data(buf)) { if (b_data(buf)) {
int ret = spop_conn->conn->xprt->snd_buf(spop_conn->conn, spop_conn->conn->xprt_ctx, int ret = spop_conn->conn->xprt->snd_buf(spop_conn->conn, spop_conn->conn->xprt_ctx,
buf, b_data(buf), 0); buf, b_data(buf), NULL, 0, 0);
if (!ret) if (!ret)
break; break;
b_del(buf, ret); b_del(buf, ret);

View File

@ -235,7 +235,7 @@ int raw_sock_from_pipe(struct connection *conn, void *xprt_ctx, struct pipe *pip
* errno is cleared before starting so that the caller knows that if it spots an * errno is cleared before starting so that the caller knows that if it spots an
* error without errno, it's pending and can be retrieved via getsockopt(SO_ERROR). * error without errno, it's pending and can be retrieved via getsockopt(SO_ERROR).
*/ */
static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, int flags) static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, void *msg_control, size_t *msg_controllen, int flags)
{ {
ssize_t ret; ssize_t ret;
size_t try, done = 0; size_t try, done = 0;
@ -273,6 +273,9 @@ static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu
* EINTR too. * EINTR too.
*/ */
while (count > 0) { while (count > 0) {
struct msghdr msg;
struct iovec iov;
try = b_contig_space(buf); try = b_contig_space(buf);
if (!try) if (!try)
break; break;
@ -280,7 +283,17 @@ static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu
if (try > count) if (try > count)
try = count; try = count;
ret = recv(conn->handle.fd, b_tail(buf), try, 0); memset(&msg, 0, sizeof(msg));
msg.msg_control = msg_control;
if (msg_controllen)
msg.msg_controllen = *msg_controllen;
iov.iov_base = b_tail(buf);
iov.iov_len = try;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
ret = recvmsg(conn->handle.fd, &msg, 0);
if (ret > 0 && msg_controllen != NULL)
*msg_controllen = msg.msg_controllen;
if (ret > 0) { if (ret > 0) {
b_add(buf, ret); b_add(buf, ret);
@ -367,7 +380,7 @@ static size_t raw_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu
* is responsible for this. It's up to the caller to update the buffer's contents * is responsible for this. It's up to the caller to update the buffer's contents
* based on the return value. * based on the return value.
*/ */
static size_t raw_sock_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, int flags) static size_t raw_sock_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, void *msg_control, size_t msg_controllen, int flags)
{ {
ssize_t ret; ssize_t ret;
size_t try, done; size_t try, done;
@ -405,15 +418,23 @@ static size_t raw_sock_from_buf(struct connection *conn, void *xprt_ctx, const s
* in which case we accept to do it once again. * in which case we accept to do it once again.
*/ */
while (count) { while (count) {
struct msghdr msg;
struct iovec iov;
try = b_contig_data(buf, done); try = b_contig_data(buf, done);
if (try > count) if (try > count)
try = count; try = count;
send_flag = MSG_DONTWAIT | MSG_NOSIGNAL; send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
memset(&msg, 0, sizeof(msg));
msg.msg_control = msg_control;
msg.msg_controllen = msg_controllen;
iov.iov_base = b_peek(buf, done);
iov.iov_len = try;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
if (try < count || flags & CO_SFL_MSG_MORE) if (try < count || flags & CO_SFL_MSG_MORE)
send_flag |= MSG_MORE; send_flag |= MSG_MORE;
ret = sendmsg(conn->handle.fd, &msg, send_flag);
ret = send(conn->handle.fd, b_peek(buf, done), try, send_flag);
if (ret > 0) { if (ret > 0) {
count -= ret; count -= ret;

View File

@ -269,7 +269,7 @@ static int ha_ssl_write(BIO *h, const char *buf, int num)
tmpbuf.data = num; tmpbuf.data = num;
tmpbuf.head = 0; tmpbuf.head = 0;
flags = (ctx->xprt_st & SSL_SOCK_SEND_MORE) ? CO_SFL_MSG_MORE : 0; flags = (ctx->xprt_st & SSL_SOCK_SEND_MORE) ? CO_SFL_MSG_MORE : 0;
ret = ctx->xprt->snd_buf(ctx->conn, ctx->xprt_ctx, &tmpbuf, num, flags); ret = ctx->xprt->snd_buf(ctx->conn, ctx->xprt_ctx, &tmpbuf, num, NULL, 0, flags);
BIO_clear_retry_flags(h); BIO_clear_retry_flags(h);
if (ret == 0 && !(ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH))) { if (ret == 0 && !(ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH))) {
BIO_set_retry_write(h); BIO_set_retry_write(h);
@ -301,7 +301,7 @@ static int ha_ssl_read(BIO *h, char *buf, int size)
tmpbuf.area = buf; tmpbuf.area = buf;
tmpbuf.data = 0; tmpbuf.data = 0;
tmpbuf.head = 0; tmpbuf.head = 0;
ret = ctx->xprt->rcv_buf(ctx->conn, ctx->xprt_ctx, &tmpbuf, size, 0); ret = ctx->xprt->rcv_buf(ctx->conn, ctx->xprt_ctx, &tmpbuf, size, NULL, NULL, 0);
BIO_clear_retry_flags(h); BIO_clear_retry_flags(h);
if (ret == 0 && !(ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH))) { if (ret == 0 && !(ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH))) {
BIO_set_retry_read(h); BIO_set_retry_read(h);
@ -5904,7 +5904,7 @@ leave:
* avoiding the call if inappropriate. The function does not call the * avoiding the call if inappropriate. The function does not call the
* connection's polling update function, so the caller is responsible for this. * connection's polling update function, so the caller is responsible for this.
*/ */
static size_t ssl_sock_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, int flags) static size_t ssl_sock_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, void *msg_control, size_t *msg_controllen, int flags)
{ {
struct ssl_sock_ctx *ctx = xprt_ctx; struct ssl_sock_ctx *ctx = xprt_ctx;
ssize_t ret; ssize_t ret;
@ -5915,6 +5915,8 @@ static size_t ssl_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu
if (!ctx) if (!ctx)
goto out_error; goto out_error;
BUG_ON_HOT(msg_control != NULL);
#ifdef SSL_READ_EARLY_DATA_SUCCESS #ifdef SSL_READ_EARLY_DATA_SUCCESS
if (b_data(&ctx->early_buf)) { if (b_data(&ctx->early_buf)) {
try = b_contig_space(buf); try = b_contig_space(buf);
@ -6054,7 +6056,7 @@ static size_t ssl_sock_to_buf(struct connection *conn, void *xprt_ctx, struct bu
* caller to take care of this. It's up to the caller to update the buffer's * caller to take care of this. It's up to the caller to update the buffer's
* contents based on the return value. * contents based on the return value.
*/ */
static size_t ssl_sock_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, int flags) static size_t ssl_sock_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, void *msg_control, size_t msg_controllen, int flags)
{ {
struct ssl_sock_ctx *ctx = xprt_ctx; struct ssl_sock_ctx *ctx = xprt_ctx;
ssize_t ret; ssize_t ret;
@ -6067,6 +6069,8 @@ static size_t ssl_sock_from_buf(struct connection *conn, void *xprt_ctx, const s
if (!ctx) if (!ctx)
goto out_error; goto out_error;
BUG_ON_HOT(msg_control != NULL);
if (conn->flags & (CO_FL_WAIT_XPRT | CO_FL_SSL_WAIT_HS | CO_FL_EARLY_SSL_HS)) { if (conn->flags & (CO_FL_WAIT_XPRT | CO_FL_SSL_WAIT_HS | CO_FL_EARLY_SSL_HS)) {
/* a handshake was requested */ /* a handshake was requested */
TRACE_LEAVE(SSL_EV_CONN_SEND, conn); TRACE_LEAVE(SSL_EV_CONN_SEND, conn);

View File

@ -25,12 +25,12 @@ DECLARE_STATIC_TYPED_POOL(xprt_handshake_ctx_pool, "xprt_handshake_ctx", struct
/* This XPRT doesn't take care of sending or receiving data, once its handshake /* This XPRT doesn't take care of sending or receiving data, once its handshake
* is done, it just removes itself * is done, it just removes itself
*/ */
static size_t xprt_handshake_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, int flags) static size_t xprt_handshake_from_buf(struct connection *conn, void *xprt_ctx, const struct buffer *buf, size_t count, void *msg_control, size_t msg_controllen, int flags)
{ {
return 0; return 0;
} }
static size_t xprt_handshake_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, int flags) static size_t xprt_handshake_to_buf(struct connection *conn, void *xprt_ctx, struct buffer *buf, size_t count, void *msg_control, size_t *msg_controllen, int flags)
{ {
return 0; return 0;
} }