MINOR: mux-h1: Manage processing blocking flags on the H1 stream

Because H1C_F_RX_BLK and H1C_F_TX_BLK flags now only concerns data
processing, at the H1 stream level, there is no reason to still manage them
on the H1 connection. Thus, these flags are now set on the H1 stream.
This commit is contained in:
Christopher Faulet 2021-04-07 14:18:11 +02:00
parent 14ee9b8c8b
commit 10a8670f28

View File

@ -62,17 +62,18 @@
#define H1C_F_UPG_H2C 0x00080000 /* set if an upgrade to h2 should be done */
#define H1C_F_CO_MSG_MORE 0x00100000 /* set if CO_SFL_MSG_MORE must be set when calling xprt->snd_buf() */
#define H1C_F_CO_STREAMER 0x00200000 /* set if CO_SFL_STREAMER must be set when calling xprt->snd_buf() */
#define H1C_F_RX_BLK 0x00400000 /* Don't process more input data, waiting sync with output side */
#define H1C_F_TX_BLK 0x00800000 /* Don't process more output data, waiting sync with input side */
/* 0x01000000 - 0x40000000 unusued*/
/* 0x00400000 - 0x40000000 unusued*/
#define H1C_F_IS_BACK 0x80000000 /* Set on outgoing connection */
/*
* H1 Stream flags (32 bits)
*/
#define H1S_F_NONE 0x00000000
/* 0x00000001..0x00000004 unused */
#define H1S_F_RX_BLK 0x00100000 /* Don't process more input data, waiting sync with output side */
#define H1S_F_TX_BLK 0x00200000 /* Don't process more output data, waiting sync with input side */
/* 0x00000004 unused */
#define H1S_F_REOS 0x00000008 /* End of input stream seen even if not delivered yet */
#define H1S_F_WANT_KAL 0x00000010
#define H1S_F_WANT_TUN 0x00000020
@ -697,6 +698,7 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s
if (!h1s)
goto fail;
h1s->flags |= H1S_F_RX_BLK;
h1s->cs = cs;
h1s->sess = sess;
cs->ctx = h1s;
@ -727,7 +729,7 @@ static void h1s_destroy(struct h1s *h1s)
h1_release_buf(h1c, &h1s->rxbuf);
h1c->flags &= ~(H1C_F_TX_BLK|H1C_F_RX_BLK|H1C_F_WANT_SPLICE|
h1c->flags &= ~(H1C_F_WANT_SPLICE|
H1C_F_ST_EMBRYONIC|H1C_F_ST_ATTACHED|H1C_F_ST_READY|
H1C_F_OUT_FULL|H1C_F_OUT_ALLOC|H1C_F_IN_SALLOC|
H1C_F_CO_MSG_MORE|H1C_F_CO_STREAMER);
@ -793,7 +795,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
h1c->idle_exp = TICK_ETERNITY;
if (conn_is_back(conn)) {
h1c->flags |= (H1C_F_IS_BACK|H1C_F_RX_BLK);
h1c->flags |= H1C_F_IS_BACK;
h1c->shut_timeout = h1c->timeout = proxy->timeout.server;
if (tick_isset(proxy->timeout.serverfin))
h1c->shut_timeout = proxy->timeout.serverfin;
@ -1284,15 +1286,15 @@ static void h1_set_tunnel_mode(struct h1s *h1s)
TRACE_STATE("switch H1 stream in tunnel mode", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s);
if (h1c->flags & H1C_F_RX_BLK) {
h1c->flags &= ~H1C_F_RX_BLK;
if (h1s->flags & H1S_F_RX_BLK) {
h1s->flags &= ~H1S_F_RX_BLK;
h1_wake_stream_for_recv(h1s);
TRACE_STATE("Re-enable input processing on h1c", H1_EV_RX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
TRACE_STATE("Re-enable input processing", H1_EV_RX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1s);
}
if (h1c->flags & H1C_F_TX_BLK) {
h1c->flags &= ~H1C_F_TX_BLK;
if (h1s->flags & H1S_F_TX_BLK) {
h1s->flags &= ~H1S_F_TX_BLK;
h1_wake_stream_for_send(h1s);
TRACE_STATE("Re-enable output processing on h1c", H1_EV_TX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
TRACE_STATE("Re-enable output processing", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1s);
}
}
@ -1518,7 +1520,7 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
if (h1s->flags & (H1S_F_PARSING_ERROR|H1S_F_NOT_IMPL_ERROR))
goto end;
if (h1c->flags & H1C_F_RX_BLK)
if (h1s->flags & H1S_F_RX_BLK)
goto out;
do {
@ -1576,13 +1578,13 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
else {
if (h1s->req.state < H1_MSG_DONE || h1s->res.state < H1_MSG_DONE) {
/* Unfinished transaction: block this input side waiting the end of the output side */
h1c->flags |= H1C_F_RX_BLK;
TRACE_STATE("Disable input processing on h1c", H1_EV_RX_DATA|H1_EV_H1C_BLK, h1c->conn, h1s);
h1s->flags |= H1S_F_RX_BLK;
TRACE_STATE("Disable input processing", H1_EV_RX_DATA|H1_EV_H1S_BLK, h1c->conn, h1s);
}
if (h1c->flags & H1C_F_TX_BLK) {
h1c->flags &= ~H1C_F_TX_BLK;
if (h1s->flags & H1S_F_TX_BLK) {
h1s->flags &= ~H1S_F_TX_BLK;
h1_wake_stream_for_send(h1s);
TRACE_STATE("Re-enable output processing on h1c", H1_EV_TX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
TRACE_STATE("Re-enable output processing", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1s);
}
break;
}
@ -1602,7 +1604,7 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
}
count -= htx_used_space(htx) - used;
} while (!(h1s->flags & (H1S_F_PARSING_ERROR|H1S_F_NOT_IMPL_ERROR)) && !(h1c->flags & H1C_F_RX_BLK));
} while (!(h1s->flags & (H1S_F_PARSING_ERROR|H1S_F_NOT_IMPL_ERROR|H1S_F_RX_BLK)));
if (h1s->flags & (H1S_F_PARSING_ERROR|H1S_F_NOT_IMPL_ERROR)) {
@ -1696,10 +1698,10 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
TRACE_ERROR("message aborted, set error on CS", H1_EV_RX_DATA|H1_EV_H1S_ERR, h1c->conn, h1s);
}
if (h1c->flags & H1C_F_TX_BLK) {
h1c->flags &= ~H1C_F_TX_BLK;
if (h1s->flags & H1S_F_TX_BLK) {
h1s->flags &= ~H1S_F_TX_BLK;
h1_wake_stream_for_send(h1s);
TRACE_STATE("Re-enable output processing on h1c", H1_EV_TX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
TRACE_STATE("Re-enable output processing", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1s);
}
}
}
@ -1739,10 +1741,7 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
if (htx_is_empty(chn_htx))
goto end;
if (h1s->flags & H1S_F_PROCESSING_ERROR)
goto end;
if (h1c->flags & H1C_F_TX_BLK)
if (h1s->flags & (H1S_F_PROCESSING_ERROR|H1S_F_TX_BLK))
goto end;
if (!h1_get_buf(h1c, &h1c->obuf)) {
@ -1821,10 +1820,10 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
total += count;
if (last_data) {
h1m->state = H1_MSG_DONE;
if (h1c->flags & H1C_F_RX_BLK) {
h1c->flags &= ~H1C_F_RX_BLK;
if (h1s->flags & H1S_F_RX_BLK) {
h1s->flags &= ~H1S_F_RX_BLK;
h1_wake_stream_for_recv(h1s);
TRACE_STATE("Re-enable input processing on h1c", H1_EV_TX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
TRACE_STATE("Re-enable input processing", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1s);
}
TRACE_USER((!(h1m->flags & H1_MF_RESP) ? "H1 request fully xferred" : "H1 response fully xferred"),
@ -1839,7 +1838,7 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
tmp.data = 0;
tmp.size = b_room(&h1c->obuf);
while (count && !(h1s->flags & H1S_F_PROCESSING_ERROR) && !(h1c->flags & H1C_F_TX_BLK) && blk) {
while (count && !(h1s->flags & (H1S_F_PROCESSING_ERROR|H1S_F_TX_BLK)) && blk) {
struct htx_sl *sl;
struct ist n, v;
enum htx_blk_type type = htx_get_blk_type(blk);
@ -1869,10 +1868,10 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
h1m->state = H1_MSG_HDR_FIRST;
if (h1s->meth == HTTP_METH_HEAD)
h1s->flags |= H1S_F_BODYLESS_RESP;
if (h1c->flags & H1C_F_RX_BLK) {
h1c->flags &= ~H1C_F_RX_BLK;
if (h1s->flags & H1S_F_RX_BLK) {
h1s->flags &= ~H1S_F_RX_BLK;
h1_wake_stream_for_recv(h1s);
TRACE_STATE("Re-enable input processing on h1c", H1_EV_TX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
TRACE_STATE("Re-enable input processing", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1s);
}
break;
@ -2213,8 +2212,8 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
h1m->state = H1_MSG_DONE;
if (!(h1m->flags & H1_MF_RESP) && h1s->meth == HTTP_METH_CONNECT) {
h1c->flags |= H1C_F_TX_BLK;
TRACE_STATE("Disable output processing on h1c", H1_EV_TX_DATA|H1_EV_H1C_BLK, h1c->conn, h1s);
h1s->flags |= H1S_F_TX_BLK;
TRACE_STATE("Disable output processing", H1_EV_TX_DATA|H1_EV_H1S_BLK, h1c->conn, h1s);
}
else if ((h1m->flags & H1_MF_RESP) &&
((h1s->meth == HTTP_METH_CONNECT && h1s->status >= 200 && h1s->status < 300) || h1s->status == 101)) {
@ -2225,10 +2224,10 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
TRACE_STATE("switch H1 response in tunnel mode", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s);
}
if (h1c->flags & H1C_F_RX_BLK) {
h1c->flags &= ~H1C_F_RX_BLK;
if (h1s->flags & H1S_F_RX_BLK) {
h1s->flags &= ~H1S_F_RX_BLK;
h1_wake_stream_for_recv(h1s);
TRACE_STATE("Re-enable input processing on h1c", H1_EV_TX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
TRACE_STATE("Re-enable input processing", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1s);
}
TRACE_USER((!(h1m->flags & H1_MF_RESP) ? "H1 request fully xferred" : "H1 response fully xferred"),
@ -2608,7 +2607,7 @@ static int h1_send(struct h1c *h1c)
}
end:
if (!(h1c->flags & (H1C_F_OUT_FULL|H1C_F_TX_BLK)))
if (!(h1c->flags & H1C_F_OUT_FULL))
h1_wake_stream_for_send(h1c->h1s);
/* We're done, no more to send */