MEDIUM: mux-fcgi: do not make an fstrm subscribe to itself on deferred shut

This is the port to FCGI of previous commit "MEDIUM: mux-h2: do not make
an h2s subscribe to itself on deferred shut".

The purpose is to avoid subscribing to the send_wait list when trying to
close, because we'll soon have to merge both recv and send lists. Basic
testing showed no difference (performance nor issues).
This commit is contained in:
Willy Tarreau 2020-01-16 17:20:57 +01:00
parent 5723f295d8
commit 7aad7039e4

View File

@ -164,10 +164,10 @@ struct fcgi_strm {
struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */ struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
struct eb32_node by_id; /* place in fcgi_conn's streams_by_id */ struct eb32_node by_id; /* place in fcgi_conn's streams_by_id */
struct wait_event wait_event; /* Wait list, when we're attempting to send an ABORT but we can't send */
struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */ struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */
struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */ struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */
struct list send_list; /* To be used when adding in fcgi_conn->send_list */ struct list send_list; /* To be used when adding in fcgi_conn->send_list */
struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry to close after we failed to by lack of space */
}; };
/* Flags representing all default FCGI parameters */ /* Flags representing all default FCGI parameters */
@ -935,6 +935,10 @@ static void fcgi_strm_notify_send(struct fcgi_strm *fstrm)
tasklet_wakeup(sw->tasklet); tasklet_wakeup(sw->tasklet);
fstrm->send_wait = NULL; fstrm->send_wait = NULL;
} }
else if (fstrm->flags & (FCGI_SF_WANT_SHUTR | FCGI_SF_WANT_SHUTW)) {
TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
tasklet_wakeup(fstrm->shut_tl);
}
} }
/* Alerts the data layer, trying to wake it up by all means, following /* Alerts the data layer, trying to wake it up by all means, following
@ -949,7 +953,8 @@ static void fcgi_strm_notify_send(struct fcgi_strm *fstrm)
static void fcgi_strm_alert(struct fcgi_strm *fstrm) static void fcgi_strm_alert(struct fcgi_strm *fstrm)
{ {
TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm); TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
if (fstrm->recv_wait || fstrm->send_wait) { if (fstrm->recv_wait || fstrm->send_wait ||
(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
fcgi_strm_notify_recv(fstrm); fcgi_strm_notify_recv(fstrm);
fcgi_strm_notify_send(fstrm); fcgi_strm_notify_send(fstrm);
} }
@ -1022,7 +1027,7 @@ static void fcgi_strm_destroy(struct fcgi_strm *fstrm)
* we're in it, we're getting out anyway * we're in it, we're getting out anyway
*/ */
LIST_DEL_INIT(&fstrm->send_list); LIST_DEL_INIT(&fstrm->send_list);
tasklet_free(fstrm->wait_event.tasklet); tasklet_free(fstrm->shut_tl);
pool_free(pool_head_fcgi_strm, fstrm); pool_free(pool_head_fcgi_strm, fstrm);
TRACE_LEAVE(FCGI_EV_FSTRM_END, conn); TRACE_LEAVE(FCGI_EV_FSTRM_END, conn);
@ -1044,16 +1049,15 @@ static struct fcgi_strm *fcgi_strm_new(struct fcgi_conn *fconn, int id)
if (!fstrm) if (!fstrm)
goto out; goto out;
fstrm->wait_event.tasklet = tasklet_new(); fstrm->shut_tl = tasklet_new();
if (!fstrm->wait_event.tasklet) { if (!fstrm->shut_tl) {
pool_free(pool_head_fcgi_strm, fstrm); pool_free(pool_head_fcgi_strm, fstrm);
goto out; goto out;
} }
fstrm->send_wait = NULL; fstrm->send_wait = NULL;
fstrm->recv_wait = NULL; fstrm->recv_wait = NULL;
fstrm->wait_event.tasklet->process = fcgi_deferred_shut; fstrm->shut_tl->process = fcgi_deferred_shut;
fstrm->wait_event.tasklet->context = fstrm; fstrm->shut_tl->context = fstrm;
fstrm->wait_event.events = 0;
LIST_INIT(&fstrm->send_list); LIST_INIT(&fstrm->send_list);
fstrm->fconn = fconn; fstrm->fconn = fconn;
fstrm->cs = NULL; fstrm->cs = NULL;
@ -2646,18 +2650,25 @@ static int fcgi_process_mux(struct fcgi_conn *fconn)
if (fstrm->flags & FCGI_SF_NOTIFIED) if (fstrm->flags & FCGI_SF_NOTIFIED)
continue; continue;
/* For some reason, the upper layer failed to subsribe again, /* If the sender changed his mind and unsubscribed, let's just
* so remove it from the send_list * remove the stream from the send_list.
*/ */
if (!fstrm->send_wait) { if (!fstrm->send_wait &&
!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
LIST_DEL_INIT(&fstrm->send_list); LIST_DEL_INIT(&fstrm->send_list);
continue; continue;
} }
TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm); if (fstrm->send_wait) {
fstrm->flags &= ~FCGI_SF_BLK_ANY; TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
fstrm->send_wait->events &= ~SUB_RETRY_SEND; fstrm->flags &= ~FCGI_SF_BLK_ANY;
fstrm->flags |= FCGI_SF_NOTIFIED; fstrm->send_wait->events &= ~SUB_RETRY_SEND;
tasklet_wakeup(fstrm->send_wait->tasklet); fstrm->flags |= FCGI_SF_NOTIFIED;
tasklet_wakeup(fstrm->send_wait->tasklet);
} else {
/* it's the shut request that was queued */
TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
tasklet_wakeup(fstrm->shut_tl);
}
} }
fail: fail:
@ -2854,18 +2865,25 @@ static int fcgi_send(struct fcgi_conn *fconn)
if (fstrm->flags & FCGI_SF_NOTIFIED) if (fstrm->flags & FCGI_SF_NOTIFIED)
continue; continue;
/* For some reason, the upper layer failed to subsribe again, /* If the sender changed his mind and unsubscribed, let's just
* so remove it from the send_list * remove the stream from the send_list.
*/ */
if (!fstrm->send_wait) { if (!fstrm->send_wait &&
!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
LIST_DEL_INIT(&fstrm->send_list); LIST_DEL_INIT(&fstrm->send_list);
continue; continue;
} }
fstrm->flags &= ~FCGI_SF_BLK_ANY; if (fstrm->send_wait) {
fstrm->send_wait->events &= ~SUB_RETRY_SEND; fstrm->flags &= ~FCGI_SF_BLK_ANY;
TRACE_DEVEL("waking up pending stream", FCGI_EV_FCONN_SEND|FCGI_EV_STRM_WAKE, conn, fstrm); fstrm->send_wait->events &= ~SUB_RETRY_SEND;
tasklet_wakeup(fstrm->send_wait->tasklet); TRACE_DEVEL("waking up pending stream", FCGI_EV_FCONN_SEND|FCGI_EV_STRM_WAKE, conn, fstrm);
fstrm->flags |= FCGI_SF_NOTIFIED; tasklet_wakeup(fstrm->send_wait->tasklet);
fstrm->flags |= FCGI_SF_NOTIFIED;
} else {
/* it's the shut request that was queued */
TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
tasklet_wakeup(fstrm->shut_tl);
}
} }
} }
/* We're done, no more to send */ /* We're done, no more to send */
@ -3429,7 +3447,8 @@ static void fcgi_detach(struct conn_stream *cs)
*/ */
if (!(cs->conn->flags & CO_FL_ERROR) && if (!(cs->conn->flags & CO_FL_ERROR) &&
(fconn->state != FCGI_CS_CLOSED) && (fconn->state != FCGI_CS_CLOSED) &&
(fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) && (fstrm->send_wait || fstrm->recv_wait)) { (fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) &&
(fstrm->send_wait || fstrm->recv_wait || (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))) {
TRACE_DEVEL("leaving on stream blocked", FCGI_EV_STRM_END|FCGI_EV_FSTRM_BLK, fconn->conn, fstrm); TRACE_DEVEL("leaving on stream blocked", FCGI_EV_STRM_END|FCGI_EV_FSTRM_BLK, fconn->conn, fstrm);
return; return;
} }
@ -3511,7 +3530,6 @@ static void fcgi_detach(struct conn_stream *cs)
static void fcgi_do_shutr(struct fcgi_strm *fstrm) static void fcgi_do_shutr(struct fcgi_strm *fstrm)
{ {
struct fcgi_conn *fconn = fstrm->fconn; struct fcgi_conn *fconn = fstrm->fconn;
struct wait_event *sw = &fstrm->wait_event;
TRACE_ENTER(FCGI_EV_STRM_SHUT, fconn->conn, fstrm); TRACE_ENTER(FCGI_EV_STRM_SHUT, fconn->conn, fstrm);
@ -3544,14 +3562,16 @@ static void fcgi_do_shutr(struct fcgi_strm *fstrm)
return; return;
add_to_list: add_to_list:
/* Let the handler know we want to shutr, and add ourselves to the
* send list if not yet done. fcgi_deferred_shut() will be
* automatically called via the shut_tl tasklet when there's room
* again.
*/
if (!LIST_ADDED(&fstrm->send_list)) { if (!LIST_ADDED(&fstrm->send_list)) {
sw->events |= SUB_RETRY_SEND;
if (fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) { if (fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) {
fstrm->send_wait = sw;
LIST_ADDQ(&fconn->send_list, &fstrm->send_list); LIST_ADDQ(&fconn->send_list, &fstrm->send_list);
} }
} }
/* Let the handler know we want shutr */
fstrm->flags |= FCGI_SF_WANT_SHUTR; fstrm->flags |= FCGI_SF_WANT_SHUTR;
TRACE_LEAVE(FCGI_EV_STRM_SHUT, fconn->conn, fstrm); TRACE_LEAVE(FCGI_EV_STRM_SHUT, fconn->conn, fstrm);
return; return;
@ -3561,7 +3581,6 @@ static void fcgi_do_shutr(struct fcgi_strm *fstrm)
static void fcgi_do_shutw(struct fcgi_strm *fstrm) static void fcgi_do_shutw(struct fcgi_strm *fstrm)
{ {
struct fcgi_conn *fconn = fstrm->fconn; struct fcgi_conn *fconn = fstrm->fconn;
struct wait_event *sw = &fstrm->wait_event;
TRACE_ENTER(FCGI_EV_STRM_SHUT, fconn->conn, fstrm); TRACE_ENTER(FCGI_EV_STRM_SHUT, fconn->conn, fstrm);
@ -3599,20 +3618,22 @@ static void fcgi_do_shutw(struct fcgi_strm *fstrm)
return; return;
add_to_list: add_to_list:
/* Let the handler know we want to shutr, and add ourselves to the
* send list if not yet done. fcgi_deferred_shut() will be
* automatically called via the shut_tl tasklet when there's room
* again.
*/
if (!LIST_ADDED(&fstrm->send_list)) { if (!LIST_ADDED(&fstrm->send_list)) {
sw->events |= SUB_RETRY_SEND;
if (fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) { if (fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) {
fstrm->send_wait = sw;
LIST_ADDQ(&fconn->send_list, &fstrm->send_list); LIST_ADDQ(&fconn->send_list, &fstrm->send_list);
} }
} }
/* let the handler know we want to shutw */
fstrm->flags |= FCGI_SF_WANT_SHUTW; fstrm->flags |= FCGI_SF_WANT_SHUTW;
TRACE_LEAVE(FCGI_EV_STRM_SHUT, fconn->conn, fstrm); TRACE_LEAVE(FCGI_EV_STRM_SHUT, fconn->conn, fstrm);
return; return;
} }
/* This is the tasklet referenced in fstrm->wait_event.tasklet, it is used for /* This is the tasklet referenced in fstrm->shut_tl, it is used for
* deferred shutdowns when the fcgi_detach() was done but the mux buffer was full * deferred shutdowns when the fcgi_detach() was done but the mux buffer was full
* and prevented the last record from being emitted. * and prevented the last record from being emitted.
*/ */
@ -3623,7 +3644,11 @@ static struct task *fcgi_deferred_shut(struct task *t, void *ctx, unsigned short
TRACE_ENTER(FCGI_EV_STRM_SHUT, fconn->conn, fstrm); TRACE_ENTER(FCGI_EV_STRM_SHUT, fconn->conn, fstrm);
fstrm->flags &= ~FCGI_SF_NOTIFIED; if (fstrm->flags & FCGI_SF_NOTIFIED) {
/* some data processing remains to be done first */
goto end;
}
if (fstrm->flags & FCGI_SF_WANT_SHUTW) if (fstrm->flags & FCGI_SF_WANT_SHUTW)
fcgi_do_shutw(fstrm); fcgi_do_shutw(fstrm);
@ -3640,7 +3665,7 @@ static struct task *fcgi_deferred_shut(struct task *t, void *ctx, unsigned short
fcgi_release(fconn); fcgi_release(fconn);
} }
} }
end:
TRACE_LEAVE(FCGI_EV_STRM_SHUT); TRACE_LEAVE(FCGI_EV_STRM_SHUT);
return NULL; return NULL;
} }
@ -3730,8 +3755,8 @@ static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, void *param)
TRACE_DEVEL("subscribe(send)", FCGI_EV_STRM_SEND, fconn->conn, fstrm); TRACE_DEVEL("subscribe(send)", FCGI_EV_STRM_SEND, fconn->conn, fstrm);
sw = param; sw = param;
BUG_ON(fstrm->send_wait != sw); BUG_ON(fstrm->send_wait != sw);
LIST_DEL(&fstrm->send_list); if (!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))
LIST_INIT(&fstrm->send_list); LIST_DEL_INIT(&fstrm->send_list);
sw->events &= ~SUB_RETRY_SEND; sw->events &= ~SUB_RETRY_SEND;
fstrm->flags &= ~FCGI_SF_NOTIFIED; fstrm->flags &= ~FCGI_SF_NOTIFIED;
fstrm->send_wait = NULL; fstrm->send_wait = NULL;
@ -3929,7 +3954,8 @@ static size_t fcgi_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t co
} }
/* Ok we managed to send something, leave the send_list */ /* Ok we managed to send something, leave the send_list */
LIST_DEL_INIT(&fstrm->send_list); if (!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))
LIST_DEL_INIT(&fstrm->send_list);
} }
TRACE_LEAVE(FCGI_EV_STRM_SEND, fconn->conn, fstrm, htx, (size_t[]){total}); TRACE_LEAVE(FCGI_EV_STRM_SEND, fconn->conn, fstrm, htx, (size_t[]){total});