mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-06 23:27:04 +02:00
MAJOR: connection: split the send call into connection and stream interface
Similar to what was done on the receive path, the data layer now provides only an snd_buf() callback that is iterated over by the stream interface's si_conn_send_loop() function. The data layer now has no knowledge about channels nor stream interfaces. The splice() code still need to be ported as it currently is disabled.
This commit is contained in:
parent
ce323dea14
commit
5368d80ede
@ -37,16 +37,6 @@ static inline void conn_data_close(struct connection *conn)
|
||||
conn->data->close(conn);
|
||||
}
|
||||
|
||||
/* Calls the snd_buf() function of the data layer if any, otherwise
|
||||
* returns 0.
|
||||
*/
|
||||
static inline int conn_data_snd_buf(struct connection *conn)
|
||||
{
|
||||
if (!conn->data->snd_buf)
|
||||
return 0;
|
||||
return conn->data->snd_buf(conn);
|
||||
}
|
||||
|
||||
/* set polling depending on the change between the CURR part of the
|
||||
* flags and the new flags in connection C. The connection flags are
|
||||
* updated with the new flags at the end of the operation. Only the bits
|
||||
|
@ -118,8 +118,8 @@ struct sock_ops {
|
||||
void (*read)(struct connection *conn); /* read callback after poll() */
|
||||
void (*write)(struct connection *conn); /* write callback after poll() */
|
||||
void (*close)(struct connection *); /* close the data channel on the connection */
|
||||
int (*snd_buf)(struct connection *conn); /* callback used to send a buffer contents */
|
||||
int (*rcv_buf)(struct connection *conn, struct buffer *buf, int count); /* recv callback */
|
||||
int (*snd_buf)(struct connection *conn, struct buffer *buf, int flags); /* send callback */
|
||||
};
|
||||
|
||||
/* A stream interface has 3 parts :
|
||||
|
160
src/raw_sock.c
160
src/raw_sock.c
@ -279,148 +279,60 @@ static int raw_sock_to_buf(struct connection *conn, struct buffer *buf, int coun
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* This function is called to send buffer data to a stream socket.
|
||||
* It returns -1 in case of unrecoverable error, otherwise zero.
|
||||
/* Send all pending bytes from buffer <buf> to connection <conn>'s socket.
|
||||
* <flags> may contain MSG_MORE to make the system hold on without sending
|
||||
* data too fast.
|
||||
* Only one call to send() is performed, unless the buffer wraps, in which case
|
||||
* a second call may be performed. The connection's flags are updated with
|
||||
* whatever special event is detected (error, empty). The caller is responsible
|
||||
* for taking care of those events and avoiding the call if inappropriate. The
|
||||
* function does not call the connection's polling update function, so the caller
|
||||
* is responsible for this.
|
||||
*/
|
||||
static int sock_raw_write_loop(struct connection *conn)
|
||||
static int raw_sock_from_buf(struct connection *conn, struct buffer *buf, int flags)
|
||||
{
|
||||
struct stream_interface *si = container_of(conn, struct stream_interface, conn);
|
||||
struct channel *b = si->ob;
|
||||
int write_poll = MAX_WRITE_POLL_LOOPS;
|
||||
int ret, max;
|
||||
int ret, try, done, send_flag;
|
||||
|
||||
#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
while (b->pipe) {
|
||||
ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data,
|
||||
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
|
||||
if (ret <= 0) {
|
||||
if (ret == 0 || errno == EAGAIN) {
|
||||
conn_data_poll_send(&si->conn);
|
||||
return 0;
|
||||
}
|
||||
/* here we have another error */
|
||||
return -1;
|
||||
}
|
||||
|
||||
b->flags |= BF_WRITE_PARTIAL;
|
||||
b->pipe->data -= ret;
|
||||
|
||||
if (!b->pipe->data) {
|
||||
put_pipe(b->pipe);
|
||||
b->pipe = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
if (--write_poll <= 0)
|
||||
return 0;
|
||||
|
||||
/* The only reason we did not empty the pipe is that the output
|
||||
* buffer is full.
|
||||
*/
|
||||
conn_data_poll_send(&si->conn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* At this point, the pipe is empty, but we may still have data pending
|
||||
* in the normal buffer.
|
||||
done = 0;
|
||||
/* send the largest possible block. For this we perform only one call
|
||||
* to send() unless the buffer wraps and we exactly fill the first hunk,
|
||||
* in which case we accept to do it once again.
|
||||
*/
|
||||
#endif
|
||||
if (!b->buf.o) {
|
||||
b->flags |= BF_OUT_EMPTY;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* when we're in this loop, we already know that there is no spliced
|
||||
* data left, and that there are sendable buffered data.
|
||||
*/
|
||||
while (1) {
|
||||
max = b->buf.o;
|
||||
|
||||
while (buf->o) {
|
||||
try = buf->o;
|
||||
/* outgoing data may wrap at the end */
|
||||
if (b->buf.data + max > b->buf.p)
|
||||
max = b->buf.data + max - b->buf.p;
|
||||
if (buf->data + try > buf->p)
|
||||
try = buf->data + try - buf->p;
|
||||
|
||||
/* check if we want to inform the kernel that we're interested in
|
||||
* sending more data after this call. We want this if :
|
||||
* - we're about to close after this last send and want to merge
|
||||
* the ongoing FIN with the last segment.
|
||||
* - we know we can't send everything at once and must get back
|
||||
* here because of unaligned data
|
||||
* - there is still a finite amount of data to forward
|
||||
* The test is arranged so that the most common case does only 2
|
||||
* tests.
|
||||
*/
|
||||
send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
|
||||
if (try < buf->o)
|
||||
send_flag = MSG_MORE;
|
||||
|
||||
if (MSG_NOSIGNAL && MSG_MORE) {
|
||||
unsigned int send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
|
||||
|
||||
if ((!(b->flags & BF_NEVER_WAIT) &&
|
||||
((b->to_forward && b->to_forward != BUF_INFINITE_FORWARD) ||
|
||||
(b->flags & BF_EXPECT_MORE))) ||
|
||||
((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == BF_SHUTW_NOW && (max == b->buf.o)) ||
|
||||
(max != b->buf.o)) {
|
||||
send_flag |= MSG_MORE;
|
||||
}
|
||||
|
||||
/* this flag has precedence over the rest */
|
||||
if (b->flags & BF_SEND_DONTWAIT)
|
||||
send_flag &= ~MSG_MORE;
|
||||
|
||||
ret = send(si_fd(si), bo_ptr(&b->buf), max, send_flag);
|
||||
} else {
|
||||
int skerr;
|
||||
socklen_t lskerr = sizeof(skerr);
|
||||
|
||||
ret = getsockopt(si_fd(si), SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
|
||||
if (ret == -1 || skerr)
|
||||
ret = -1;
|
||||
else
|
||||
ret = send(si_fd(si), bo_ptr(&b->buf), max, MSG_DONTWAIT);
|
||||
}
|
||||
ret = send(conn->t.sock.fd, bo_ptr(buf), try, send_flag | flags);
|
||||
|
||||
if (ret > 0) {
|
||||
if (si->conn.flags & CO_FL_WAIT_L4_CONN) {
|
||||
si->conn.flags &= ~CO_FL_WAIT_L4_CONN;
|
||||
si->exp = TICK_ETERNITY;
|
||||
}
|
||||
buf->o -= ret;
|
||||
done += ret;
|
||||
|
||||
b->flags |= BF_WRITE_PARTIAL;
|
||||
|
||||
b->buf.o -= ret;
|
||||
if (likely(!buffer_len(&b->buf)))
|
||||
if (likely(!buffer_len(buf)))
|
||||
/* optimize data alignment in the buffer */
|
||||
b->buf.p = b->buf.data;
|
||||
|
||||
if (likely(!bi_full(b)))
|
||||
b->flags &= ~BF_FULL;
|
||||
|
||||
if (!b->buf.o) {
|
||||
/* Always clear both flags once everything has been sent, they're one-shot */
|
||||
b->flags &= ~(BF_EXPECT_MORE | BF_SEND_DONTWAIT);
|
||||
if (likely(!b->pipe))
|
||||
b->flags |= BF_OUT_EMPTY;
|
||||
break;
|
||||
}
|
||||
buf->p = buf->data;
|
||||
|
||||
/* if the system buffer is full, don't insist */
|
||||
if (ret < max)
|
||||
break;
|
||||
|
||||
if (--write_poll <= 0)
|
||||
if (ret < try)
|
||||
break;
|
||||
}
|
||||
else if (ret == 0 || errno == EAGAIN) {
|
||||
/* nothing written, we need to poll for write first */
|
||||
conn_data_poll_send(&si->conn);
|
||||
return 0;
|
||||
conn->flags |= CO_FL_WAIT_ROOM;
|
||||
break;
|
||||
}
|
||||
else {
|
||||
/* bad, we got an error */
|
||||
return -1;
|
||||
else if (errno != EINTR) {
|
||||
conn->flags |= CO_FL_ERROR;
|
||||
break;
|
||||
}
|
||||
} /* while (1) */
|
||||
return 0;
|
||||
}
|
||||
return done;
|
||||
}
|
||||
|
||||
|
||||
@ -433,7 +345,7 @@ struct sock_ops raw_sock = {
|
||||
.chk_snd = stream_int_chk_snd_conn,
|
||||
.read = si_conn_recv_cb,
|
||||
.write = si_conn_send_cb,
|
||||
.snd_buf = sock_raw_write_loop,
|
||||
.snd_buf = raw_sock_from_buf,
|
||||
.rcv_buf = raw_sock_to_buf,
|
||||
.close = NULL,
|
||||
};
|
||||
|
@ -654,6 +654,118 @@ void conn_notify_si(struct connection *conn)
|
||||
si->ib->flags &= ~BF_READ_DONTWAIT;
|
||||
}
|
||||
|
||||
/*
|
||||
* This function is called to send buffer data to a stream socket.
|
||||
* It returns -1 in case of unrecoverable error, otherwise zero.
|
||||
* It iterates the data layer's snd_buf function.
|
||||
*/
|
||||
static int si_conn_send_loop(struct connection *conn)
|
||||
{
|
||||
struct stream_interface *si = container_of(conn, struct stream_interface, conn);
|
||||
struct channel *b = si->ob;
|
||||
int write_poll = MAX_WRITE_POLL_LOOPS;
|
||||
int ret;
|
||||
|
||||
#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
while (b->pipe) {
|
||||
ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data,
|
||||
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
|
||||
if (ret <= 0) {
|
||||
if (ret == 0 || errno == EAGAIN) {
|
||||
conn_data_poll_send(&si->conn);
|
||||
return 0;
|
||||
}
|
||||
/* here we have another error */
|
||||
return -1;
|
||||
}
|
||||
|
||||
b->flags |= BF_WRITE_PARTIAL;
|
||||
b->pipe->data -= ret;
|
||||
|
||||
if (!b->pipe->data) {
|
||||
put_pipe(b->pipe);
|
||||
b->pipe = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
if (--write_poll <= 0)
|
||||
return 0;
|
||||
|
||||
/* The only reason we did not empty the pipe is that the output
|
||||
* buffer is full.
|
||||
*/
|
||||
conn_data_poll_send(&si->conn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* At this point, the pipe is empty, but we may still have data pending
|
||||
* in the normal buffer.
|
||||
*/
|
||||
#endif
|
||||
if (!b->buf.o) {
|
||||
b->flags |= BF_OUT_EMPTY;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* when we're in this loop, we already know that there is no spliced
|
||||
* data left, and that there are sendable buffered data.
|
||||
*/
|
||||
conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
|
||||
while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
|
||||
/* check if we want to inform the kernel that we're interested in
|
||||
* sending more data after this call. We want this if :
|
||||
* - we're about to close after this last send and want to merge
|
||||
* the ongoing FIN with the last segment.
|
||||
* - we know we can't send everything at once and must get back
|
||||
* here because of unaligned data
|
||||
* - there is still a finite amount of data to forward
|
||||
* The test is arranged so that the most common case does only 2
|
||||
* tests.
|
||||
*/
|
||||
unsigned int send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
|
||||
|
||||
if ((!(b->flags & (BF_NEVER_WAIT|BF_SEND_DONTWAIT)) &&
|
||||
((b->to_forward && b->to_forward != BUF_INFINITE_FORWARD) ||
|
||||
(b->flags & BF_EXPECT_MORE))) ||
|
||||
((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == BF_SHUTW_NOW))
|
||||
send_flag |= MSG_MORE;
|
||||
|
||||
ret = conn->data->snd_buf(conn, &b->buf, send_flag);
|
||||
if (ret <= 0)
|
||||
break;
|
||||
|
||||
if (si->conn.flags & CO_FL_WAIT_L4_CONN)
|
||||
si->conn.flags &= ~CO_FL_WAIT_L4_CONN;
|
||||
|
||||
b->flags |= BF_WRITE_PARTIAL;
|
||||
|
||||
if (likely(!bi_full(b)))
|
||||
b->flags &= ~BF_FULL;
|
||||
|
||||
if (!b->buf.o) {
|
||||
/* Always clear both flags once everything has been sent, they're one-shot */
|
||||
b->flags &= ~(BF_EXPECT_MORE | BF_SEND_DONTWAIT);
|
||||
if (likely(!b->pipe))
|
||||
b->flags |= BF_OUT_EMPTY;
|
||||
break;
|
||||
}
|
||||
|
||||
if (--write_poll <= 0)
|
||||
break;
|
||||
} /* while */
|
||||
|
||||
if (conn->flags & CO_FL_ERROR)
|
||||
return -1;
|
||||
|
||||
if (conn->flags & CO_FL_WAIT_ROOM) {
|
||||
/* we need to poll before going on */
|
||||
conn_data_poll_send(&si->conn);
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/* Updates the timers and flags of a stream interface attached to a connection,
|
||||
* depending on the buffers' flags. It should only be called once after the
|
||||
* buffer flags have settled down, and before they are cleared. It doesn't
|
||||
@ -792,7 +904,7 @@ void stream_int_chk_snd_conn(struct stream_interface *si)
|
||||
(fdtab[si_fd(si)].ev & FD_POLL_OUT))) /* we'll be called anyway */
|
||||
return;
|
||||
|
||||
if (conn_data_snd_buf(&si->conn) < 0) {
|
||||
if (si_conn_send_loop(&si->conn) < 0) {
|
||||
/* Write error on the file descriptor. We mark the FD as STERROR so
|
||||
* that we don't use it anymore and we notify the task.
|
||||
*/
|
||||
@ -1078,7 +1190,7 @@ void si_conn_send_cb(struct connection *conn)
|
||||
return;
|
||||
|
||||
/* OK there are data waiting to be sent */
|
||||
if (conn_data_snd_buf(conn) < 0)
|
||||
if (si_conn_send_loop(conn) < 0)
|
||||
goto out_error;
|
||||
|
||||
/* OK all done */
|
||||
|
Loading…
Reference in New Issue
Block a user