diff --git a/src/stream_sock.c b/src/stream_sock.c index e006797d5..161574609 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -289,81 +289,32 @@ int stream_sock_read(int fd) { /* - * this function is called on a write event from a stream socket. - * It returns 0 if we have a high confidence that we will not be - * able to write more data without polling first. Returns non-zero - * otherwise. + * This function is called to send buffer data to a stream socket. + * It returns -1 in case of unrecoverable error, 0 if the caller needs to poll + * before calling it again, otherwise 1. */ -int stream_sock_write(int fd) { - __label__ out_wakeup, out_error; - struct stream_interface *si = fdtab[fd].owner; - struct buffer *b = si->ob; - int ret, max, retval; +int stream_sock_write_loop(struct stream_interface *si, struct buffer *b) +{ int write_poll = MAX_WRITE_POLL_LOOPS; + int retval = 1; + int ret, max; -#ifdef DEBUG_FULL - fprintf(stderr,"stream_sock_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner); -#endif - - retval = 1; - if (fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR)) - goto out_error; + if (!b->l || !b->send_max) + return retval; + /* when we're in this loop, we already know that there is no spliced + * data left, and that there are sendable buffered data. + */ while (1) { - if (b->l == 0) { /* let's realign the buffer to optimize I/O */ - b->r = b->w = b->lr = b->data; - max = 0; - } - else if (b->r > b->w) { + if (b->r > b->w) max = b->r - b->w; - } - else { + else max = b->data + BUFSIZE - b->w; - } /* limit the amount of outgoing data if required */ if (max > b->send_max) max = b->send_max; - if (max == 0) { - /* may be we have received a connection acknowledgement in TCP mode without data */ - if (likely(fdtab[fd].state == FD_STCONN)) { - /* We have no data to send to check the connection, and - * getsockopt() will not inform us whether the connection - * is still pending. So we'll reuse connect() to check the - * state of the socket. This has the advantage of givig us - * the following info : - * - error - * - connecting (EALREADY, EINPROGRESS) - * - connected (EISCONN, 0) - */ - if ((connect(fd, fdtab[fd].peeraddr, fdtab[fd].peerlen) == 0)) - errno = 0; - - if (errno == EALREADY || errno == EINPROGRESS) { - retval = 0; - goto out_may_wakeup; - } - - if (errno && errno != EISCONN) - goto out_error; - - /* OK we just need to indicate that we got a connection - * and that we wrote nothing. - */ - b->flags |= BF_WRITE_NULL; - fdtab[fd].state = FD_STREADY; - } - - /* Funny, we were called to write something but there wasn't - * anything. We can get there, for example if we were woken up - * on a write event to finish the splice, but the send_max is 0 - * so we cannot write anything from the buffer. Let's disable - * the write event and pretend we never came there. - */ - goto out_stop_write; - } - #ifndef MSG_NOSIGNAL { int skerr; @@ -376,30 +327,33 @@ int stream_sock_write(int fd) { ret = send(fd, b->w, max, MSG_DONTWAIT); } #else - ret = send(fd, b->w, max, MSG_DONTWAIT | MSG_NOSIGNAL); + ret = send(si->fd, b->w, max, MSG_DONTWAIT | MSG_NOSIGNAL); #endif if (ret > 0) { - b->l -= ret; - b->w += ret; - b->send_max -= ret; - - if (fdtab[fd].state == FD_STCONN) - fdtab[fd].state = FD_STREADY; + if (fdtab[si->fd].state == FD_STCONN) + fdtab[si->fd].state = FD_STREADY; b->flags |= BF_WRITE_PARTIAL; - if (b->l < b->max_len) + b->w += ret; + if (b->w == b->data + BUFSIZE) + b->w = b->data; /* wrap around the buffer */ + + b->l -= ret; + if (likely(b->l < b->max_len)) b->flags &= ~BF_FULL; - if (b->w == b->data + BUFSIZE) { - b->w = b->data; /* wrap around the buffer */ + if (likely(!b->l)) { + /* optimize data alignment in the buffer */ + b->r = b->w = b->lr = b->data; + if (likely(!b->splice_len)) + b->flags |= BF_EMPTY; } - if (!b->l && !b->splice_len) { - b->flags |= BF_EMPTY; - goto out_stop_write; - } + b->send_max -= ret; + if (!b->send_max || !b->l) + break; /* if the system buffer is full, don't insist */ if (ret < max) @@ -409,74 +363,140 @@ int stream_sock_write(int fd) { break; } else if (ret == 0 || errno == EAGAIN) { - /* nothing written, just pretend we were never called - * and wait for the socket to be ready. But we may have - * done some work justifying to notify the task. - */ + /* nothing written, we need to poll for write first */ retval = 0; break; } else { - goto out_error; + /* bad, we got an error */ + retval = -1; + break; } } /* while (1) */ - /* - * The only way to get out of this loop is to have stopped writing - * without any error, either by limiting the number of loops, or - * because of an EAGAIN. We only rearm the timer if we have at least - * written something. - */ + return retval; +} - if ((b->flags & (BF_WRITE_PARTIAL|BF_EMPTY|BF_SHUTW)) == BF_WRITE_PARTIAL) { - b->wex = tick_add_ifset(now_ms, b->wto); - if (tick_isset(b->wex) & tick_isset(si->ib->rex)) { - /* FIXME: to prevent the client from expiring read timeouts during writes, - * we refresh it. A solution would be to merge read+write timeouts into a - * unique one, although that needs some study particularly on full-duplex - * TCP connections. */ - si->ib->rex = b->wex; + +/* + * This function is called on a write event from a stream socket. + * It returns 0 if the caller needs to poll before calling it again, otherwise + * non-zero. + */ +int stream_sock_write(int fd) +{ + struct stream_interface *si = fdtab[fd].owner; + struct buffer *b = si->ob; + int retval = 1; + +#ifdef DEBUG_FULL + fprintf(stderr,"stream_sock_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner); +#endif + + retval = 1; + if (fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR)) + goto out_error; + + if (likely(!(b->flags & BF_EMPTY))) { + /* OK there are data waiting to be sent */ + retval = stream_sock_write_loop(si, b); + if (retval < 0) + goto out_error; + } + else { + /* may be we have received a connection acknowledgement in TCP mode without data */ + if (likely(fdtab[fd].state == FD_STCONN)) { + /* We have no data to send to check the connection, and + * getsockopt() will not inform us whether the connection + * is still pending. So we'll reuse connect() to check the + * state of the socket. This has the advantage of givig us + * the following info : + * - error + * - connecting (EALREADY, EINPROGRESS) + * - connected (EISCONN, 0) + */ + if ((connect(fd, fdtab[fd].peeraddr, fdtab[fd].peerlen) == 0)) + errno = 0; + + if (errno == EALREADY || errno == EINPROGRESS) { + retval = 0; + goto out_may_wakeup; + } + + if (errno && errno != EISCONN) + goto out_error; + + /* OK we just need to indicate that we got a connection + * and that we wrote nothing. + */ + b->flags |= BF_WRITE_NULL; + fdtab[fd].state = FD_STREADY; } + + /* Funny, we were called to write something but there wasn't + * anything. We can get there, for example if we were woken up + * on a write event to finish the splice, but the send_max is 0 + * so we cannot write anything from the buffer. Let's disable + * the write event and pretend we never came there. + */ + } + + if ((b->flags & BF_EMPTY) || !b->send_max) { + /* the connection is established but we can't write. Either the + * buffer is empty, or we just refrain from sending because the + * send_max limit was reached. Maybe we just wrote the last + * chunk and need to close. + */ + if (((b->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == + (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) && + (si->state == SI_ST_EST)) { + stream_sock_shutw(si); + goto out_wakeup; + } + + if (b->flags & BF_EMPTY) + si->flags |= SI_FL_WAIT_DATA; + + EV_FD_CLR(fd, DIR_WR); + b->wex = TICK_ETERNITY; } out_may_wakeup: - if (!(b->flags & BF_WRITE_ACTIVITY)) - goto out_skip_wakeup; - out_wakeup: - /* the producer might be waiting for more room to store data */ - if (likely((b->flags & (BF_WRITE_PARTIAL|BF_FULL)) == BF_WRITE_PARTIAL && - (b->prod->flags & SI_FL_WAIT_ROOM))) - b->prod->chk_rcv(b->prod); + if (b->flags & BF_WRITE_ACTIVITY) { + /* update timeout if we have written something */ + if (b->send_max && + (b->flags & (BF_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL) + b->wex = tick_add_ifset(now_ms, b->wto); - /* we have to wake up if there is a special event or if we don't have - * any more data to forward and it's not planned to send any more. - */ - if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) || - (!b->to_forward && !b->send_max && !b->splice_len) || - si->state != SI_ST_EST || - b->prod->state != SI_ST_EST)) - task_wakeup(si->owner, TASK_WOKEN_IO); + out_wakeup: + if (tick_isset(si->ib->rex)) { + /* Note: to prevent the client from expiring read timeouts + * during writes, we refresh it. A better solution would be + * to merge read+write timeouts into a unique one, although + * that needs some study particularly on full-duplex TCP + * connections. + */ + si->ib->rex = tick_add_ifset(now_ms, si->ib->rto); + } + + /* the producer might be waiting for more room to store data */ + if (likely((b->flags & (BF_WRITE_PARTIAL|BF_FULL)) == BF_WRITE_PARTIAL && + (b->prod->flags & SI_FL_WAIT_ROOM))) + b->prod->chk_rcv(b->prod); + + /* we have to wake up if there is a special event or if we don't have + * any more data to forward and it's not planned to send any more. + */ + if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) || + (!b->to_forward && !b->send_max && !b->splice_len) || + si->state != SI_ST_EST || + b->prod->state != SI_ST_EST)) + task_wakeup(si->owner, TASK_WOKEN_IO); + } - out_skip_wakeup: fdtab[fd].ev &= ~FD_POLL_OUT; return retval; - out_stop_write: - /* We can't write anymore. Either the buffer is empty, or we just - * refrain from sending because send_max is reached. Maybe we just - * wrote the last chunk and need to close. - */ - if ((b->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR) && - (si->state == SI_ST_EST)) { - stream_sock_shutw(si); - } else { - if (!b->l && !b->splice_len) - si->flags |= SI_FL_WAIT_DATA; - EV_FD_CLR(fd, DIR_WR); - } - b->wex = TICK_ETERNITY; - goto out_wakeup; - out_error: /* Write error on the file descriptor. We mark the FD as STERROR so * that we don't use it anymore. The error is reported to the stream