diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h index fe89d988a..e104054f3 100644 --- a/include/proto/stream_sock.h +++ b/include/proto/stream_sock.h @@ -33,7 +33,7 @@ /* main event functions used to move data between sockets and buffers */ int stream_sock_read(int fd); int stream_sock_write(int fd); -int stream_sock_data_check_errors(int fd); +int stream_sock_data_check_timeouts(int fd); int stream_sock_data_update(int fd); int stream_sock_data_finish(int fd); diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index edfb7583b..2525f1237 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -42,24 +42,26 @@ enum { /* error types reported on the streams interface for more accurate reporting */ enum { - SI_ET_NONE = 0, /* no error yet, leave it to zero */ - SI_ET_QUEUE_TO, /* queue timeout */ - SI_ET_QUEUE_ERR, /* queue error (eg: full) */ - SI_ET_QUEUE_ABRT, /* aborted in queue by external cause */ - SI_ET_CONN_TO, /* connection timeout */ - SI_ET_CONN_ERR, /* connection error (eg: no server available) */ - SI_ET_CONN_ABRT, /* connection aborted by external cause (eg: abort) */ - SI_ET_CONN_OTHER, /* connection aborted for other reason (eg: 500) */ - SI_ET_DATA_TO, /* timeout during data phase */ - SI_ET_DATA_ERR, /* error during data phase */ - SI_ET_DATA_ABRT, /* data phase aborted by external cause */ + SI_ET_NONE = 0x0000, /* no error yet, leave it to zero */ + SI_ET_QUEUE_TO = 0x0001, /* queue timeout */ + SI_ET_QUEUE_ERR = 0x0002, /* queue error (eg: full) */ + SI_ET_QUEUE_ABRT = 0x0004, /* aborted in queue by external cause */ + SI_ET_CONN_TO = 0x0008, /* connection timeout */ + SI_ET_CONN_ERR = 0x0010, /* connection error (eg: no server available) */ + SI_ET_CONN_ABRT = 0x0020, /* connection aborted by external cause (eg: abort) */ + SI_ET_CONN_OTHER = 0x0040, /* connection aborted for other reason (eg: 500) */ + SI_ET_DATA_TO = 0x0080, /* timeout during data phase */ + SI_ET_DATA_ERR = 0x0100, /* error during data phase */ + SI_ET_DATA_ABRT = 0x0200, /* data phase aborted by external cause */ }; struct stream_interface { unsigned int state; /* SI_ST* */ - int err_type; /* first error detected, one of SI_ET_* */ - void *err_loc; /* commonly the server, NULL when SI_ET_NONE */ + unsigned int prev_state;/* SI_ST*, copy of previous state */ + void *owner; /* generally a (struct task*) */ int fd; /* file descriptor for a stream driver when known */ + unsigned int err_type; /* first error detected, one of SI_ET_* */ + void *err_loc; /* commonly the server, NULL when SI_ET_NONE */ }; diff --git a/src/backend.c b/src/backend.c index f51ac8fc3..16b2cc9df 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1805,7 +1805,7 @@ int connect_server(struct session *s) } } - fdtab[fd].owner = s->task; + fdtab[fd].owner = s->req->cons; fdtab[fd].state = FD_STCONN; /* connection in progress */ fdtab[fd].cb[DIR_RD].f = &stream_sock_read; fdtab[fd].cb[DIR_RD].b = s->rep; diff --git a/src/client.c b/src/client.c index 1f577d1be..ef1ee098b 100644 --- a/src/client.c +++ b/src/client.c @@ -173,12 +173,14 @@ int event_accept(int fd) { s->si[0].state = SI_ST_EST; s->si[0].err_type = SI_ET_NONE; s->si[0].err_loc = NULL; + s->si[0].owner = t; s->si[0].fd = cfd; s->cli_fd = cfd; s->si[1].state = SI_ST_INI; s->si[1].err_type = SI_ET_NONE; s->si[1].err_loc = NULL; + s->si[1].owner = t; s->si[1].fd = -1; /* just to help with debugging */ s->srv = s->prev_srv = s->srv_conn = NULL; @@ -373,7 +375,7 @@ int event_accept(int fd) { t->expire = TICK_ETERNITY; fd_insert(cfd); - fdtab[cfd].owner = t; + fdtab[cfd].owner = &s->si[0]; fdtab[cfd].listener = l; fdtab[cfd].state = FD_STREADY; fdtab[cfd].cb[DIR_RD].f = l->proto->read; diff --git a/src/proto_http.c b/src/proto_http.c index bc8d3fbf2..b3db20dd8 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -660,40 +660,151 @@ void process_session(struct task *t, int *next) unsigned int rqf_srv, rpf_srv; unsigned int rqf_req, rpf_rep; - /* check server-side errors during data phase */ - if (s->req->cons->state == SI_ST_EST) { - stream_sock_data_check_errors(s->req->cons->fd); - /* When a server-side connection is released, we have to - * count it and check for pending connections on this server. - */ - if (unlikely(s->req->cons->state == SI_ST_CLO)) { - /* Count server-side errors (but not timeouts). */ - if (s->req->flags & BF_WRITE_ERROR) { - s->be->failed_resp++; - if (s->srv) - s->srv->failed_resp++; - } + /* Check timeouts only during data phase for now */ + if (unlikely(t->state & TASK_WOKEN_TIMER)) { + if (s->rep->cons->state == SI_ST_EST) + stream_sock_data_check_timeouts(s->rep->cons->fd); - if (s->srv) { - s->srv->cur_sess--; - sess_change_server(s, NULL); - if (may_dequeue_tasks(s->srv, s->be)) - process_srv_queue(s->srv); - } + if (s->req->cons->state == SI_ST_EST) + stream_sock_data_check_timeouts(s->req->cons->fd); + } + + /* When a server-side connection is released, we have to + * count it and check for pending connections on this server. + */ + if (unlikely(s->req->cons->state == SI_ST_CLO && + s->req->cons->prev_state == SI_ST_EST)) { + /* Count server-side errors (but not timeouts). */ + if (s->req->flags & BF_WRITE_ERROR) { + s->be->failed_resp++; + if (s->srv) + s->srv->failed_resp++; + } + + if (s->srv) { + s->srv->cur_sess--; + sess_change_server(s, NULL); + if (may_dequeue_tasks(s->srv, s->be)) + process_srv_queue(s->srv); + } + + if (unlikely((s->req->cons->state == SI_ST_CLO) && + (global.mode & MODE_DEBUG) && + (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { + int len; + len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n", + s->uniq_id, s->be->id, (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd); + write(1, trash, len); } } - /* check client-side errors during data phase */ - if (s->rep->cons->state == SI_ST_EST) - stream_sock_data_check_errors(s->rep->cons->fd); + if (unlikely(s->rep->cons->state == SI_ST_CLO && + s->rep->cons->prev_state == SI_ST_EST)) { + if (unlikely((s->rep->cons->state == SI_ST_CLO) && + (global.mode & MODE_DEBUG) && + (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { + int len; + len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n", + s->uniq_id, s->be->id, (unsigned short)s->rep->prod->fd, (unsigned short)s->req->cons->fd); + write(1, trash, len); + } + } - /* force one first pass everywhere */ + + /* Check if we need to close the write side. This can only happen + * when either SHUTR or EMPTY appears, because WRITE_ENA cannot appear + * from low level, and neither HIJACK nor SHUTW can disappear from low + * level. Later, this should move to stream_sock_{read,write}. + */ + if ((s->req->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) { + buffer_shutw(s->req); + if (s->rep->flags & BF_SHUTR) { + fd_delete(s->req->cons->fd); + s->req->cons->state = SI_ST_CLO; + } + else { + EV_FD_CLR(s->req->cons->fd, DIR_WR); + shutdown(s->req->cons->fd, SHUT_WR); + } + } + + /* Check if we need to close the write side */ + if ((s->rep->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) { + buffer_shutw(s->rep); + if (s->req->flags & BF_SHUTR) { + fd_delete(s->rep->cons->fd); + s->rep->cons->state = SI_ST_CLO; + } + else { + EV_FD_CLR(s->rep->cons->fd, DIR_WR); + shutdown(s->rep->cons->fd, SHUT_WR); + } + } + + + + /* Dirty trick: force one first pass everywhere */ rqf_cli = rqf_srv = rqf_req = ~s->req->flags; rpf_cli = rpf_srv = rpf_rep = ~s->rep->flags; + /* well, the ST_CONN state is already handled properly */ + if (s->req->prod->state == SI_ST_EST) { + rqf_cli = s->req->flags; + rpf_cli = s->rep->flags; + } + + if (s->req->cons->state == SI_ST_EST) { + rqf_srv = s->req->flags; + rpf_srv = s->rep->flags; + } + do { + DPRINTF(stderr,"[%u] %s: task=%p rq=%p, rp=%p, exp(r,w)=%u,%u rqf=%08x rpf=%08x rql=%d rpl=%d cs=%d ss=%d\n", + now_ms, __FUNCTION__, + t, + s->req, s->rep, + s->req->rex, s->rep->wex, + s->req->flags, s->rep->flags, + s->req->l, s->rep->l, s->rep->cons->state, s->req->cons->state); + resync = 0; + /* Analyse request */ + if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) { + if (s->req->prod->state >= SI_ST_EST) { + resync = 1; + /* it's up to the analysers to reset write_ena */ + buffer_write_ena(s->req); + if (s->req->analysers) + process_request(s); + rqf_req = s->req->flags; + } + + } + + /* Analyse response */ + if (unlikely(s->rep->flags & BF_HIJACK)) { + /* In inject mode, we wake up everytime something has + * happened on the write side of the buffer. + */ + if ((s->rep->flags & (BF_WRITE_PARTIAL|BF_WRITE_ERROR|BF_SHUTW)) && + !(s->rep->flags & BF_FULL)) { + if (produce_content(s) != 0) + resync = 1; /* completed, better re-check flags */ + } + } + else if (s->rep->prod->state >= SI_ST_EST) { + if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) { + resync = 1; + /* it's up to the analysers to reset write_ena */ + buffer_write_ena(s->rep); + if (s->rep->analysers) + process_response(s); + rpf_rep = s->rep->flags; + } + } + + /* Maybe resync client FD state */ if (s->rep->cons->state != SI_ST_CLO) { if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) || ((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) { @@ -713,7 +824,7 @@ void process_session(struct task *t, int *next) } } - + /* Maybe resync server FD state */ if (s->req->cons->state != SI_ST_CLO) { if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) || ((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) { @@ -761,38 +872,6 @@ void process_session(struct task *t, int *next) } } - if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) { - /* the analysers must block it themselves */ - if (s->req->prod->state >= SI_ST_EST) { - resync = 1; - buffer_write_ena(s->req); - if (s->req->analysers) - process_request(s); - } - rqf_req = s->req->flags; - } - - if (unlikely(s->rep->flags & BF_HIJACK)) { - /* In inject mode, we wake up everytime something has - * happened on the write side of the buffer. - */ - if ((s->rep->flags & (BF_WRITE_PARTIAL|BF_WRITE_ERROR|BF_SHUTW)) && - !(s->rep->flags & BF_FULL)) { - if (produce_content(s) != 0) - resync = 1; /* completed, better re-check flags */ - } - } - else if (s->rep->prod->state >= SI_ST_EST) { - if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) { - /* the analysers must block it themselves */ - resync = 1; - buffer_write_ena(s->rep); - if (s->rep->analysers) - process_response(s); - rpf_rep = s->rep->flags; - } - } - } while (resync); if (likely((s->rep->cons->state != SI_ST_CLO) || @@ -809,6 +888,8 @@ void process_session(struct task *t, int *next) s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE; s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE; + s->si[0].prev_state = s->si[0].state; + s->si[1].prev_state = s->si[1].state; /* Trick: if a request is being waiting for the server to respond, * and if we know the server can timeout, we don't want the timeout @@ -1766,7 +1847,7 @@ int process_request(struct session *t) * - if one rule returns KO, then return KO */ - if (req->flags & (BF_READ_NULL | BF_SHUTR) || tick_is_expired(req->analyse_exp, now_ms)) + if (req->flags & BF_SHUTR || tick_is_expired(req->analyse_exp, now_ms)) partial = 0; else partial = ACL_PARTIAL; @@ -1921,7 +2002,7 @@ int process_request(struct session *t) } /* 4: have we encountered a close ? */ - else if (req->flags & (BF_READ_NULL | BF_SHUTR)) { + else if (req->flags & BF_SHUTR) { txn->status = 400; client_retnclose(t, error_message(t, HTTP_ERR_400)); msg->msg_state = HTTP_MSG_ERROR; @@ -2607,7 +2688,7 @@ int process_request(struct session *t) * timeout. We just have to check that the client is still * there and that the timeout has not expired. */ - if ((req->flags & (BF_READ_NULL|BF_READ_ERROR)) == 0 && + if ((req->flags & (BF_SHUTR|BF_READ_ERROR)) == 0 && !tick_is_expired(req->analyse_exp, now_ms)) return 0; @@ -2690,7 +2771,7 @@ int process_request(struct session *t) * buffer closed). */ if (req->l - body >= limit || /* enough bytes! */ - req->flags & (BF_FULL | BF_READ_ERROR | BF_SHUTR | BF_READ_NULL | BF_READ_TIMEOUT) || + req->flags & (BF_FULL | BF_READ_ERROR | BF_SHUTR | BF_READ_TIMEOUT) || tick_is_expired(req->analyse_exp, now_ms)) { /* The situation will not evolve, so let's give up on the analysis. */ t->logs.tv_request = now; /* update the request timer to reflect full request */ @@ -2887,7 +2968,7 @@ int process_response(struct session *t) return 0; } /* write error to client, or close from server */ - else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR|BF_READ_NULL)) { + else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR)) { buffer_shutr_now(rep); buffer_shutw_now(req); //fd_delete(req->cons->fd); diff --git a/src/stream_sock.c b/src/stream_sock.c index 52860eaee..14a8df272 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -42,6 +42,7 @@ int stream_sock_read(int fd) { __label__ out_wakeup, out_shutdown_r, out_error; struct buffer *b = fdtab[fd].cb[DIR_RD].b; + struct stream_interface *si = fdtab[fd].owner; int ret, max, retval, cur_read; int read_poll = MAX_READ_POLL_LOOPS; @@ -239,16 +240,21 @@ int stream_sock_read(int fd) { if (!(b->flags & BF_READ_ACTIVITY)) goto out_skip_wakeup; out_wakeup: - task_wakeup(fdtab[fd].owner, TASK_WOKEN_IO); + task_wakeup(si->owner, TASK_WOKEN_IO); out_skip_wakeup: fdtab[fd].ev &= ~FD_POLL_IN; return retval; out_shutdown_r: + /* we received a shutdown */ fdtab[fd].ev &= ~FD_POLL_HUP; b->flags |= BF_READ_NULL; - b->rex = TICK_ETERNITY; + buffer_shutr(b); + /* Maybe we have to completely close the socket */ + if (fdtab[fd].cb[DIR_WR].b->flags & BF_SHUTW) + goto do_close_and_return; + EV_FD_CLR(fd, DIR_RD); goto out_wakeup; out_error: @@ -258,7 +264,27 @@ int stream_sock_read(int fd) { fdtab[fd].state = FD_STERROR; fdtab[fd].ev &= ~FD_POLL_STICKY; b->rex = TICK_ETERNITY; - goto out_wakeup; + + /* Read error on the file descriptor. We close the FD and set + * the error on both buffers. + * Note: right now we only support connected sockets. + */ + if (si->state != SI_ST_EST) + goto out_wakeup; + + if (!si->err_type) + si->err_type = SI_ET_DATA_ERR; + + buffer_shutr(fdtab[fd].cb[DIR_RD].b); + fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR; + buffer_shutw(fdtab[fd].cb[DIR_WR].b); + fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR; + + do_close_and_return: + fd_delete(fd); + si->state = SI_ST_CLO; + task_wakeup(si->owner, TASK_WOKEN_IO); + return 1; } @@ -271,6 +297,7 @@ int stream_sock_read(int fd) { int stream_sock_write(int fd) { __label__ out_wakeup, out_error; struct buffer *b = fdtab[fd].cb[DIR_WR].b; + struct stream_interface *si = fdtab[fd].owner; int ret, max, retval; int write_poll = MAX_WRITE_POLL_LOOPS; @@ -411,7 +438,7 @@ int stream_sock_write(int fd) { if (!(b->flags & BF_WRITE_ACTIVITY)) goto out_skip_wakeup; out_wakeup: - task_wakeup(fdtab[fd].owner, TASK_WOKEN_IO); + task_wakeup(si->owner, TASK_WOKEN_IO); out_skip_wakeup: fdtab[fd].ev &= ~FD_POLL_OUT; @@ -424,7 +451,25 @@ int stream_sock_write(int fd) { fdtab[fd].state = FD_STERROR; fdtab[fd].ev &= ~FD_POLL_STICKY; b->wex = TICK_ETERNITY; - goto out_wakeup; + /* Read error on the file descriptor. We close the FD and set + * the error on both buffers. + * Note: right now we only support connected sockets. + */ + if (si->state != SI_ST_EST) + goto out_wakeup; + + if (!si->err_type) + si->err_type = SI_ET_DATA_ERR; + + buffer_shutr(fdtab[fd].cb[DIR_RD].b); + fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR; + buffer_shutw(fdtab[fd].cb[DIR_WR].b); + fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR; + + fd_delete(fd); + si->state = SI_ST_CLO; + task_wakeup(si->owner, TASK_WOKEN_IO); + return 1; } @@ -433,7 +478,7 @@ int stream_sock_write(int fd) { * phase. It controls the file descriptor's status, as well as read and write * timeouts. */ -int stream_sock_data_check_errors(int fd) +int stream_sock_data_check_timeouts(int fd) { struct buffer *ib = fdtab[fd].cb[DIR_RD].b; struct buffer *ob = fdtab[fd].cb[DIR_WR].b; @@ -446,24 +491,6 @@ int stream_sock_data_check_errors(int fd) ib->flags, ob->flags, ib->l, ob->l); - /* Read or write error on the file descriptor */ - if (unlikely(fdtab[fd].state == FD_STERROR)) { - //trace_term(t, TT_HTTP_SRV_6); - if (!ob->cons->err_type) { - //ob->cons->err_loc = t->srv; - ob->cons->err_type = SI_ET_DATA_ERR; - } - buffer_shutw(ob); - ob->flags |= BF_WRITE_ERROR; - buffer_shutr(ib); - ib->flags |= BF_READ_ERROR; - - do_close_and_return: - fd_delete(fd); - ob->cons->state = SI_ST_CLO; - return 0; - } - /* Read timeout */ if (unlikely(!(ib->flags & (BF_SHUTR|BF_READ_TIMEOUT)) && tick_is_expired(ib->rex, now_ms))) { //trace_term(t, TT_HTTP_SRV_12); @@ -473,8 +500,13 @@ int stream_sock_data_check_errors(int fd) ob->cons->err_type = SI_ET_DATA_TO; } buffer_shutr(ib); - if (ob->flags & BF_SHUTW) - goto do_close_and_return; + if (ob->flags & BF_SHUTW) { + do_close_and_return: + fd_delete(fd); + ob->cons->state = SI_ST_CLO; + return 0; + } + EV_FD_CLR(fd, DIR_RD); } @@ -506,18 +538,18 @@ int stream_sock_data_update(int fd) struct buffer *ib = fdtab[fd].cb[DIR_RD].b; struct buffer *ob = fdtab[fd].cb[DIR_WR].b; - DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n", + DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n", now_ms, __FUNCTION__, fd, fdtab[fd].owner, ib, ob, ib->rex, ob->wex, ib->flags, ob->flags, - ib->l, ob->l); + ib->l, ob->l, ob->cons->state); /* Check if we need to close the read side */ if (!(ib->flags & BF_SHUTR)) { /* Last read, forced read-shutdown, or other end closed */ - if (ib->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) { + if (ib->flags & (BF_SHUTR_NOW|BF_SHUTW)) { //trace_term(t, TT_HTTP_SRV_10); buffer_shutr(ib); if (ob->flags & BF_SHUTW) { @@ -560,13 +592,13 @@ int stream_sock_data_finish(int fd) struct buffer *ib = fdtab[fd].cb[DIR_RD].b; struct buffer *ob = fdtab[fd].cb[DIR_WR].b; - DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n", + DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n", now_ms, __FUNCTION__, fd, fdtab[fd].owner, ib, ob, ib->rex, ob->wex, ib->flags, ob->flags, - ib->l, ob->l); + ib->l, ob->l, ob->cons->state); /* Check if we need to close the read side */ if (!(ib->flags & BF_SHUTR)) {