MAJOR: channel: add a new flag CF_WAKE_WRITE to notify the task of writes
Since commit 6b66f3e
([MAJOR] implement autonomous inter-socket forwarding)
introduced in 1.3.16-rc1, we've been relying on a stupid mechanism to wake
up the task after a write, which was an exact copy-paste of the reader side.
The principle was that if we empty a buffer and there's no forwarding
scheduled or if the *producer* is not in a connected state, then we wake
the task up.
That does not make any sense. It sometimes wakes the task up too late (e.g.,
when the request analyser is waiting for some room in the buffer before it
can start working), and it leads to unneeded wakeups in client-side
keep-alive, because the task is woken up when the response is sent while the
analysers are simply waiting for a new request.
In order to fix this, we introduce a new channel flag: CF_WAKE_WRITE. It is
designed so that an analyser can explicitly request to be notified when some
data have been written. It is used only when the HTTP request or response
analysers need to wait for more room in the buffers. It is automatically
cleared upon wake-up.
The flag is also automatically set by the functions which try to write into
a buffer from an applet when they fail (bi_putblk() etc.).
That allows us to remove the stupid condition above and avoid some wakeups.
In http-server-close and in http-keep-alive modes, this reduces from 4 to 3
the average number of wakeups per request, and increases the overall
performance by about 1.5%.
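
To make the new contract easier to follow outside the diff below, here is a
minimal, self-contained sketch of the pattern, not HAProxy's actual code: a
producer-side helper sets CF_WAKE_WRITE when it cannot write because the
buffer is full, the consumer wakes the owning task only when that flag was
requested, and the flag is cleared on every pass. The toy names
(toy_channel, toy_put_blk, toy_consume, toy_task_wakeup) and the flag values
used here are hypothetical stand-ins.

/*
 * Standalone illustration of the CF_WAKE_WRITE idea. The types, helpers and
 * flag values below are toy stand-ins, not the real HAProxy definitions.
 */
#include <stdio.h>
#include <string.h>

#define CF_WRITE_PARTIAL 0x00000004  /* some data were written (toy value) */
#define CF_WAKE_WRITE    0x00001000  /* wake the task up on write activity */

struct toy_channel {
	unsigned int flags;
	char buf[16];
	size_t len;              /* bytes currently queued in buf */
};

static void toy_task_wakeup(const char *who)
{
	printf("task woken up by %s\n", who);
}

/* Producer side: like bi_putblk(), request a wake-up by setting
 * CF_WAKE_WRITE when the write fails because the buffer is full.
 */
static int toy_put_blk(struct toy_channel *chn, const char *blk, size_t len)
{
	if (chn->len + len > sizeof(chn->buf)) {
		chn->flags |= CF_WAKE_WRITE;
		return -1;
	}
	memcpy(chn->buf + chn->len, blk, len);
	chn->len += len;
	return (int)len;
}

/* Consumer side: after forwarding <bytes> to the other end, report write
 * activity and wake the owning task only if a wake-up was requested.
 */
static void toy_consume(struct toy_channel *chn, size_t bytes)
{
	if (bytes > chn->len)
		bytes = chn->len;
	memmove(chn->buf, chn->buf + bytes, chn->len - bytes);
	chn->len -= bytes;
	chn->flags |= CF_WRITE_PARTIAL;

	if (chn->flags & CF_WAKE_WRITE)
		toy_task_wakeup("consumer");
}

int main(void)
{
	struct toy_channel chn = { .flags = 0, .len = 0 };

	/* Fill the channel until a write fails: the failed write requests a
	 * wake-up instead of the task being woken unconditionally.
	 */
	while (toy_put_blk(&chn, "0123456789", 10) > 0)
		;

	/* Some room is freed: the pending CF_WAKE_WRITE triggers the wake-up. */
	toy_consume(&chn, 10);

	/* The flag must be cleared on each pass, as process_session() does. */
	chn.flags &= ~CF_WAKE_WRITE;
	return 0;
}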
This commit is contained in:
parent 51437d2c59
commit d7ad9f5b0d
@@ -70,7 +70,7 @@
 #define CF_WRITE_ERROR    0x00000800  /* unrecoverable error on consumer side */
 #define CF_WRITE_ACTIVITY (CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_WRITE_ERROR)
 
-/* unused: 0x00001000 */
+#define CF_WAKE_WRITE     0x00001000  /* wake the task up when there's write activity */
 #define CF_SHUTW          0x00002000  /* consumer has already shut down */
 #define CF_SHUTW_NOW      0x00004000  /* the consumer must shut down for writes ASAP */
 #define CF_AUTO_CLOSE     0x00008000  /* producer can forward shutdown to other side */
@@ -120,11 +120,6 @@
 #define CF_WAKE_ONCE      0x10000000  /* pretend there is activity on this channel (one-shoot) */
 /* unused: 0x20000000, 0x40000000, 0x80000000 */
 
-/* Use these masks to clear the flags before going back to lower layers */
-#define CF_CLEAR_READ     (~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ERROR|CF_READ_ATTACHED))
-#define CF_CLEAR_WRITE    (~(CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_WRITE_ERROR))
-#define CF_CLEAR_TIMEOUT  (~(CF_READ_TIMEOUT|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT))
-
 /* Masks which define input events for stream analysers */
 #define CF_MASK_ANALYSER  (CF_READ_ATTACHED|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_ANA_TIMEOUT|CF_WRITE_ACTIVITY|CF_WAKE_ONCE)
 
@@ -113,14 +113,18 @@ int bo_inject(struct channel *chn, const char *msg, int len)
  * input is closed, -2 is returned. If there is not enough room left in the
  * buffer, -1 is returned. Otherwise the number of bytes copied is returned
  * (1). Channel flag READ_PARTIAL is updated if some data can be transferred.
+ * Channel flag CF_WAKE_WRITE is set if the write fails because the buffer is
+ * full.
  */
 int bi_putchr(struct channel *chn, char c)
 {
 	if (unlikely(channel_input_closed(chn)))
 		return -2;
 
-	if (channel_full(chn))
+	if (channel_full(chn)) {
+		chn->flags |= CF_WAKE_WRITE;
 		return -1;
+	}
 
 	*bi_end(chn->buf) = c;
 
@@ -143,7 +147,8 @@ int bi_putchr(struct channel *chn, char c)
  * -3 is returned. If there is not enough room left in the buffer, -1 is
  * returned. Otherwise the number of bytes copied is returned (0 being a valid
  * number). Channel flag READ_PARTIAL is updated if some data can be
- * transferred.
+ * transferred. Channel flag CF_WAKE_WRITE is set if the write fails because
+ * the buffer is full.
  */
 int bi_putblk(struct channel *chn, const char *blk, int len)
 {
@@ -161,6 +166,7 @@ int bi_putblk(struct channel *chn, const char *blk, int len)
 		if (len > max)
 			return -3;
 
+		chn->flags |= CF_WAKE_WRITE;
 		return -1;
 	}
 
@@ -1965,8 +1965,10 @@ static void cli_io_handler(struct stream_interface *si)
 			/* ensure we have some output room left in the event we
 			 * would want to return some info right after parsing.
 			 */
-			if (buffer_almost_full(si->ib->buf))
+			if (buffer_almost_full(si->ib->buf)) {
+				si->ib->flags |= CF_WAKE_WRITE;
 				break;
+			}
 
 			reql = bo_getline(si->ob, trash.str, trash.size);
 			if (reql <= 0) { /* closed or EOL not found */
@@ -3360,8 +3362,10 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy
 	case STAT_PX_ST_LI:
 		/* stats.l has been initialized above */
 		for (; appctx->ctx.stats.l != &px->conf.listeners; appctx->ctx.stats.l = l->by_fe.n) {
-			if (buffer_almost_full(rep->buf))
+			if (buffer_almost_full(rep->buf)) {
+				rep->flags |= CF_WAKE_WRITE;
 				return 0;
+			}
 
 			l = LIST_ELEM(appctx->ctx.stats.l, struct listener *, by_fe);
 			if (!l->counters)
@@ -3390,8 +3394,10 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy
 		for (; appctx->ctx.stats.sv != NULL; appctx->ctx.stats.sv = sv->next) {
 			int sv_state; /* 0=DOWN, 1=going up, 2=going down, 3=UP, 4,5=NOLB, 6=unchecked */
 
-			if (buffer_almost_full(rep->buf))
+			if (buffer_almost_full(rep->buf)) {
+				rep->flags |= CF_WAKE_WRITE;
 				return 0;
+			}
 
 			sv = appctx->ctx.stats.sv;
 
@@ -3874,8 +3880,10 @@ static int stats_dump_stat_to_buffer(struct stream_interface *si, struct uri_aut
 	case STAT_ST_LIST:
 		/* dump proxies */
 		while (appctx->ctx.stats.px) {
-			if (buffer_almost_full(rep->buf))
+			if (buffer_almost_full(rep->buf)) {
+				rep->flags |= CF_WAKE_WRITE;
 				return 0;
+			}
 
 			px = appctx->ctx.stats.px;
 			/* skip the disabled proxies, global frontend and non-networked ones */
@@ -2307,6 +2307,7 @@ int http_wait_for_request(struct session *s, struct channel *req, int an_bit)
 		/* some data has still not left the buffer, wake us once that's done */
 		channel_dont_connect(req);
 		req->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+		req->flags |= CF_WAKE_WRITE;
 		return 0;
 	}
 	if (unlikely(bi_end(req->buf) < b_ptr(req->buf, msg->next) ||
@@ -2331,6 +2332,7 @@ int http_wait_for_request(struct session *s, struct channel *req, int an_bit)
 		/* don't let a connection request be initiated */
 		channel_dont_connect(req);
 		s->rep->flags &= ~CF_EXPECT_MORE; /* speed up sending a previous response */
+		s->rep->flags |= CF_WAKE_WRITE;
 		s->rep->analysers |= an_bit; /* wake us up once it changes */
 		return 0;
 	}
@@ -5115,6 +5117,7 @@ int http_wait_for_response(struct session *s, struct channel *rep, int an_bit)
 			goto abort_response;
 		channel_dont_close(rep);
 		rep->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+		rep->flags |= CF_WAKE_WRITE;
 		return 0;
 	}
 
@@ -1605,7 +1605,8 @@ struct task *process_session(struct task *t)
 	memset(&s->txn.auth, 0, sizeof(s->txn.auth));
 
 	/* This flag must explicitly be set every time */
-	s->req->flags &= ~CF_READ_NOEXP;
+	s->req->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
+	s->rep->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
 
 	/* Keep a copy of req/rep flags so that we can detect shutdowns */
 	rqf_last = s->req->flags & ~CF_MASK_ANALYSER;
@@ -205,9 +205,7 @@ static void stream_int_update_embedded(struct stream_interface *si)
 	    /* changes on the consumption side */
 	    (si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
 	    ((si->ob->flags & CF_WRITE_ACTIVITY) &&
-	     ((si->ob->flags & CF_SHUTW) ||
-	      si->ob->prod->state != SI_ST_EST ||
-	      (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
+	     (si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
 		if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
 			task_wakeup(si->owner, TASK_WOKEN_IO);
 	}
@@ -630,9 +628,7 @@ static int si_conn_wake_cb(struct connection *conn)
 	    /* changes on the consumption side */
 	    (si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
 	    ((si->ob->flags & CF_WRITE_ACTIVITY) &&
-	     ((si->ob->flags & CF_SHUTW) ||
-	      si->ob->prod->state != SI_ST_EST ||
-	      (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
+	     (si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
 		task_wakeup(si->owner, TASK_WOKEN_IO);
 	}
 	if (si->ib->flags & CF_READ_ACTIVITY)
@@ -1045,9 +1041,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
 		/* in case of special condition (error, shutdown, end of write...), we
 		 * have to notify the task.
 		 */
-		if (likely((ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
-			   (channel_is_empty(ob) && !ob->to_forward) ||
-			   si->state != SI_ST_EST)) {
+		if (likely(ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW|CF_WAKE_WRITE))) {
 out_wakeup:
 			if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
 				task_wakeup(si->owner, TASK_WOKEN_IO);