From ce323dea14b46fa01f8950a9d6a1e803ef803d0b Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Mon, 20 Aug 2012 21:41:06 +0200 Subject: [PATCH] REORG: stream-interface: move sock_raw_read() to si_conn_recv_cb() The recv function is now generic and is usable to iterate any connection-to-buf reading function from a stream interface. So let's move it to stream-interface. --- include/proto/stream_interface.h | 1 + src/raw_sock.c | 202 +------------------------------ src/stream_interface.c | 193 +++++++++++++++++++++++++++++ 3 files changed, 195 insertions(+), 201 deletions(-) diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 909d04096..4f45c1265 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -41,6 +41,7 @@ int stream_int_shutr(struct stream_interface *si); int stream_int_shutw(struct stream_interface *si); void stream_int_chk_rcv_conn(struct stream_interface *si); void stream_int_chk_snd_conn(struct stream_interface *si); +void si_conn_recv_cb(struct connection *conn); void si_conn_send_cb(struct connection *conn); void stream_sock_read0(struct stream_interface *si); diff --git a/src/raw_sock.c b/src/raw_sock.c index 7dd90c946..1bc40424a 100644 --- a/src/raw_sock.c +++ b/src/raw_sock.c @@ -42,9 +42,6 @@ #include -/* main event functions used to move data between sockets and buffers */ -static void sock_raw_read(struct connection *conn); - #if 0 && defined(CONFIG_HAP_LINUX_SPLICE) #include @@ -282,203 +279,6 @@ static int raw_sock_to_buf(struct connection *conn, struct buffer *buf, int coun } -/* - * this function is called on a read event from a stream socket. - */ -static void sock_raw_read(struct connection *conn) -{ - struct stream_interface *si = container_of(conn, struct stream_interface, conn); - struct channel *b = si->ib; - int ret, max, cur_read; - int read_poll = MAX_READ_POLL_LOOPS; - -#ifdef DEBUG_FULL - fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", conn->t.sock.fd, fdtab[conn->t.sock.fd].ev, fdtab[conn->t.sock.fd].owner); -#endif - /* stop immediately on errors. Note that we DON'T want to stop on - * POLL_ERR, as the poller might report a write error while there - * are still data available in the recv buffer. This typically - * happens when we send too large a request to a backend server - * which rejects it before reading it all. - */ - if (conn->flags & CO_FL_ERROR) - goto out_error; - - /* stop here if we reached the end of data */ - if (conn_data_read0_pending(conn)) - goto out_shutdown_r; - - /* maybe we were called immediately after an asynchronous shutr */ - if (b->flags & BF_SHUTR) - return; - -#if 0 && defined(CONFIG_HAP_LINUX_SPLICE) - if (b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) { - - /* Under Linux, if FD_POLL_HUP is set, we have reached the end. - * Since older splice() implementations were buggy and returned - * EAGAIN on end of read, let's bypass the call to splice() now. - */ - if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP) - goto out_shutdown_r; - - if (sock_raw_splice_in(b, si) >= 0) { - if (si->flags & SI_FL_ERR) - goto out_error; - if (b->flags & BF_READ_NULL) - goto out_shutdown_r; - return; - } - /* splice not possible (anymore), let's go on on standard copy */ - } -#endif - cur_read = 0; - conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM); - while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) { - max = bi_avail(b); - - if (!max) { - b->flags |= BF_FULL; - si->flags |= SI_FL_WAIT_ROOM; - break; - } - - ret = conn->data->rcv_buf(conn, &b->buf, max); - if (ret <= 0) - break; - - cur_read += ret; - - /* if we're allowed to directly forward data, we must update ->o */ - if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) { - unsigned long fwd = ret; - if (b->to_forward != BUF_INFINITE_FORWARD) { - if (fwd > b->to_forward) - fwd = b->to_forward; - b->to_forward -= fwd; - } - b_adv(b, fwd); - } - - if (conn->flags & CO_FL_WAIT_L4_CONN) { - conn->flags &= ~CO_FL_WAIT_L4_CONN; - si->exp = TICK_ETERNITY; - } - - b->flags |= BF_READ_PARTIAL; - b->total += ret; - - if (bi_full(b)) { - /* The buffer is now full, there's no point in going through - * the loop again. - */ - if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) { - b->xfer_small = 0; - b->xfer_large++; - if (b->xfer_large >= 3) { - /* we call this buffer a fast streamer if it manages - * to be filled in one call 3 consecutive times. - */ - b->flags |= (BF_STREAMER | BF_STREAMER_FAST); - //fputc('+', stderr); - } - } - else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) && - (cur_read <= b->buf.size / 2)) { - b->xfer_large = 0; - b->xfer_small++; - if (b->xfer_small >= 2) { - /* if the buffer has been at least half full twice, - * we receive faster than we send, so at least it - * is not a "fast streamer". - */ - b->flags &= ~BF_STREAMER_FAST; - //fputc('-', stderr); - } - } - else { - b->xfer_small = 0; - b->xfer_large = 0; - } - - b->flags |= BF_FULL; - si->flags |= SI_FL_WAIT_ROOM; - break; - } - - if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0) - break; - - /* if too many bytes were missing from last read, it means that - * it's pointless trying to read again because the system does - * not have them in buffers. - */ - if (ret < max) { - if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) && - (cur_read <= b->buf.size / 2)) { - b->xfer_large = 0; - b->xfer_small++; - if (b->xfer_small >= 3) { - /* we have read less than half of the buffer in - * one pass, and this happened at least 3 times. - * This is definitely not a streamer. - */ - b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST); - //fputc('!', stderr); - } - } - - /* if a streamer has read few data, it may be because we - * have exhausted system buffers. It's not worth trying - * again. - */ - if (b->flags & BF_STREAMER) - break; - - /* if we read a large block smaller than what we requested, - * it's almost certain we'll never get anything more. - */ - if (ret >= global.tune.recv_enough) - break; - } - } /* while !flags */ - - if (conn->flags & CO_FL_ERROR) - goto out_error; - - if (conn->flags & CO_FL_WAIT_DATA) { - /* we don't automatically ask for polling if we have - * read enough data, as it saves some syscalls with - * speculative pollers. - */ - if (cur_read < MIN_RET_FOR_READ_LOOP) - __conn_data_poll_recv(conn); - else - __conn_data_want_recv(conn); - } - - if (conn_data_read0_pending(conn)) - /* connection closed */ - goto out_shutdown_r; - - return; - - out_shutdown_r: - /* we received a shutdown */ - b->flags |= BF_READ_NULL; - if (b->flags & BF_AUTO_CLOSE) - buffer_shutw_now(b); - stream_sock_read0(si); - conn_data_read0(conn); - return; - - out_error: - /* Read error on the connection, report the error and stop I/O */ - conn->flags |= CO_FL_ERROR; - conn_data_stop_both(conn); -} - - /* * This function is called to send buffer data to a stream socket. * It returns -1 in case of unrecoverable error, otherwise zero. @@ -631,7 +431,7 @@ struct sock_ops raw_sock = { .shutw = NULL, .chk_rcv = stream_int_chk_rcv_conn, .chk_snd = stream_int_chk_snd_conn, - .read = sock_raw_read, + .read = si_conn_recv_cb, .write = si_conn_send_cb, .snd_buf = sock_raw_write_loop, .rcv_buf = raw_sock_to_buf, diff --git a/src/stream_interface.c b/src/stream_interface.c index ca06949fd..2cc1962d1 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -863,6 +863,199 @@ void stream_int_chk_snd_conn(struct stream_interface *si) } } +/* + * This is the callback which is called by the connection layer to receive data + * into the buffer from the connection. It iterates over the data layer's rcv_buf + * function. + */ +void si_conn_recv_cb(struct connection *conn) +{ + struct stream_interface *si = container_of(conn, struct stream_interface, conn); + struct channel *b = si->ib; + int ret, max, cur_read; + int read_poll = MAX_READ_POLL_LOOPS; + + /* stop immediately on errors. Note that we DON'T want to stop on + * POLL_ERR, as the poller might report a write error while there + * are still data available in the recv buffer. This typically + * happens when we send too large a request to a backend server + * which rejects it before reading it all. + */ + if (conn->flags & CO_FL_ERROR) + goto out_error; + + /* stop here if we reached the end of data */ + if (conn_data_read0_pending(conn)) + goto out_shutdown_r; + + /* maybe we were called immediately after an asynchronous shutr */ + if (b->flags & BF_SHUTR) + return; + +#if 0 && defined(CONFIG_HAP_LINUX_SPLICE) + if (b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) { + + /* Under Linux, if FD_POLL_HUP is set, we have reached the end. + * Since older splice() implementations were buggy and returned + * EAGAIN on end of read, let's bypass the call to splice() now. + */ + if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP) + goto out_shutdown_r; + + if (sock_raw_splice_in(b, si) >= 0) { + if (si->flags & SI_FL_ERR) + goto out_error; + if (b->flags & BF_READ_NULL) + goto out_shutdown_r; + return; + } + /* splice not possible (anymore), let's go on on standard copy */ + } +#endif + cur_read = 0; + conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM); + while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) { + max = bi_avail(b); + + if (!max) { + b->flags |= BF_FULL; + si->flags |= SI_FL_WAIT_ROOM; + break; + } + + ret = conn->data->rcv_buf(conn, &b->buf, max); + if (ret <= 0) + break; + + cur_read += ret; + + /* if we're allowed to directly forward data, we must update ->o */ + if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) { + unsigned long fwd = ret; + if (b->to_forward != BUF_INFINITE_FORWARD) { + if (fwd > b->to_forward) + fwd = b->to_forward; + b->to_forward -= fwd; + } + b_adv(b, fwd); + } + + if (conn->flags & CO_FL_WAIT_L4_CONN) + conn->flags &= ~CO_FL_WAIT_L4_CONN; + + b->flags |= BF_READ_PARTIAL; + b->total += ret; + + if (bi_full(b)) { + /* The buffer is now full, there's no point in going through + * the loop again. + */ + if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) { + b->xfer_small = 0; + b->xfer_large++; + if (b->xfer_large >= 3) { + /* we call this buffer a fast streamer if it manages + * to be filled in one call 3 consecutive times. + */ + b->flags |= (BF_STREAMER | BF_STREAMER_FAST); + //fputc('+', stderr); + } + } + else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) && + (cur_read <= b->buf.size / 2)) { + b->xfer_large = 0; + b->xfer_small++; + if (b->xfer_small >= 2) { + /* if the buffer has been at least half full twice, + * we receive faster than we send, so at least it + * is not a "fast streamer". + */ + b->flags &= ~BF_STREAMER_FAST; + //fputc('-', stderr); + } + } + else { + b->xfer_small = 0; + b->xfer_large = 0; + } + + b->flags |= BF_FULL; + si->flags |= SI_FL_WAIT_ROOM; + break; + } + + if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0) + break; + + /* if too many bytes were missing from last read, it means that + * it's pointless trying to read again because the system does + * not have them in buffers. + */ + if (ret < max) { + if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) && + (cur_read <= b->buf.size / 2)) { + b->xfer_large = 0; + b->xfer_small++; + if (b->xfer_small >= 3) { + /* we have read less than half of the buffer in + * one pass, and this happened at least 3 times. + * This is definitely not a streamer. + */ + b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST); + //fputc('!', stderr); + } + } + + /* if a streamer has read few data, it may be because we + * have exhausted system buffers. It's not worth trying + * again. + */ + if (b->flags & BF_STREAMER) + break; + + /* if we read a large block smaller than what we requested, + * it's almost certain we'll never get anything more. + */ + if (ret >= global.tune.recv_enough) + break; + } + } /* while !flags */ + + if (conn->flags & CO_FL_WAIT_DATA) { + /* we don't automatically ask for polling if we have + * read enough data, as it saves some syscalls with + * speculative pollers. + */ + if (cur_read < MIN_RET_FOR_READ_LOOP) + __conn_data_poll_recv(conn); + else + __conn_data_want_recv(conn); + } + + if (conn->flags & CO_FL_ERROR) + goto out_error; + + if (conn_data_read0_pending(conn)) + /* connection closed */ + goto out_shutdown_r; + + return; + + out_shutdown_r: + /* we received a shutdown */ + b->flags |= BF_READ_NULL; + if (b->flags & BF_AUTO_CLOSE) + buffer_shutw_now(b); + stream_sock_read0(si); + conn_data_read0(conn); + return; + + out_error: + /* Read error on the connection, report the error and stop I/O */ + conn->flags |= CO_FL_ERROR; + conn_data_stop_both(conn); +} + /* * This is the callback which is called by the connection layer to send data * from the buffer to the connection. It iterates over the data layer's snd_buf