MEDIUM: mux-h2: merge recv_wait and send_wait event notifications

This is the continuation of the recv+send event notifications merge
that was started. This patch is less trivial than the previous ones
because the existence of a send event subscription is also used to
decide to put a stream back into the send list.
This commit is contained in:
Willy Tarreau 2020-01-10 11:12:48 +01:00
parent 1b0d4d19fc
commit f96508aae6

View File

@ -206,8 +206,7 @@ struct h2s {
uint16_t status; /* HTTP response status */ uint16_t status; /* HTTP response status */
unsigned long long body_len; /* remaining body length according to content-length if H2_SF_DATA_CLEN */ unsigned long long body_len; /* remaining body length according to content-length if H2_SF_DATA_CLEN */
struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */ struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
struct wait_event *recv_wait; /* recv wait_event the conn_stream associated is waiting on (via h2_subscribe) */ struct wait_event *subs; /* recv wait_event the conn_stream associated is waiting on (via h2_subscribe) */
struct wait_event *send_wait; /* send wait_event the conn_stream associated is waiting on (via h2_subscribe) */
struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_lsit */ struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_lsit */
struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry to send an RST after we failed to, struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry to send an RST after we failed to,
* in case there's no other subscription to do it */ * in case there's no other subscription to do it */
@ -1037,29 +1036,25 @@ static inline __maybe_unused void h2s_error(struct h2s *h2s, enum h2_err err)
/* attempt to notify the data layer of recv availability */ /* attempt to notify the data layer of recv availability */
static void __maybe_unused h2s_notify_recv(struct h2s *h2s) static void __maybe_unused h2s_notify_recv(struct h2s *h2s)
{ {
struct wait_event *sw; if (h2s->subs && h2s->subs->events & SUB_RETRY_RECV) {
if (h2s->recv_wait) {
TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s); TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s);
sw = h2s->recv_wait; tasklet_wakeup(h2s->subs->tasklet);
sw->events &= ~SUB_RETRY_RECV; h2s->subs->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(sw->tasklet); if (!h2s->subs->events)
h2s->recv_wait = NULL; h2s->subs = NULL;
} }
} }
/* attempt to notify the data layer of send availability */ /* attempt to notify the data layer of send availability */
static void __maybe_unused h2s_notify_send(struct h2s *h2s) static void __maybe_unused h2s_notify_send(struct h2s *h2s)
{ {
struct wait_event *sw; if (h2s->subs && h2s->subs->events & SUB_RETRY_SEND) {
if (h2s->send_wait) {
TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s); TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s);
sw = h2s->send_wait;
h2s->send_wait = NULL;
sw->events &= ~SUB_RETRY_SEND;
h2s->flags |= H2_SF_NOTIFIED; h2s->flags |= H2_SF_NOTIFIED;
tasklet_wakeup(sw->tasklet); tasklet_wakeup(h2s->subs->tasklet);
h2s->subs->events &= ~SUB_RETRY_SEND;
if (!h2s->subs->events)
h2s->subs = NULL;
} }
else if (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) { else if (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) {
TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s); TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s);
@ -1079,7 +1074,7 @@ static void __maybe_unused h2s_alert(struct h2s *h2s)
{ {
TRACE_ENTER(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s); TRACE_ENTER(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s);
if (h2s->recv_wait || h2s->send_wait || if (h2s->subs ||
(h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW))) { (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW))) {
h2s_notify_recv(h2s); h2s_notify_recv(h2s);
h2s_notify_send(h2s); h2s_notify_send(h2s);
@ -1266,10 +1261,10 @@ static void h2s_destroy(struct h2s *h2s)
b_free(&h2s->rxbuf); b_free(&h2s->rxbuf);
offer_buffers(NULL, tasks_run_queue); offer_buffers(NULL, tasks_run_queue);
} }
if (h2s->send_wait != NULL)
h2s->send_wait->events &= ~SUB_RETRY_SEND; if (h2s->subs)
if (h2s->recv_wait != NULL) h2s->subs->events = 0;
h2s->recv_wait->events &= ~SUB_RETRY_RECV;
/* There's no need to explicitly call unsubscribe here, the only /* There's no need to explicitly call unsubscribe here, the only
* reference left would be in the h2c send_list/fctl_list, and if * reference left would be in the h2c send_list/fctl_list, and if
* we're in it, we're getting out anyway * we're in it, we're getting out anyway
@ -1304,8 +1299,7 @@ static struct h2s *h2s_new(struct h2c *h2c, int id)
pool_free(pool_head_h2s, h2s); pool_free(pool_head_h2s, h2s);
goto out; goto out;
} }
h2s->send_wait = NULL; h2s->subs = NULL;
h2s->recv_wait = NULL;
h2s->shut_tl->process = h2_deferred_shut; h2s->shut_tl->process = h2_deferred_shut;
h2s->shut_tl->context = h2s; h2s->shut_tl->context = h2s;
LIST_INIT(&h2s->list); LIST_INIT(&h2s->list);
@ -1982,7 +1976,8 @@ static void h2c_unblock_sfctl(struct h2c *h2c)
if (h2s->flags & H2_SF_BLK_SFCTL && h2s_mws(h2s) > 0) { if (h2s->flags & H2_SF_BLK_SFCTL && h2s_mws(h2s) > 0) {
h2s->flags &= ~H2_SF_BLK_SFCTL; h2s->flags &= ~H2_SF_BLK_SFCTL;
LIST_DEL_INIT(&h2s->list); LIST_DEL_INIT(&h2s->list);
if (h2s->send_wait || h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)) if ((h2s->subs && h2s->subs->events & SUB_RETRY_SEND) ||
h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))
LIST_ADDQ(&h2c->send_list, &h2s->list); LIST_ADDQ(&h2c->send_list, &h2s->list);
} }
node = eb32_next(node); node = eb32_next(node);
@ -2322,7 +2317,8 @@ static int h2c_handle_window_update(struct h2c *h2c, struct h2s *h2s)
if (h2s_mws(h2s) > 0 && (h2s->flags & H2_SF_BLK_SFCTL)) { if (h2s_mws(h2s) > 0 && (h2s->flags & H2_SF_BLK_SFCTL)) {
h2s->flags &= ~H2_SF_BLK_SFCTL; h2s->flags &= ~H2_SF_BLK_SFCTL;
LIST_DEL_INIT(&h2s->list); LIST_DEL_INIT(&h2s->list);
if (h2s->send_wait || h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)) if ((h2s->subs && h2s->subs->events & SUB_RETRY_SEND) ||
h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))
LIST_ADDQ(&h2c->send_list, &h2s->list); LIST_ADDQ(&h2c->send_list, &h2s->list);
} }
} }
@ -3263,17 +3259,18 @@ static void h2_resume_each_sending_h2s(struct h2c *h2c, struct list *head)
/* If the sender changed his mind and unsubscribed, let's just /* If the sender changed his mind and unsubscribed, let's just
* remove the stream from the send_list. * remove the stream from the send_list.
*/ */
if (!h2s->send_wait && if (!(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)) &&
!(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))) { (!h2s->subs || !(h2s->subs->events & SUB_RETRY_SEND))) {
LIST_DEL_INIT(&h2s->list); LIST_DEL_INIT(&h2s->list);
continue; continue;
} }
if (h2s->send_wait) { if (h2s->subs && h2s->subs->events & SUB_RETRY_SEND) {
h2s->send_wait->events &= ~SUB_RETRY_SEND;
h2s->flags |= H2_SF_NOTIFIED; h2s->flags |= H2_SF_NOTIFIED;
tasklet_wakeup(h2s->send_wait->tasklet); tasklet_wakeup(h2s->subs->tasklet);
h2s->send_wait = NULL; h2s->subs->events &= ~SUB_RETRY_SEND;
if (!h2s->subs->events)
h2s->subs = NULL;
} }
else if (h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)) { else if (h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)) {
tasklet_wakeup(h2s->shut_tl); tasklet_wakeup(h2s->shut_tl);
@ -3861,7 +3858,7 @@ static void h2_detach(struct conn_stream *cs)
if (!(cs->conn->flags & CO_FL_ERROR) && if (!(cs->conn->flags & CO_FL_ERROR) &&
(h2c->st0 < H2_CS_ERROR) && (h2c->st0 < H2_CS_ERROR) &&
(h2s->flags & (H2_SF_BLK_MBUSY | H2_SF_BLK_MROOM | H2_SF_BLK_MFCTL)) && (h2s->flags & (H2_SF_BLK_MBUSY | H2_SF_BLK_MROOM | H2_SF_BLK_MFCTL)) &&
((h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) || h2s->send_wait || h2s->recv_wait)) { ((h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) || h2s->subs)) {
TRACE_DEVEL("leaving on stream blocked", H2_EV_STRM_END|H2_EV_H2S_BLK, h2c->conn, h2s); TRACE_DEVEL("leaving on stream blocked", H2_EV_STRM_END|H2_EV_H2S_BLK, h2c->conn, h2s);
return; return;
} }
@ -5594,32 +5591,30 @@ static size_t h2s_make_trailers(struct h2s *h2s, struct htx *htx)
/* Called from the upper layer, to subscribe to events, such as being able to send. /* Called from the upper layer, to subscribe to events, such as being able to send.
* The <param> argument here is supposed to be a pointer to a wait_event struct * The <param> argument here is supposed to be a pointer to a wait_event struct
* which will be passed to h2s->recv_wait or h2s->send_wait depending on the * which will be passed to h2s->subs. The event_type must only be a
* event_type. The event_type must only be a combination of SUB_RETRY_RECV and * combination of SUB_RETRY_RECV and SUB_RETRY_SEND, other values will lead to -1
* SUB_RETRY_SEND, other values will lead to -1 being returned. It always * being returned. It always returns 0 except for the error above.
* returns 0 except for the error above.
*/ */
static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
{ {
struct wait_event *sw; struct wait_event *sw = param;
struct h2s *h2s = cs->ctx; struct h2s *h2s = cs->ctx;
struct h2c *h2c = h2s->h2c; struct h2c *h2c = h2s->h2c;
TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s); TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s);
if (event_type & SUB_RETRY_RECV) {
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
BUG_ON(h2s->subs && h2s->subs->events & event_type);
BUG_ON(h2s->subs && h2s->subs != sw);
sw->events |= event_type;
h2s->subs = sw;
if (event_type & SUB_RETRY_RECV)
TRACE_DEVEL("subscribe(recv)", H2_EV_STRM_RECV, h2c->conn, h2s); TRACE_DEVEL("subscribe(recv)", H2_EV_STRM_RECV, h2c->conn, h2s);
sw = param;
BUG_ON(h2s->recv_wait != NULL || (sw->events & SUB_RETRY_RECV));
sw->events |= SUB_RETRY_RECV;
h2s->recv_wait = sw;
event_type &= ~SUB_RETRY_RECV;
}
if (event_type & SUB_RETRY_SEND) { if (event_type & SUB_RETRY_SEND) {
TRACE_DEVEL("subscribe(send)", H2_EV_STRM_SEND, h2c->conn, h2s); TRACE_DEVEL("subscribe(send)", H2_EV_STRM_SEND, h2c->conn, h2s);
sw = param;
BUG_ON(h2s->send_wait != NULL || (sw->events & SUB_RETRY_SEND));
sw->events |= SUB_RETRY_SEND;
h2s->send_wait = sw;
if (!(h2s->flags & H2_SF_BLK_SFCTL) && if (!(h2s->flags & H2_SF_BLK_SFCTL) &&
!LIST_ADDED(&h2s->list)) { !LIST_ADDED(&h2s->list)) {
if (h2s->flags & H2_SF_BLK_MFCTL) if (h2s->flags & H2_SF_BLK_MFCTL)
@ -5627,11 +5622,8 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
else else
LIST_ADDQ(&h2c->send_list, &h2s->list); LIST_ADDQ(&h2c->send_list, &h2s->list);
} }
event_type &= ~SUB_RETRY_SEND;
} }
TRACE_LEAVE(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s); TRACE_LEAVE(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s);
if (event_type != 0)
return -1;
return 0; return 0;
} }
@ -5642,28 +5634,28 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
*/ */
static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param) static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param)
{ {
struct wait_event *sw; struct wait_event *sw = param;
struct h2s *h2s = cs->ctx; struct h2s *h2s = cs->ctx;
TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s); TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
if (event_type & SUB_RETRY_RECV) {
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
BUG_ON(h2s->subs && h2s->subs != sw);
sw->events &= ~event_type;
if (!sw->events)
h2s->subs = NULL;
if (event_type & SUB_RETRY_RECV)
TRACE_DEVEL("unsubscribe(recv)", H2_EV_STRM_RECV, h2s->h2c->conn, h2s); TRACE_DEVEL("unsubscribe(recv)", H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
sw = param;
BUG_ON(h2s->recv_wait != sw);
sw->events &= ~SUB_RETRY_RECV;
h2s->recv_wait = NULL;
}
if (event_type & SUB_RETRY_SEND) { if (event_type & SUB_RETRY_SEND) {
TRACE_DEVEL("subscribe(send)", H2_EV_STRM_SEND, h2s->h2c->conn, h2s); TRACE_DEVEL("subscribe(send)", H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
sw = param;
BUG_ON(h2s->send_wait != sw);
if (!(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)))
LIST_DEL_INIT(&h2s->list);
sw->events &= ~SUB_RETRY_SEND;
h2s->flags &= ~H2_SF_NOTIFIED; h2s->flags &= ~H2_SF_NOTIFIED;
h2s->send_wait = NULL; if (!(h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)))
LIST_DEL_INIT(&h2s->list);
} }
TRACE_LEAVE(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s); TRACE_LEAVE(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
return 0; return 0;
} }