diff --git a/include/haproxy/mux_h1-t.h b/include/haproxy/mux_h1-t.h index 2f49a495e..5f4cd7f5f 100644 --- a/include/haproxy/mux_h1-t.h +++ b/include/haproxy/mux_h1-t.h @@ -134,6 +134,7 @@ enum h1_cs { H1_CS_EMBRYONIC, /* Connection is waiting for the message headers (H1S is not NULL, not attached to a SC - Frontend connection only) */ H1_CS_UPGRADING, /* TCP>H1 upgrade in-progress (H1S is not NULL and attached to a SC - Frontend connection only) */ H1_CS_RUNNING, /* Connection fully established and the H1S is processing data (H1S is not NULL and attached to a SC) */ + H1_CS_DRAINING, /* H1C is draining the message before destroying the H1S (H1S is not NULL but no SC attached) */ H1_CS_CLOSING, /* Send pending outgoing data and close the connection ASAP (H1S may be NULL) */ H1_CS_CLOSED, /* Connection must be closed now and H1C must be released (H1S is NULL) */ H1_CS_ENTRIES, @@ -150,6 +151,7 @@ static inline const char *h1c_st_to_str(enum h1_cs st) case H1_CS_EMBRYONIC: return "EMB"; case H1_CS_UPGRADING: return "UPG"; case H1_CS_RUNNING: return "RUN"; + case H1_CS_DRAINING: return "DRN"; case H1_CS_CLOSING: return "CLI"; case H1_CS_CLOSED: return "CLD"; default: return "???"; diff --git a/src/mux_h1.c b/src/mux_h1.c index 0f91ec016..581e59d72 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -556,11 +556,11 @@ static inline void h1_release_buf(struct h1c *h1c, struct buffer *bptr) } /* Returns 1 if the H1 connection is alive (IDLE, EMBRYONIC, RUNNING or - * RUNNING). Ortherwise 0 is returned. + * DRAINING). Ortherwise 0 is returned. */ static inline int h1_is_alive(const struct h1c *h1c) { - return (h1c->state <= H1_CS_RUNNING); + return (h1c->state <= H1_CS_DRAINING); } /* Switch the H1 connection to CLOSING or CLOSED mode, depending on the output @@ -952,6 +952,10 @@ static int h1s_must_shut_conn(struct h1s *h1s) TRACE_STATE("keep connection alive (UPGRADING)", H1_EV_STRM_SHUT, h1c->conn, h1s); ret = 0; } + else if (!(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) { + TRACE_STATE("defer shutdown to drain request first", H1_EV_STRM_SHUT, h1c->conn, h1s); + ret = 0; + } else if (((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) { TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s); ret = 0; @@ -3485,6 +3489,11 @@ static int h1_handle_internal_err(struct h1c *h1c) struct session *sess = h1c->conn->owner; int ret = 0; + if (h1c->state == H1_CS_DRAINING) { + h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; + h1s_destroy(h1c->h1s); + goto end; + } session_inc_http_req_ctr(sess); proxy_inc_fe_req_ctr(sess->listener, sess->fe, 1); _HA_ATOMIC_INC(&sess->fe->fe_counters.p.http.rsp[5]); @@ -3495,6 +3504,7 @@ static int h1_handle_internal_err(struct h1c *h1c) h1c->errcode = 500; ret = h1_send_error(h1c); sess_log(sess); + end: return ret; } @@ -3508,6 +3518,11 @@ static int h1_handle_parsing_error(struct h1c *h1c) struct session *sess = h1c->conn->owner; int ret = 0; + if (h1c->state == H1_CS_DRAINING) { + h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; + h1s_destroy(h1c->h1s); + goto end; + } if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) { h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; h1_close(h1c); @@ -3541,6 +3556,11 @@ static int h1_handle_not_impl_err(struct h1c *h1c) struct session *sess = h1c->conn->owner; int ret = 0; + if (h1c->state == H1_CS_DRAINING) { + h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; + h1s_destroy(h1c->h1s); + goto end; + } if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) { h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; h1_close(h1c); @@ -3571,6 +3591,11 @@ static int h1_handle_req_tout(struct h1c *h1c) struct session *sess = h1c->conn->owner; int ret = 0; + if (h1c->state == H1_CS_DRAINING) { + h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; + h1s_destroy(h1c->h1s); + goto end; + } if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) { h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; h1_close(h1c); @@ -3788,7 +3813,7 @@ static int h1_process(struct h1c * h1c) /* Try to parse now the first block of a request, creating the H1 stream if necessary */ if (b_data(&h1c->ibuf) && /* Input data to be processed */ - (h1c->state < H1_CS_RUNNING) && /* IDLE, EMBRYONIC or UPGRADING */ + ((h1c->state < H1_CS_RUNNING) || (h1c->state == H1_CS_DRAINING)) && /* IDLE, EMBRYONIC, UPGRADING or DRAINING */ !(h1c->flags & (H1C_F_IN_SALLOC|H1C_F_ABRT_PENDING))) { /* No allocation failure on the stream rxbuf and no ERROR on the H1C */ struct h1s *h1s = h1c->h1s; struct buffer *buf; @@ -3799,7 +3824,8 @@ static int h1_process(struct h1c * h1c) goto release; /* First of all handle H1 to H2 upgrade (no need to create the H1 stream) */ - if (!(h1c->flags & H1C_F_WAIT_NEXT_REQ) && /* First request */ + if (h1c->state != H1_CS_DRAINING && /* Not draining message */ + !(h1c->flags & H1C_F_WAIT_NEXT_REQ) && /* First request */ !(h1c->px->options2 & PR_O2_NO_H2_UPGRADE) && /* H2 upgrade supported by the proxy */ !(conn->mux->flags & MX_FL_NO_UPG)) { /* the current mux supports upgrades */ /* Try to match H2 preface before parsing the request headers. */ @@ -3840,7 +3866,7 @@ static int h1_process(struct h1c * h1c) h1_process_demux(h1c, buf, count); h1_release_buf(h1c, &h1s->rxbuf); h1_set_idle_expiration(h1c); - if (h1c->state < H1_CS_RUNNING) { + if (h1c->state != H1_CS_RUNNING) { // TODO: be sure state cannot change in h1_process_demux if (h1s->flags & H1S_F_INTERNAL_ERROR) { h1_handle_internal_err(h1c); TRACE_ERROR("internal error detected", H1_EV_H1C_WAKE|H1_EV_H1C_ERR); @@ -3883,6 +3909,11 @@ static int h1_process(struct h1c * h1c) if (h1_send_error(h1c)) h1_send(h1c); } + else if (h1c->state == H1_CS_DRAINING) { + BUG_ON(h1c->h1s->sd && !se_fl_test(h1c->h1s->sd, SE_FL_ORPHAN)); + h1s_destroy(h1c->h1s); + TRACE_STATE("abort/error when draining message. destroy h1s and close h1c", H1_EV_H1S_END, h1c->conn); + } else { h1_close(h1c); TRACE_STATE("close h1c", H1_EV_H1S_END, h1c->conn); @@ -3911,6 +3942,17 @@ static int h1_process(struct h1c * h1c) h1_alert(h1s); } } + else if (h1c->state == H1_CS_DRAINING) { + BUG_ON(!h1c->h1s); + if (se_fl_test(h1c->h1s->sd, SE_FL_EOI)) { + if (h1s_must_shut_conn(h1c->h1s)) { + h1_shutw_conn(conn); + goto release; + } + h1s_finish_detach(h1c->h1s); + goto end; + } + } if (!b_data(&h1c->ibuf)) h1_release_buf(h1c, &h1c->ibuf); @@ -4218,6 +4260,7 @@ static void h1_destroy(void *ctx) static void h1_detach(struct sedesc *sd) { struct h1s *h1s = sd->se; + struct h1c *h1c; TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s); @@ -4225,7 +4268,25 @@ static void h1_detach(struct sedesc *sd) TRACE_LEAVE(H1_EV_STRM_END); return; } - h1s_finish_detach(h1s); + h1c = h1s->h1c; + + if (h1c->state == H1_CS_RUNNING && !(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE) { + h1c->state = H1_CS_DRAINING; + TRACE_DEVEL("Deferring H1S destroy to drain message", H1_EV_STRM_END, h1s->h1c->conn, h1s); + /* If we have a pending data, process it immediately or + * subscribe for reads waiting for new data + */ + if (unlikely(b_data(&h1c->ibuf))) { + if (h1_process(h1c) == -1) + goto end; + } + else + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + h1_set_idle_expiration(h1c); + h1_refresh_timeout(h1c); + } + else + h1s_finish_detach(h1s); end: TRACE_LEAVE(H1_EV_STRM_END);