MAJOR: raw_sock: extract raw_sock_to_buf() from raw_sock_read()

This is the start of the stream connection iterator which calls the
data-layer reader. This still looks a bit tricky but is OK. Splicing
is not handled at all at the moment.
Author:    Willy Tarreau
Date:      2012-08-20 17:30:32 +02:00
Committer: Willy Tarreau
Commit:    2ba4465086 (parent 75bf2c925f)
2 changed files with 192 additions and 151 deletions
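
Before the diff itself, it may help to keep the overall shape in mind: a transport-level reader fills a buffer and reports why it stopped through connection flags, and a stream-level iterator drives it in a loop until one of those flags (or an error) is raised. The sketch below is purely illustrative; the CO_FL_WAIT_DATA/CO_FL_WAIT_ROOM names and values come from the patch, while every other type, value and helper is a simplified, hypothetical stand-in for the real HAProxy structures.

/* Illustrative sketch only. CO_FL_WAIT_DATA/CO_FL_WAIT_ROOM are from the
 * patch; the conn/buf types, the CO_FL_ERROR value and the function names
 * are hypothetical simplifications of the real code.
 */
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>

#define CO_FL_WAIT_DATA  0x00004000u  /* data source is empty (from the patch) */
#define CO_FL_WAIT_ROOM  0x00008000u  /* data sink is full (from the patch) */
#define CO_FL_ERROR      0x80000000u  /* hypothetical value for this sketch */

struct conn { unsigned int flags; int fd; };
struct buf  { char data[16384]; int len; };

/* data-layer reader: one recv() attempt, reports its state through the flags */
static int reader_to_buf(struct conn *c, struct buf *b, int count)
{
	int ret = (int)recv(c->fd, b->data + b->len, (size_t)count, 0);

	if (ret > 0)
		b->len += ret;
	else if (ret == 0)
		c->flags |= CO_FL_WAIT_DATA; /* read0; the real code reports it via conn_sock_read0() */
	else if (errno == EAGAIN)
		c->flags |= CO_FL_WAIT_DATA; /* source temporarily empty */
	else if (errno != EINTR)
		c->flags |= CO_FL_ERROR;
	return ret > 0 ? ret : 0;
}

/* stream-level iterator: resets the transient flags, then calls the reader
 * until it reports why it had to stop (no data, no room, error).
 */
static void stream_read_iterator(struct conn *c, struct buf *b)
{
	c->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);

	while (!(c->flags & (CO_FL_ERROR | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM))) {
		int room = (int)sizeof(b->data) - b->len;

		if (!room) {
			c->flags |= CO_FL_WAIT_ROOM; /* sink full: stop and wait for room */
			break;
		}
		if (reader_to_buf(c, b, room) <= 0)
			break;
		/* forwarding/accounting would happen here, as in sock_raw_read() below */
	}
}

This is the split the patch performs below: raw_sock_to_buf() plays the reader role, and the reworked sock_raw_read() plays the iterator role.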


@@ -73,6 +73,13 @@ enum {
*/
CO_FL_POLL_SOCK = CO_FL_HANDSHAKE | CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN,
/* These flags are used by data layers to indicate to their iterators
* whether they had to stop due to missing data or missing room. Their
* callers must reset them before calling the data layer handlers.
*/
CO_FL_WAIT_DATA = 0x00004000, /* data source is empty */
CO_FL_WAIT_ROOM = 0x00008000, /* data sink is full */
/* flags used to remember what shutdown have been performed/reported */
CO_FL_DATA_RD_SH = 0x00010000, /* DATA layer was notified about shutr/read0 */
CO_FL_DATA_WR_SH = 0x00020000, /* DATA layer asked for shutw */
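
These two flags define a small protocol between the data-layer handlers and their callers: the caller clears both before invoking the handler, then inspects them afterwards to decide whether it must wait for more input or for buffer room. The snippet below is only a hedged illustration of that protocol; the two flag values come from this hunk, while the struct and helper names are placeholders, not HAProxy APIs.

/* Hedged sketch of the caller-side protocol for the new flags. Only the two
 * flag values are taken from this hunk; the struct and the helpers below are
 * hypothetical placeholders, not real HAProxy functions.
 */
#define CO_FL_WAIT_DATA 0x00004000u  /* data source is empty */
#define CO_FL_WAIT_ROOM 0x00008000u  /* data sink is full */

struct xconn { unsigned int flags; };

void data_layer_recv(struct xconn *c);     /* may set either flag before returning */
void subscribe_for_recv(struct xconn *c);  /* re-arm read polling */
void wait_for_room(struct xconn *c);       /* pause reads until the buffer drains */

void call_recv_handler(struct xconn *c)
{
	/* the caller must reset the flags before calling the data layer */
	c->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);

	data_layer_recv(c);

	if (c->flags & CO_FL_WAIT_DATA)
		subscribe_for_recv(c);   /* stopped on empty source: wait for input */
	if (c->flags & CO_FL_WAIT_ROOM)
		wait_for_room(c);        /* stopped on full sink: wait for room */
}

The reworked sock_raw_read() below applies exactly this protocol: it clears both flags before its read loop and, on CO_FL_WAIT_DATA, chooses between __conn_data_poll_recv() and __conn_data_want_recv().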


@@ -209,19 +209,91 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
#endif /* CONFIG_HAP_LINUX_SPLICE */
/* Receive up to <count> bytes from connection <conn>'s socket and store them
* into buffer <buf>. The caller must ensure that <count> is always smaller
* than the buffer's size. Only one call to recv() is performed, unless the
* buffer wraps, in which case a second call may be performed. The connection's
* flags are updated with whatever special event is detected (error, read0,
* empty). The caller is responsible for taking care of those events and
* avoiding the call if inappropriate. The function does not call the
* connection's polling update function, so the caller is responsible for this.
*/
static int raw_sock_to_buf(struct connection *conn, struct buffer *buf, int count)
{
int ret, done = 0;
int try = count;
/* stop here if we reached the end of data */
if ((fdtab[conn->t.sock.fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
goto read0;
/* compute the maximum block size we can read at once. */
if (buffer_empty(buf)) {
/* let's realign the buffer to optimize I/O */
buf->p = buf->data;
}
else if (buf->data + buf->o < buf->p &&
buf->p + buf->i < buf->data + buf->size) {
/* remaining space wraps at the end, with a moving limit */
if (try > buf->data + buf->size - (buf->p + buf->i))
try = buf->data + buf->size - (buf->p + buf->i);
}
/* read the largest possible block. For this, we perform only one call
* to recv() unless the buffer wraps and we exactly fill the first hunk,
* in which case we accept to do it once again. A new attempt is made on
* EINTR too.
*/
while (try) {
ret = recv(conn->t.sock.fd, bi_end(buf), try, 0);
if (ret > 0) {
buf->i += ret;
done += ret;
if (ret < try) {
/* unfortunately, on level-triggered events, POLL_HUP
* is generally delivered AFTER the system buffer is
* empty, so this one might never match.
*/
if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
goto read0;
break;
}
count -= ret;
try = count;
}
else if (ret == 0) {
goto read0;
}
else if (errno == EAGAIN) {
conn->flags |= CO_FL_WAIT_DATA;
break;
}
else if (errno != EINTR) {
conn->flags |= CO_FL_ERROR;
break;
}
}
return done;
read0:
conn_sock_read0(conn);
return done;
}
/*
* this function is called on a read event from a stream socket.
*/
static void sock_raw_read(struct connection *conn)
{
int fd = conn->t.sock.fd;
struct stream_interface *si = container_of(conn, struct stream_interface, conn);
struct channel *b = si->ib;
int ret, max, cur_read;
int read_poll = MAX_READ_POLL_LOOPS;
#ifdef DEBUG_FULL
fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", fd, fdtab[fd].ev, fdtab[fd].owner);
fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", conn->t.sock.fd, fdtab[conn->t.sock.fd].ev, fdtab[conn->t.sock.fd].owner);
#endif
/* stop immediately on errors. Note that we DON'T want to stop on
* POLL_ERR, as the poller might report a write error while there
@@ -233,7 +305,7 @@ static void sock_raw_read(struct connection *conn)
goto out_error;
/* stop here if we reached the end of data */
if ((fdtab[fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
if (conn_data_read0_pending(conn))
goto out_shutdown_r;
/* maybe we were called immediately after an asynchronous shutr */
@@ -247,7 +319,7 @@ static void sock_raw_read(struct connection *conn)
* Since older splice() implementations were buggy and returned
* EAGAIN on end of read, let's bypass the call to splice() now.
*/
if (fdtab[fd].ev & FD_POLL_HUP)
if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
goto out_shutdown_r;
if (sock_raw_splice_in(b, si) >= 0) {
@@ -261,7 +333,8 @@ static void sock_raw_read(struct connection *conn)
}
#endif
cur_read = 0;
while (1) {
conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
max = bi_avail(b);
if (!max) {
@@ -270,172 +343,133 @@ static void sock_raw_read(struct connection *conn)
break;
}
/*
* 1. compute the maximum block size we can read at once.
*/
if (buffer_empty(&b->buf)) {
/* let's realign the buffer to optimize I/O */
b->buf.p = b->buf.data;
ret = raw_sock_to_buf(conn, &b->buf, max);
if (ret <= 0)
break;
cur_read += ret;
/* if we're allowed to directly forward data, we must update ->o */
if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
unsigned long fwd = ret;
if (b->to_forward != BUF_INFINITE_FORWARD) {
if (fwd > b->to_forward)
fwd = b->to_forward;
b->to_forward -= fwd;
}
b_adv(b, fwd);
}
else if (b->buf.data + b->buf.o < b->buf.p &&
b->buf.p + b->buf.i < b->buf.data + b->buf.size) {
/* remaining space wraps at the end, with a moving limit */
if (max > b->buf.data + b->buf.size - (b->buf.p + b->buf.i))
max = b->buf.data + b->buf.size - (b->buf.p + b->buf.i);
if (conn->flags & CO_FL_WAIT_L4_CONN) {
conn->flags &= ~CO_FL_WAIT_L4_CONN;
si->exp = TICK_ETERNITY;
}
/* else max is already OK */
/*
* 2. read the largest possible block
*/
ret = recv(fd, bi_end(&b->buf), max, 0);
b->flags |= BF_READ_PARTIAL;
b->total += ret;
if (ret > 0) {
b->buf.i += ret;
cur_read += ret;
/* if we're allowed to directly forward data, we must update ->o */
if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
unsigned long fwd = ret;
if (b->to_forward != BUF_INFINITE_FORWARD) {
if (fwd > b->to_forward)
fwd = b->to_forward;
b->to_forward -= fwd;
}
b_adv(b, fwd);
}
if (conn->flags & CO_FL_WAIT_L4_CONN) {
conn->flags &= ~CO_FL_WAIT_L4_CONN;
si->exp = TICK_ETERNITY;
}
b->flags |= BF_READ_PARTIAL;
b->total += ret;
if (bi_full(b)) {
/* The buffer is now full, there's no point in going through
* the loop again.
*/
if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
b->xfer_small = 0;
b->xfer_large++;
if (b->xfer_large >= 3) {
/* we call this buffer a fast streamer if it manages
* to be filled in one call 3 consecutive times.
*/
b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
//fputc('+', stderr);
}
}
else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
(cur_read <= b->buf.size / 2)) {
b->xfer_large = 0;
b->xfer_small++;
if (b->xfer_small >= 2) {
/* if the buffer has been at least half full twice,
* we receive faster than we send, so at least it
* is not a "fast streamer".
*/
b->flags &= ~BF_STREAMER_FAST;
//fputc('-', stderr);
}
}
else {
b->xfer_small = 0;
b->xfer_large = 0;
}
b->flags |= BF_FULL;
si->flags |= SI_FL_WAIT_ROOM;
break;
}
/* if too many bytes were missing from last read, it means that
* it's pointless trying to read again because the system does
* not have them in buffers. BTW, if FD_POLL_HUP was present,
* it means that we have reached the end and that the connection
* is closed.
if (bi_full(b)) {
/* The buffer is now full, there's no point in going through
* the loop again.
*/
if (ret < max) {
if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
(cur_read <= b->buf.size / 2)) {
b->xfer_large = 0;
b->xfer_small++;
if (b->xfer_small >= 3) {
/* we have read less than half of the buffer in
* one pass, and this happened at least 3 times.
* This is definitely not a streamer.
*/
b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
//fputc('!', stderr);
}
if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
b->xfer_small = 0;
b->xfer_large++;
if (b->xfer_large >= 3) {
/* we call this buffer a fast streamer if it manages
* to be filled in one call 3 consecutive times.
*/
b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
//fputc('+', stderr);
}
/* unfortunately, on level-triggered events, POLL_HUP
* is generally delivered AFTER the system buffer is
* empty, so this one might never match.
*/
if (fdtab[fd].ev & FD_POLL_HUP)
goto out_shutdown_r;
/* if a streamer has read few data, it may be because we
* have exhausted system buffers. It's not worth trying
* again.
*/
if (b->flags & BF_STREAMER)
break;
/* generally if we read something smaller than 1 or 2 MSS,
* it means that either we have exhausted the system's
* buffers (streamer or question-response protocol) or
* that the connection will be closed. Streamers are
* easily detected so we return early. For other cases,
* it's still better to perform a last read to be sure,
* because it may save one complete poll/read/wakeup cycle
* in case of shutdown.
*/
if (ret < MIN_RET_FOR_READ_LOOP && b->flags & BF_STREAMER)
break;
/* if we read a large block smaller than what we requested,
* it's almost certain we'll never get anything more.
*/
if (ret >= global.tune.recv_enough)
break;
}
else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
(cur_read <= b->buf.size / 2)) {
b->xfer_large = 0;
b->xfer_small++;
if (b->xfer_small >= 2) {
/* if the buffer has been at least half full twice,
* we receive faster than we send, so at least it
* is not a "fast streamer".
*/
b->flags &= ~BF_STREAMER_FAST;
//fputc('-', stderr);
}
}
else {
b->xfer_small = 0;
b->xfer_large = 0;
}
if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
break;
}
else if (ret == 0) {
/* connection closed */
goto out_shutdown_r;
}
else if (errno == EAGAIN) {
/* Ignore EAGAIN but inform the poller that there is
* nothing to read left if we did not read much, ie
* less than what we were still expecting to read.
* But we may have done some work justifying to notify
* the task.
*/
if (cur_read < MIN_RET_FOR_READ_LOOP)
conn_data_poll_recv(conn);
b->flags |= BF_FULL;
si->flags |= SI_FL_WAIT_ROOM;
break;
}
else {
goto out_error;
if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
break;
/* if too many bytes were missing from last read, it means that
* it's pointless trying to read again because the system does
* not have them in buffers.
*/
if (ret < max) {
if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
(cur_read <= b->buf.size / 2)) {
b->xfer_large = 0;
b->xfer_small++;
if (b->xfer_small >= 3) {
/* we have read less than half of the buffer in
* one pass, and this happened at least 3 times.
* This is definitely not a streamer.
*/
b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
//fputc('!', stderr);
}
}
/* if a streamer has read few data, it may be because we
* have exhausted system buffers. It's not worth trying
* again.
*/
if (b->flags & BF_STREAMER)
break;
/* if we read a large block smaller than what we requested,
* it's almost certain we'll never get anything more.
*/
if (ret >= global.tune.recv_enough)
break;
}
} /* while (1) */
} /* while !flags */
if (conn->flags & CO_FL_ERROR)
goto out_error;
if (conn->flags & CO_FL_WAIT_DATA) {
/* we don't automatically ask for polling if we have
* read enough data, as it saves some syscalls with
* speculative pollers.
*/
if (cur_read < MIN_RET_FOR_READ_LOOP)
__conn_data_poll_recv(conn);
else
__conn_data_want_recv(conn);
}
if (conn_data_read0_pending(conn))
/* connection closed */
goto out_shutdown_r;
return;
out_shutdown_r:
/* we received a shutdown */
fdtab[fd].ev &= ~FD_POLL_HUP;
b->flags |= BF_READ_NULL;
if (b->flags & BF_AUTO_CLOSE)
buffer_shutw_now(b);
stream_sock_read0(si);
conn_data_read0(conn);
return;
out_error: