diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h index dc2b763ca..fe89d988a 100644 --- a/include/proto/stream_sock.h +++ b/include/proto/stream_sock.h @@ -33,7 +33,9 @@ /* main event functions used to move data between sockets and buffers */ int stream_sock_read(int fd); int stream_sock_write(int fd); -int stream_sock_process_data(int fd); +int stream_sock_data_check_errors(int fd); +int stream_sock_data_update(int fd); +int stream_sock_data_finish(int fd); /* This either returns the sockname or the original destination address. Code diff --git a/include/types/buffers.h b/include/types/buffers.h index aee48eee7..bc8a18490 100644 --- a/include/types/buffers.h +++ b/include/types/buffers.h @@ -71,7 +71,7 @@ #define BF_MASK_INTERFACE_O (BF_EMPTY|BF_HIJACK|BF_MAY_FORWARD|BF_SHUTR|BF_SHUTW|BF_SHUTW_NOW) #define BF_MASK_INTERFACE (BF_MASK_INTF_I | BF_MASK_INTF_O) -#define BF_MASK_ANALYSER (BF_FULL|BF_READ_ERROR|BF_READ_TIMEOUT|BF_WRITE_ERROR|BF_SHUTW|BF_SHUTR|BF_READ_NULL) +#define BF_MASK_ANALYSER (BF_FULL|BF_READ_NULL|BF_READ_ERROR|BF_READ_TIMEOUT|BF_SHUTR|BF_WRITE_ERROR) /* Analysers (buffer->analysers). * Those bits indicate that there are some processing to do on the buffer diff --git a/src/proto_http.c b/src/proto_http.c index b0ec9e584..d28506d42 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -660,6 +660,33 @@ void process_session(struct task *t, int *next) unsigned int rqf_srv, rpf_srv; unsigned int rqf_req, rpf_rep; + /* check server-side errors during data phase */ + if (s->req->cons->state == SI_ST_EST) { + stream_sock_data_check_errors(s->req->cons->fd); + /* When a server-side connection is released, we have to + * count it and check for pending connections on this server. + */ + if (unlikely(s->req->cons->state == SI_ST_CLO)) { + /* Count server-side errors (but not timeouts). */ + if (s->req->flags & BF_WRITE_ERROR) { + s->be->failed_resp++; + if (s->srv) + s->srv->failed_resp++; + } + + if (s->srv) { + s->srv->cur_sess--; + sess_change_server(s, NULL); + if (may_dequeue_tasks(s->srv, s->be)) + process_srv_queue(s->srv); + } + } + } + + /* check client-side errors during data phase */ + if (s->rep->cons->state == SI_ST_EST) + stream_sock_data_check_errors(s->rep->cons->fd); + /* force one first pass everywhere */ rqf_cli = rqf_srv = rqf_req = ~s->req->flags; rpf_cli = rpf_srv = rpf_rep = ~s->rep->flags; @@ -667,29 +694,31 @@ void process_session(struct task *t, int *next) do { resync = 0; - if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) || - ((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) { - resync = 1; - if (s->rep->cons->state != SI_ST_CLO) { - stream_sock_process_data(s->rep->cons->fd); + if (s->rep->cons->state != SI_ST_CLO) { + if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) || + ((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) { + resync = 1; + stream_sock_data_update(s->rep->cons->fd); + rqf_cli = s->req->flags; + rpf_cli = s->rep->flags; + if (unlikely((s->rep->cons->state == SI_ST_CLO) && (global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { int len; len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n", - s->uniq_id, s->be->id, (unsigned short)s->rep->cons->fd, (unsigned short)s->req->cons->fd); + s->uniq_id, s->be->id, (unsigned short)s->rep->prod->fd, (unsigned short)s->req->cons->fd); write(1, trash, len); } } - rqf_cli = s->req->flags; - rpf_cli = s->rep->flags; } - if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) || - ((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) { - resync = 1; - if (s->req->cons->state != SI_ST_CLO) { + if (s->req->cons->state != SI_ST_CLO) { + if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) || + ((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) { + resync = 1; + if (s->req->cons->state < SI_ST_EST && s->req->flags & BF_MAY_FORWARD) process_srv_conn(s); @@ -704,14 +733,7 @@ void process_session(struct task *t, int *next) buffer_shutw_now(s->req); } - stream_sock_process_data(s->req->cons->fd); - - /* Count server-side errors (but not timeouts). */ - if (s->req->flags & BF_WRITE_ERROR) { - s->be->failed_resp++; - if (s->srv) - s->srv->failed_resp++; - } + stream_sock_data_update(s->req->cons->fd); /* When a server-side connection is released, we have to * count it and check for pending connections on this server. @@ -725,18 +747,18 @@ void process_session(struct task *t, int *next) } } } + rqf_srv = s->req->flags; + rpf_srv = s->rep->flags; if (unlikely((s->req->cons->state == SI_ST_CLO) && (global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { int len; len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n", - s->uniq_id, s->be->id, (unsigned short)s->cli_fd, (unsigned short)s->req->cons->fd); + s->uniq_id, s->be->id, (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd); write(1, trash, len); } } - rqf_srv = s->req->flags; - rpf_srv = s->rep->flags; } if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) { @@ -752,7 +774,8 @@ void process_session(struct task *t, int *next) if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) { resync = 1; /* the analysers must block it themselves */ - s->rep->flags |= BF_MAY_FORWARD; + if (s->req->cons->state >= SI_ST_EST) + s->rep->flags |= BF_MAY_FORWARD; if (s->rep->analysers) { process_response(s); @@ -768,6 +791,12 @@ void process_session(struct task *t, int *next) if ((s->fe->options & PR_O_CONTSTATS) && (s->flags & SN_BE_ASSIGNED)) session_process_counters(s); + if (s->rep->cons->state == SI_ST_EST) + stream_sock_data_finish(s->rep->cons->fd); + + if (s->req->cons->state == SI_ST_EST) + stream_sock_data_finish(s->req->cons->fd); + s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE; s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE; @@ -810,7 +839,7 @@ void process_session(struct task *t, int *next) int len; len = sprintf(trash, "%08x:%s.closed[%04x:%04x] (term_trace=0x%08x)\n", s->uniq_id, s->be->id, - (unsigned short)s->cli_fd, (unsigned short)s->req->cons->fd, + (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd, s->term_trace); write(1, trash, len); } @@ -1673,12 +1702,14 @@ int process_request(struct session *t) struct buffer *req = t->req; struct buffer *rep = t->rep; - DPRINTF(stderr,"[%u] %s: c=%s set(r,w)=%d,%d exp(r,w)=%u,%u req=%08x rep=%08x analysers=%02x\n", + DPRINTF(stderr,"[%u] %s: session=%p b=%p, exp(r,w)=%u,%u bf=%08x bl=%d analysers=%02x\n", now_ms, __FUNCTION__, - cli_stnames[t->cli_state], - t->cli_fd >= 0 && fdtab[t->cli_fd].state != FD_STCLOSE ? EV_FD_ISSET(t->cli_fd, DIR_RD) : 0, - t->cli_fd >= 0 && fdtab[t->cli_fd].state != FD_STCLOSE ? EV_FD_ISSET(t->cli_fd, DIR_WR) : 0, - req->rex, rep->wex, req->flags, rep->flags, req->analysers); + t, + req, + req->rex, req->wex, + req->flags, + req->l, + req->analysers); /* The tcp-inspect analyser is always called alone */ if (req->analysers & AN_REQ_INSPECT) { @@ -2692,10 +2723,14 @@ int process_response(struct session *t) struct buffer *req = t->req; struct buffer *rep = t->rep; - DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x analysers=%02x\n", + DPRINTF(stderr,"[%u] %s: session=%p b=%p, exp(r,w)=%u,%u bf=%08x bl=%d analysers=%02x\n", now_ms, __FUNCTION__, - cli_stnames[t->cli_state], - req->rex, rep->wex, req->flags, rep->flags, rep->analysers); + t, + rep, + rep->rex, rep->wex, + rep->flags, + rep->l, + rep->analysers); if (rep->analysers & AN_RTR_HTTP_HDR) { /* receiving server headers */ /* @@ -2838,7 +2873,7 @@ int process_response(struct session *t) return 0; } /* write error to client, or close from server */ - else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTW|BF_SHUTR|BF_READ_NULL)) { + else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR|BF_READ_NULL)) { buffer_shutr_now(rep); buffer_shutw_now(req); //fd_delete(req->cons->fd); @@ -3146,7 +3181,7 @@ int process_response(struct session *t) #ifdef CONFIG_HAP_TCPSPLICE if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) { /* TCP splicing supported by both FE and BE */ - tcp_splice_splicefd(t->cli_fd, req->cons->fd, 0); + tcp_splice_splicefd(rep->cons->fd, rep->prod->fd, 0); } #endif /* if the user wants to log as soon as possible, without counting @@ -3556,7 +3591,7 @@ int tcp_connection_status(struct session *t) #ifdef CONFIG_HAP_TCPSPLICE if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) { /* TCP splicing supported by both FE and BE */ - tcp_splice_splicefd(t->cli_fd, req->cons->fd, 0); + tcp_splice_splicefd(req->prod->fd, req->cons->fd, 0); } #endif } @@ -5262,7 +5297,7 @@ int stats_check_uri_auth(struct session *t, struct proxy *backend) /* The request is valid, the user is authenticated. Let's start sending * data. */ - EV_FD_CLR(t->cli_fd, DIR_RD); + EV_FD_CLR(t->req->prod->fd, DIR_RD); buffer_shutr(t->req); buffer_shutr(t->rep); buffer_set_rlim(t->req, BUFSIZE); /* no more rewrite needed */ @@ -5282,7 +5317,7 @@ void debug_hdr(const char *dir, struct session *t, const char *start, const char { int len, max; len = sprintf(trash, "%08x:%s.%s[%04x:%04x]: ", t->uniq_id, t->be->id, - dir, (unsigned short)t->cli_fd, (unsigned short)t->req->cons->fd); + dir, (unsigned short)t->req->prod->fd, (unsigned short)t->req->cons->fd); max = end - start; UBOUND(max, sizeof(trash) - len - 1); len += strlcpy2(trash + len, start, max + 1); diff --git a/src/stream_sock.c b/src/stream_sock.c index a08bf9b0e..3f4be6724 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -417,12 +417,11 @@ int stream_sock_write(int fd) { /* - * Manages a stream_sock connection during its data phase. The file descriptor - * status is checked, and the read and write timeouts are controlled. The - * buffers are examined for special shutdown cases and finally the timeouts, - * file descriptor and buffers' flags are updated accordingly. + * This function only has to be called once after a wakeup event during a data + * phase. It controls the file descriptor's status, as well as read and write + * timeouts. */ -int stream_sock_process_data(int fd) +int stream_sock_data_check_errors(int fd) { struct buffer *ib = fdtab[fd].cb[DIR_RD].b; struct buffer *ob = fdtab[fd].cb[DIR_WR].b; @@ -436,7 +435,7 @@ int stream_sock_process_data(int fd) ib->l, ob->l); /* Read or write error on the file descriptor */ - if (fdtab[fd].state == FD_STERROR) { + if (unlikely(fdtab[fd].state == FD_STERROR)) { //trace_term(t, TT_HTTP_SRV_6); if (!ob->cons->err_type) { //ob->cons->err_loc = t->srv; @@ -453,30 +452,114 @@ int stream_sock_process_data(int fd) return 0; } + /* Read timeout */ + if (unlikely(!(ib->flags & (BF_SHUTR|BF_READ_TIMEOUT)) && tick_is_expired(ib->rex, now_ms))) { + //trace_term(t, TT_HTTP_SRV_12); + ib->flags |= BF_READ_TIMEOUT; + if (!ob->cons->err_type) { + //ob->cons->err_loc = t->srv; + ob->cons->err_type = SI_ET_DATA_TO; + } + buffer_shutr(ib); + if (ob->flags & BF_SHUTW) + goto do_close_and_return; + EV_FD_CLR(fd, DIR_RD); + } + + /* Write timeout */ + if (unlikely(!(ob->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) && tick_is_expired(ob->wex, now_ms))) { + //trace_term(t, TT_HTTP_SRV_13); + ob->flags |= BF_WRITE_TIMEOUT; + if (!ob->cons->err_type) { + //ob->cons->err_loc = t->srv; + ob->cons->err_type = SI_ET_DATA_TO; + } + buffer_shutw(ob); + if (ib->flags & BF_SHUTR) + goto do_close_and_return; + + EV_FD_CLR(fd, DIR_WR); + shutdown(fd, SHUT_WR); + } + return 0; +} + +/* + * Manages a stream_sock connection during its data phase. The buffers are + * examined for various cases of shutdown, then file descriptor and buffers' + * flags are updated accordingly. + */ +int stream_sock_data_update(int fd) +{ + struct buffer *ib = fdtab[fd].cb[DIR_RD].b; + struct buffer *ob = fdtab[fd].cb[DIR_WR].b; + + DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n", + now_ms, __FUNCTION__, + fd, fdtab[fd].owner, + ib, ob, + ib->rex, ob->wex, + ib->flags, ob->flags, + ib->l, ob->l); + /* Check if we need to close the read side */ if (!(ib->flags & BF_SHUTR)) { /* Last read, forced read-shutdown, or other end closed */ if (ib->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) { //trace_term(t, TT_HTTP_SRV_10); - do_close_read: buffer_shutr(ib); - if (ob->flags & BF_SHUTW) - goto do_close_and_return; - + if (ob->flags & BF_SHUTW) { + fd_delete(fd); + ob->cons->state = SI_ST_CLO; + return 0; + } EV_FD_CLR(fd, DIR_RD); } - /* Read timeout */ - else if (unlikely(!(ib->flags & BF_READ_TIMEOUT) && tick_is_expired(ib->rex, now_ms))) { - //trace_term(t, TT_HTTP_SRV_12); - ib->flags |= BF_READ_TIMEOUT; - if (!ob->cons->err_type) { - //ob->cons->err_loc = t->srv; - ob->cons->err_type = SI_ET_DATA_TO; + } + + /* Check if we need to close the write side */ + if (!(ob->flags & BF_SHUTW)) { + /* Forced write-shutdown or other end closed with empty buffer. */ + if ((ob->flags & BF_SHUTW_NOW) || + (ob->flags & (BF_EMPTY|BF_HIJACK|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) { + //trace_term(t, TT_HTTP_SRV_11); + buffer_shutw(ob); + if (ib->flags & BF_SHUTR) { + fd_delete(fd); + ob->cons->state = SI_ST_CLO; + return 0; } - goto do_close_read; + EV_FD_CLR(fd, DIR_WR); + shutdown(fd, SHUT_WR); } + } + return 0; /* other cases change nothing */ +} + + +/* + * Updates a connected stream_sock file descriptor status and timeouts + * according to the buffers' flags. It should only be called once after the + * buffer flags have settled down, and before they are cleared. It doesn't + * harm to call it as often as desired (it just slightly hurts performance). + */ +int stream_sock_data_finish(int fd) +{ + struct buffer *ib = fdtab[fd].cb[DIR_RD].b; + struct buffer *ob = fdtab[fd].cb[DIR_WR].b; + + DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n", + now_ms, __FUNCTION__, + fd, fdtab[fd].owner, + ib, ob, + ib->rex, ob->wex, + ib->flags, ob->flags, + ib->l, ob->l); + + /* Check if we need to close the read side */ + if (!(ib->flags & BF_SHUTR)) { /* Read not closed, update FD status and timeout for reads */ - else if (ib->flags & (BF_FULL|BF_HIJACK)) { + if (ib->flags & (BF_FULL|BF_HIJACK)) { /* stop reading */ EV_FD_COND_C(fd, DIR_RD); ib->rex = TICK_ETERNITY; @@ -494,30 +577,9 @@ int stream_sock_process_data(int fd) /* Check if we need to close the write side */ if (!(ob->flags & BF_SHUTW)) { - /* Forced write-shutdown or other end closed with empty buffer. */ - if ((ob->flags & BF_SHUTW_NOW) || - (ob->flags & (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) { - //trace_term(t, TT_HTTP_SRV_11); - do_close_write: - buffer_shutw(ob); - if (ib->flags & BF_SHUTR) - goto do_close_and_return; - - EV_FD_CLR(fd, DIR_WR); - shutdown(fd, SHUT_WR); - } - /* Write timeout */ - else if (unlikely(!(ob->flags & BF_WRITE_TIMEOUT) && tick_is_expired(ob->wex, now_ms))) { - //trace_term(t, TT_HTTP_SRV_13); - ob->flags |= BF_WRITE_TIMEOUT; - if (!ob->cons->err_type) { - //ob->cons->err_loc = t->srv; - ob->cons->err_type = SI_ET_DATA_TO; - } - goto do_close_write; - } /* Write not closed, update FD status and timeout for writes */ - else if ((ob->flags & (BF_EMPTY|BF_MAY_FORWARD)) != BF_MAY_FORWARD) { + if ((ob->flags & BF_EMPTY) || + (ob->flags & (BF_HIJACK|BF_MAY_FORWARD)) == 0) { /* stop writing */ EV_FD_COND_C(fd, DIR_WR); ob->wex = TICK_ETERNITY; @@ -541,7 +603,7 @@ int stream_sock_process_data(int fd) } } } - return 0; /* other cases change nothing */ + return 0; }