diff --git a/src/mux_h1.c b/src/mux_h1.c index 7d04e5d48..9501c5f04 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -13,6 +13,8 @@ #include #include +#include +#include #include #include @@ -40,13 +42,16 @@ #define H1C_F_CS_SHUTW_NOW 0x00002000 /* connection must be shut down for writes ASAP */ #define H1C_F_CS_SHUTW 0x00004000 /* connection is already shut down */ + /* * H1 Stream flags (32 bits) */ -// TODO +#define H1S_F_NONE 0x00000000 +#define H1S_F_ERROR 0x00000001 /* An error occurred on the H1 stream */ +#define H1S_F_MSG_XFERED 0x00000002 /* current message was transferred to the data layer */ + /* H1 connection descriptor */ -//struct h1s; struct h1c { struct connection *conn; struct proxy *px; @@ -59,8 +64,10 @@ struct h1c { struct wait_event wait_event; /* To be used if we're waiting for I/Os */ struct h1s *h1s; /* H1 stream descriptor */ - int timeout; /* idle timeout */ struct task *task; /* timeout management task */ + + int idle_exp; /* expiration date for idle connections, in ticks (client-side only)*/ + int http_exp; /* expiration date for HTTP headers parsing (client-side only) */ }; /* H1 stream descriptor */ @@ -73,6 +80,12 @@ struct h1s { struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */ struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */ + + struct h1m req; + struct h1m res; + + enum http_meth_t meth; /* HTTP resquest method */ + uint16_t status; /* HTTP response status */ }; /* the h1c and h1s pools */ @@ -209,9 +222,25 @@ static struct h1s *h1s_create(struct h1c *h1c) h1s->cs = NULL; h1s->rxbuf = BUF_NULL; + h1s->flags = H1S_F_NONE; h1s->recv_wait = NULL; h1s->send_wait = NULL; + + h1m_init_req(&h1s->req); + h1m_init_res(&h1s->res); + + h1s->status = 0; + h1s->meth = HTTP_METH_OTHER; + + if (!conn_is_back(h1c->conn)) { + if (h1c->px->options2 & PR_O2_REQBUG_OK) + h1s->req.err_pos = -1; + } + else { + if (h1c->px->options2 & PR_O2_RSPBUG_OK) + h1s->res.err_pos = -1; + } end: return h1s; } @@ -253,7 +282,6 @@ static int h1_init(struct connection *conn, struct proxy *proxy) h1c->ibuf = BUF_NULL; h1c->obuf = BUF_NULL; h1c->h1s = NULL; - h1c->timeout = 0; t = task_new(tid_bit); if (!t) @@ -263,6 +291,9 @@ static int h1_init(struct connection *conn, struct proxy *proxy) t->context = h1c; t->expire = TICK_ETERNITY; + h1c->idle_exp = TICK_ETERNITY; + h1c->http_exp = TICK_ETERNITY; + LIST_INIT(&h1c->buf_wait.list); h1c->wait_event.task = tasklet_new(); if (!h1c->wait_event.task) @@ -271,18 +302,25 @@ static int h1_init(struct connection *conn, struct proxy *proxy) h1c->wait_event.task->context = h1c; h1c->wait_event.wait_reason = 0; - conn->mux_ctx = h1c; + /* For backend mux connection, the CS already exists. In such case, + * create h1s and attached the cs to it. + */ if (cs) { - struct h1s *h1s; + struct h1s *h1s = cs->ctx; - h1s = h1s_create(h1c); - if (!h1s) - goto fail; - h1s->cs = cs; - cs->ctx = h1s; + if (!h1s) { + h1s = h1s_create(h1c); + if (!h1s) + goto fail; + cs->ctx = h1s; + h1s->cs = cs; + } } + conn->mux_ctx = h1c; + task_wakeup(t, TASK_WOKEN_INIT); + /* Try to read, if nothing is available yet we'll just subscribe */ if (h1_recv(h1c)) h1_process(h1c); @@ -329,6 +367,9 @@ static void h1_release(struct connection *conn) if (h1c->wait_event.task) tasklet_free(h1c->wait_event.task); + if (h1c->h1s) + h1s_destroy(h1c->h1s); + if (h1c->wait_event.wait_reason != 0) conn->xprt->unsubscribe(conn, h1c->wait_event.wait_reason, &h1c->wait_event); @@ -348,10 +389,243 @@ static void h1_release(struct connection *conn) /******************************************************/ /* functions below are for the H1 protocol processing */ /******************************************************/ -static void h1_process_input(struct h1c *h1c) +/* + * Set the appropriate error message. It first tries to get it from the proxy if + * it exists. Otherwise, it falls back on default one. + */ +static void h1_cpy_error_message(struct h1c *h1c, struct buffer *dst, int status) +{ + const int msgnum = http_get_status_idx(status); + const struct buffer *err; + + err = (h1c->px->errmsg[msgnum].area + ? &h1c->px->errmsg[msgnum] + : &http_err_chunks[msgnum]); + b_putblk(dst, b_head(err), b_data(err)); +} + +/* + * Parse HTTP/1 headers. It returns the number of bytes parsed if > 0, or 0 if + * it couldn't proceed. Parsing errors are reported by setting H1S_F_ERROR flag + * and filling h1s->err_pos and h1s->err_state fields. This functions is + * responsibile to update the parser state . + */ +static size_t h1_process_headers(struct h1s *h1s, struct h1m *h1m, + struct buffer *buf, size_t ofs, size_t max) +{ + struct http_hdr hdrs[MAX_HTTP_HDR]; + union h1_sl sl; + int ret = 0; + + /* Realing input buffer if necessary */ + if (b_head(buf) + b_data(buf) > b_wrap(buf)) + b_slow_realign(buf, trash.area, 0); + + ret = h1_headers_to_hdr_list(b_peek(buf, ofs), b_peek(buf, ofs) + max, + hdrs, sizeof(hdrs)/sizeof(hdrs[0]), h1m, &sl); + if (ret <= 0) { + /* Incomplete or invalid message. If the buffer is full, it's an + * error because headers are too large to be handled by the + * parser. */ + if (ret < 0 || (!ret && b_full(buf))) { + h1s->flags |= H1S_F_ERROR; + h1m->err_state = h1m->state; + h1m->err_pos = h1m->next; + ret = 0; + } + goto end; + } + + /* messages headers fully parsed, do some checks to prepare the body + * parsing. + */ + + /* Be sure to keep some space to do headers rewritting */ + if (ret > (b_size(buf) - global.tune.maxrewrite)) { + h1s->flags |= H1S_F_ERROR; + h1m->err_state = h1m->state; + h1m->err_pos = h1m->next; + ret = 0; + goto end; + } + + /* Save the request's method or the response's status and check if the + * body length is known */ + if (!(h1m->flags & H1_MF_RESP)) { + h1s->meth = sl.rq.meth; + /* Request have always a known length */ + h1m->flags |= H1_MF_XFER_LEN; + if (!(h1m->flags & H1_MF_CHNK) && !h1m->body_len) + h1m->state = H1_MSG_DONE; + } + else { + h1s->status = sl.st.status; + + if ((h1s->meth == HTTP_METH_HEAD) || + (h1s->status >= 100 && h1s->status < 200) || + (h1s->status == 204) || (h1s->status == 304) || + (h1s->meth == HTTP_METH_CONNECT && h1s->status == 200)) { + h1m->flags &= ~(H1_MF_CLEN|H1_MF_CHNK); + h1m->flags |= H1_MF_XFER_LEN; + h1m->curr_len = h1m->body_len = 0; + h1m->state = H1_MSG_DONE; + } + else if (h1m->flags & (H1_MF_CLEN|H1_MF_CHNK)) { + h1m->flags |= H1_MF_XFER_LEN; + if ((h1m->flags & H1_MF_CLEN) && !h1m->body_len) + h1m->state = H1_MSG_DONE; + } + else + h1m->state = H1_MSG_TUNNEL; + } + + end: + return ret; +} + +/* + * Parse HTTP/1 body. It returns the number of bytes parsed if > 0, or 0 if + * it couldn't proceed. Parsing errors are reported by setting H1S_F_ERROR flag + * and filling h1s->err_pos and h1s->err_state fields. This functions is + * responsibile to update the parser state . + */ +static size_t h1_process_data(struct h1s *h1s, struct h1m *h1m, + struct buffer *buf, size_t ofs, size_t max) +{ + size_t total = 0; + int ret = 0; + + if (h1m->flags & H1_MF_XFER_LEN) { + if (h1m->flags & H1_MF_CLEN) { + /* content-length: read only h2m->body_len */ + ret = max; + if ((uint64_t)ret > h1m->curr_len) + ret = h1m->curr_len; + h1m->curr_len -= ret; + total += ret; + if (!h1m->curr_len) + h1m->state = H1_MSG_DONE; + } + else if (h1m->flags & H1_MF_CHNK) { + new_chunk: + /* te:chunked : parse chunks */ + if (h1m->state == H1_MSG_CHUNK_CRLF) { + ret = h1_skip_chunk_crlf(buf, ofs, ofs + max); + if (ret <= 0) + goto end; + max -= ret; + ofs += ret; + total += ret; + h1m->state = H1_MSG_CHUNK_SIZE; + } + + if (h1m->state == H1_MSG_CHUNK_SIZE) { + unsigned int chksz; + + ret = h1_parse_chunk_size(buf, ofs, ofs + max, &chksz); + if (ret <= 0) + goto end; + h1m->curr_len = chksz; + h1m->body_len += chksz; + max -= ret; + ofs += ret; + total += ret; + h1m->state = (!chksz ? H1_MSG_TRAILERS : H1_MSG_DATA); + } + + if (h1m->state == H1_MSG_DATA) { + ret = max; + if (!ret) + goto end; + if ((uint64_t)ret > h1m->curr_len) + ret = h1m->curr_len; + h1m->curr_len -= ret; + max -= ret; + ofs += ret; + total += ret; + if (h1m->curr_len) + goto end; + h1m->state = H1_MSG_CHUNK_CRLF; + goto new_chunk; + } + + if (h1m->state == H1_MSG_TRAILERS) { + ret = h1_measure_trailers(buf, ofs, ofs + max); + if (ret <= 0) + goto end; + max -= ret; + ofs += ret; + total += ret; + h1m->state = H1_MSG_DONE; + } + } + else { + /* XFER_LEN is set but not CLEN nor CHNK, it means there + * is no body. Switch the message in DONE state + */ + h1m->state = H1_MSG_DONE; + } + } + else { + /* no content length, read till SHUTW */ + total = max; + } + + end: + if (ret < 0) { + h1s->flags |= H1S_F_ERROR; + h1m->err_state = h1m->state; + h1m->err_pos = ofs + max + ret; + return 0; + } + + return total; +} + +/* + * Synchronize the request and the response before reseting them. Except for 1xx + * responses, we wait that the request and the response are in DONE state and + * that all data are forwarded for both. For 1xx responses, only the response is + * reset, waiting the final one. Many 1xx messages can be sent. + */ +static void h1_sync_messages(struct h1c *h1c) +{ + if (!h1c->h1s) + return; + + if (h1c->h1s->res.state >= H1_MSG_DONE && + (h1c->h1s->status < 200 && (h1c->h1s->status == 100 || h1c->h1s->status >= 102)) && + ((conn_is_back(h1c->conn) && !b_data(&h1c->obuf)) || !b_data(&h1c->h1s->rxbuf))) { + /* For 100-Continue response or any other informational 1xx + * response which is non-final, don't reset the request, the + * transaction is not finished. We take care the response was + * transferred before. + */ + h1m_init_res(&h1c->h1s->res); + } + else if (!b_data(&h1c->h1s->rxbuf) && !b_data(&h1c->obuf) && + h1c->h1s->req.state >= H1_MSG_DONE && h1c->h1s->res.state >= H1_MSG_DONE) { + h1m_init_req(&h1c->h1s->req); + h1m_init_res(&h1c->h1s->res); + + // TODO: For now, the Keep-alive timeout is handled by the stream. + //if (h1c->task && !conn_is_back(h1c->conn)) + // h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpka); + } +} + +/* + * Process incoming data. It parses data and transfer them from h1c->ibuf into + * h1s->rxbuf. It returns the number of bytes parsed and transferred if > 0, or + * 0 if it couldn't proceed. + */ +static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count) { struct h1s *h1s = h1c->h1s; struct conn_stream *cs = NULL; + struct h1m *h1m; + size_t total = 0; + size_t ret = 0; if (h1c->flags & H1C_F_CS_ERROR) goto end; @@ -360,15 +634,6 @@ static void h1_process_input(struct h1c *h1c) h1s = h1s_create(h1c); if (h1s == NULL) goto err; - - cs = cs_new(h1c->conn); - if (!cs) - goto err; - - h1s->cs = cs; - cs->ctx = h1s; - if (stream_create_from_cs(cs) < 0) - goto err; } if (!h1_get_buf(h1c, &h1s->rxbuf)) { @@ -376,20 +641,85 @@ static void h1_process_input(struct h1c *h1c) goto end; } - b_xfer(&h1s->rxbuf, &h1c->ibuf, b_room(&h1s->rxbuf)); + if (count > b_room(&h1s->rxbuf)) + count = b_room(&h1s->rxbuf); - if (!b_full(&h1c->ibuf)) { - h1c->flags &= ~H1C_F_IN_FULL; - if (!b_data(&h1c->ibuf)) - h1_release_buf(h1c, &h1c->ibuf); + h1m = (!conn_is_back(h1c->conn) ? &h1s->req : &h1s->res); + while (h1m->state < H1_MSG_DONE && count) { + if (h1m->state <= H1_MSG_LAST_LF) { + if (h1m->state == H1_MSG_RQBEFORE) { + if (h1c->task && !conn_is_back(h1c->conn)) + if (!h1s->cs) + h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpreq); + } + ret = h1_process_headers(h1s, h1m, buf, total, count); + if (!ret) + break; + + /* Create the CS if not already attached to the H1S */ + if (!h1s->cs) { + cs = cs_new(h1c->conn); + if (!cs) + goto err; + h1s->cs = cs; + cs->ctx = h1s; + if (stream_create_from_cs(cs) < 0) + goto err; + } + + if (h1c->task && !conn_is_back(h1c->conn)) + h1c->http_exp = TICK_ETERNITY; + } + else if (h1m->state <= H1_MSG_TRAILERS) { + /* Do not parse the body if the header part is not yet + * transferred to the stream. + */ + if (!(h1s->flags & H1S_F_MSG_XFERED)) + break; + ret = h1_process_data(h1s, h1m, buf, total, count); + if (!ret) + break; + } + else { + h1s->flags |= H1S_F_ERROR; + break; + } + + total += ret; + count -= ret; + + if ((h1s->flags & H1S_F_ERROR)) + break; } + + if (h1s->flags & H1S_F_ERROR) { + /* For now, if an error occurred during the message parsing when + * a stream is already attached to the mux, we transfer + * everything to let the stream handle the error itself. We + * suppose the stream will detect the same error of + * course. Otherwise, we generate the error here. + */ + if (!h1s->cs) { + if (!h1_get_buf(h1c, &h1c->obuf)) { + h1c->flags |= H1C_F_OUT_ALLOC; + goto err; + } + h1_cpy_error_message(h1c, &h1c->obuf, 400); + goto err; + } + total += count; + } + + ret = b_xfer(&h1s->rxbuf, buf, total); + if (b_data(&h1s->rxbuf)) { h1s->cs->flags |= CS_FL_RCV_MORE; if (b_full(&h1s->rxbuf)) h1c->flags |= H1C_F_RX_FULL; } + end: - return; + return ret; err: if (cs) @@ -397,11 +727,21 @@ static void h1_process_input(struct h1c *h1c) if (h1s) h1s_destroy(h1s); h1c->flags |= H1C_F_CS_ERROR; + sess_log(h1c->conn->owner); + ret = 0; goto end; } +/* + * Process outgoing data. It parses data and transfer them from the channel buffer into + * h1c->obuf. It returns the number of bytes parsed and transferred if > 0, or + * 0 if it couldn't proceed. + */ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t count) { + struct h1s *h1s = h1c->h1s; + struct h1m *h1m; + size_t total = 0; size_t ret = 0; if (!h1_get_buf(h1c, &h1c->obuf)) { @@ -411,7 +751,38 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun if (count > b_room(&h1c->obuf)) count = b_room(&h1c->obuf); - ret = b_xfer(&h1c->obuf, buf, count); + h1m = (!conn_is_back(h1c->conn) ? &h1s->res : &h1s->req); + while (h1m->state < H1_MSG_DONE && count) { + if (h1m->state <= H1_MSG_LAST_LF) { + ret = h1_process_headers(h1s, h1m, buf, total, count); + if (!ret) { + /* incomplete or invalid response, this is abnormal coming from + * haproxy and may only result in a bad errorfile or bad Lua code + * so that won't be fixed, raise an error now. + */ + h1s->flags |= H1S_F_ERROR; + break; + } + } + else if (h1m->state <= H1_MSG_TRAILERS) { + ret = h1_process_data(h1s, h1m, buf, total, count); + if (!ret) + break; + } + else { + h1s->flags |= H1S_F_ERROR; + break; + } + + total += ret; + count -= ret; + + if ((h1s->flags & H1S_F_ERROR)) + break; + } + + // TODO: Handle H1S errors + ret = b_xfer(&h1c->obuf, buf, total); if (b_full(&h1c->obuf)) h1c->flags |= H1C_F_OUT_FULL; @@ -419,6 +790,10 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun return ret; } +/* + * Transfer data from h1s->rxbuf into the channel buffer. It returns the number + * of bytes transferred. + */ static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, size_t count) { struct h1c *h1c = h1s->h1c; @@ -435,10 +810,15 @@ static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, size_t count) cs->flags |= CS_FL_RCV_MORE; } else { + if (!(h1s->flags & H1S_F_MSG_XFERED)) + h1s->flags |= H1S_F_MSG_XFERED; + h1c->flags &= ~H1C_F_RX_FULL; h1_release_buf(h1c, &h1s->rxbuf); + h1_sync_messages(h1c); + cs->flags &= ~CS_FL_RCV_MORE; - if (!b_data(&h1c->ibuf) && cs->flags & CS_FL_REOS) + if (!b_data(&h1c->ibuf) && (cs->flags & CS_FL_REOS)) cs->flags |= CS_FL_EOS; } return ret; @@ -513,6 +893,7 @@ static int h1_send(struct h1c *h1c) if (ret > 0) { h1c->flags &= ~H1C_F_OUT_FULL; b_del(&h1c->obuf, ret); + h1_sync_messages(h1c); sent = 1; } @@ -570,8 +951,16 @@ static int h1_process(struct h1c * h1c) { struct connection *conn = h1c->conn; - if (b_data(&h1c->ibuf) && !(h1c->flags & (H1C_F_RX_FULL|H1C_F_RX_ALLOC))) - h1_process_input(h1c); + if (b_data(&h1c->ibuf) && !(h1c->flags & (H1C_F_RX_FULL|H1C_F_RX_ALLOC))) { + size_t ret; + + ret = h1_process_input(h1c, &h1c->ibuf, b_data(&h1c->ibuf)); + if (ret > 0) { + h1c->flags &= ~H1C_F_IN_FULL; + if (!b_data(&h1c->ibuf)) + h1_release_buf(h1c, &h1c->ibuf); + } + } h1_send(h1c); @@ -587,8 +976,12 @@ static int h1_process(struct h1c * h1c) } } - if (h1c->task) { - // TODO: update task's timeout and queue it if necessary + if (h1c->task && !conn_is_back(conn)) { + if (!h1c->h1s || !h1c->h1s->cs) + h1c->idle_exp = tick_add_ifset(now_ms, h1c->px->timeout.client); + else + h1c->idle_exp = TICK_ETERNITY; + h1c->task->expire = tick_first(h1c->http_exp, h1c->idle_exp); } return 0; } @@ -624,27 +1017,34 @@ static struct task *h1_timeout_task(struct task *t, void *context, unsigned shor struct h1c *h1c = context; int expired = tick_is_expired(t->expire, now_ms); - if (!expired && h1c) + if (!h1c) + goto end; + + if (!expired) { + /* For now, do not handle timeout for server-side mux */ + if (!conn_is_back(h1c->conn)) + t->expire = tick_first(t->expire, tick_first(h1c->idle_exp, h1c->http_exp)); return t; + } - task_delete(t); - task_free(t); - - if (!h1c) { - /* resources were already deleted */ - return NULL; + if (!(h1c->px->options & PR_O_IGNORE_PRB) && h1_get_buf(h1c, &h1c->obuf)) { + // TODO: do not send error if ka timeout + h1_cpy_error_message(h1c, &h1c->obuf, 408); + h1c->flags |= H1C_F_CS_ERROR; + h1c->idle_exp = TICK_ETERNITY; + h1c->http_exp = TICK_ETERNITY; + t->expire = TICK_ETERNITY; + tasklet_wakeup(h1c->wait_event.task); + sess_log(h1c->conn->owner); + return t; } h1c->task = NULL; - - // TODO - - /* either we can release everything now or it will be done later once - * the stream closes. - */ - if (!h1c->h1s) + if (!h1c->h1s || !h1c->h1s->cs) h1_release(h1c->conn); - + end: + task_delete(t); + task_free(t); return NULL; }