MINOR: mux-h2: implement an outgoing stream allocator : h2c_bck_stream_new()

For the backend we'll need to allocate streams as well. Let's do this
with h2c_bck_stream_new(). The stream ID allocator was split from it
so that the caller can decide whether or not to stay on the same
connection or create a new one. It possibly isn't the best way to do
this as once we're on the mux it's too late to give up creation of a
new stream. Another approach would possibly consist in detaching muxes
that reached their connection count limit before they can be reused.

Instead of choosing the stream id as soon as the stream is created, wait
until data is about to be sent. If we don't do that, the stream may send
data out of order, and so the stream 3 may send data before the stream 1,
and then when the stream 1 will try to send data, the other end will
consider that an error, as stream ids should always be increased.

Cc: Olivier Houchard <ohouchard@haproxy.com>
This commit is contained in:
Willy Tarreau 2018-10-05 09:35:00 +02:00
parent f8957277ff
commit 751f2d0ddf

View File

@ -236,6 +236,7 @@ static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id);
static int h2_frt_decode_headers(struct h2s *h2s); static int h2_frt_decode_headers(struct h2s *h2s);
static int h2_frt_transfer_data(struct h2s *h2s); static int h2_frt_transfer_data(struct h2s *h2s);
static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state); static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state);
static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs);
/*****************************************************/ /*****************************************************/
/* functions below are for dynamic buffer management */ /* functions below are for dynamic buffer management */
@ -420,7 +421,7 @@ static int h2_init(struct connection *conn, struct proxy *prx)
h2c->miw = 65535; /* mux initial window size */ h2c->miw = 65535; /* mux initial window size */
h2c->mws = 65535; /* mux window size */ h2c->mws = 65535; /* mux window size */
h2c->mfs = 16384; /* initial max frame size */ h2c->mfs = 16384; /* initial max frame size */
h2c->streams_by_id = EB_ROOT_UNIQUE; h2c->streams_by_id = EB_ROOT;
LIST_INIT(&h2c->send_list); LIST_INIT(&h2c->send_list);
LIST_INIT(&h2c->fctl_list); LIST_INIT(&h2c->fctl_list);
LIST_INIT(&h2c->sending_list); LIST_INIT(&h2c->sending_list);
@ -443,11 +444,25 @@ static int h2_init(struct connection *conn, struct proxy *prx)
return -1; return -1;
} }
/* returns the next allocatable outgoing stream ID for the H2 connection, or
* -1 if no more is allocatable.
*/
static inline int32_t h2c_get_next_sid(const struct h2c *h2c)
{
int32_t id = (h2c->max_id + 1) | 1;
if (id & 0x80000000U)
id = -1;
return id;
}
/* returns the stream associated with id <id> or NULL if not found */ /* returns the stream associated with id <id> or NULL if not found */
static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id) static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id)
{ {
struct eb32_node *node; struct eb32_node *node;
if (id == 0)
return (struct h2s *)h2_closed_stream;
if (id > h2c->max_id) if (id > h2c->max_id)
return (struct h2s *)h2_idle_stream; return (struct h2s *)h2_idle_stream;
@ -699,10 +714,19 @@ static struct h2s *h2s_new(struct h2c *h2c, int id)
h2s->st = H2_SS_IDLE; h2s->st = H2_SS_IDLE;
h2s->status = 0; h2s->status = 0;
h2s->rxbuf = BUF_NULL; h2s->rxbuf = BUF_NULL;
if (h2c->flags & H2_CF_IS_BACK) {
h1m_init_req(&h2s->h1m);
h2s->h1m.err_pos = -1; // don't care about errors on the request path
h2s->h1m.flags |= H1_MF_TOLOWER;
} else {
h1m_init_res(&h2s->h1m); h1m_init_res(&h2s->h1m);
h2s->h1m.err_pos = -1; // don't care about errors on the response path h2s->h1m.err_pos = -1; // don't care about errors on the response path
h2s->h1m.flags |= H1_MF_TOLOWER; h2s->h1m.flags |= H1_MF_TOLOWER;
}
h2s->by_id.key = h2s->id = id; h2s->by_id.key = h2s->id = id;
if (id > 0)
h2c->max_id = id; h2c->max_id = id;
eb32_insert(&h2c->streams_by_id, &h2s->by_id); eb32_insert(&h2c->streams_by_id, &h2s->by_id);
@ -767,6 +791,33 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id)
return NULL; return NULL;
} }
/* allocates a new stream associated to conn_stream <cs> on the h2c connection
* and returns it, or NULL in case of memory allocation error or if the highest
* possible stream ID was reached.
*/
static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs)
{
struct h2s *h2s = NULL;
if (h2c->nb_streams >= h2_settings_max_concurrent_streams)
goto out;
/* Defer choosing the ID until we send the first message to create the stream */
h2s = h2s_new(h2c, 0);
if (!h2s)
goto out;
h2s->cs = cs;
cs->ctx = h2s;
h2c->nb_cs++;
/* OK done, the stream lives its own life now */
if (h2_has_too_many_cs(h2c))
h2c->flags |= H2_CF_DEM_TOOMANY;
out:
return h2s;
}
/* try to send a settings frame on the connection. Returns > 0 on success, 0 if /* try to send a settings frame on the connection. Returns > 0 on success, 0 if
* it couldn't do anything. It may return an error in h2c. See RFC7540#11.3 for * it couldn't do anything. It may return an error in h2c. See RFC7540#11.3 for
* the various settings codes. * the various settings codes.
@ -4135,6 +4186,22 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
if (!(h2s->flags & H2_SF_OUTGOING_DATA) && count) if (!(h2s->flags & H2_SF_OUTGOING_DATA) && count)
h2s->flags |= H2_SF_OUTGOING_DATA; h2s->flags |= H2_SF_OUTGOING_DATA;
if (h2s->id == 0) {
int32_t id = h2c_get_next_sid(h2s->h2c);
if (id < 0) {
cs->ctx = NULL;
cs->flags |= CS_FL_ERROR;
h2s_destroy(h2s);
return 0;
}
eb32_delete(&h2s->by_id);
h2s->by_id.key = h2s->id = id;
h2s->h2c->max_id = id;
eb32_insert(&h2s->h2c->streams_by_id, &h2s->by_id);
}
if (htx) { if (htx) {
while (count && !htx_is_empty(htx)) { while (count && !htx_is_empty(htx)) {
idx = htx_get_head(htx); idx = htx_get_head(htx);