[MAJOR] implement autonomous inter-socket forwarding

If an analyser sets buf->to_forward to a given value, that many
data will be forwarded between the two stream interfaces attached
to a buffer without waking the task up. The same applies once all
analysers have been released. This saves a large amount of calls
to process_session() and a number of task_dequeue/queue.
This commit is contained in:
Willy Tarreau 2008-12-14 17:31:54 +01:00
parent 3ffeba1f67
commit 6b66f3e4f6
6 changed files with 83 additions and 15 deletions

View File

@ -2,8 +2,8 @@
include/common/defaults.h include/common/defaults.h
Miscellaneous default values. Miscellaneous default values.
Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
This library is free software; you can redistribute it and/or This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation, version 2.1 License as published by the Free Software Foundation, version 2.1
@ -40,6 +40,16 @@
#define MAXREWRITE (BUFSIZE / 2) #define MAXREWRITE (BUFSIZE / 2)
#endif #endif
/* FORWARD_DEFAULT_SIZE
* Indicates how many bytes may be forwarded at once in low-level stream-socks
* without waking the owner task up. This should be much larger than the buffer
* size. A few megabytes seem appropriate.
*/
#ifndef FORWARD_DEFAULT_SIZE
#define FORWARD_DEFAULT_SIZE (16*1024*1024)
#endif
#define REQURI_LEN 1024 #define REQURI_LEN 1024
#define CAPTURE_LEN 64 #define CAPTURE_LEN 64

View File

@ -46,6 +46,7 @@ int init_buffer();
static inline void buffer_init(struct buffer *buf) static inline void buffer_init(struct buffer *buf)
{ {
buf->send_max = 0; buf->send_max = 0;
buf->to_forward = 0;
buf->l = buf->total = 0; buf->l = buf->total = 0;
buf->analysers = 0; buf->analysers = 0;
buf->cons = NULL; buf->cons = NULL;
@ -92,6 +93,7 @@ static inline void buffer_check_timeouts(struct buffer *b)
static inline void buffer_flush(struct buffer *buf) static inline void buffer_flush(struct buffer *buf)
{ {
buf->send_max = 0; buf->send_max = 0;
buf->to_forward = 0;
buf->r = buf->lr = buf->w = buf->data; buf->r = buf->lr = buf->w = buf->data;
buf->l = 0; buf->l = 0;
buf->flags |= BF_EMPTY | BF_FULL; buf->flags |= BF_EMPTY | BF_FULL;

View File

@ -130,6 +130,7 @@ struct buffer {
char *r, *w, *lr; /* read ptr, write ptr, last read */ char *r, *w, *lr; /* read ptr, write ptr, last read */
char *rlim; /* read limit, used for header rewriting */ char *rlim; /* read limit, used for header rewriting */
unsigned int send_max; /* number of bytes the sender can consume */ unsigned int send_max; /* number of bytes the sender can consume */
unsigned int to_forward; /* number of bytes that can send without a wake-up, >= send_max */
unsigned int analysers; /* bit field indicating what to do on the buffer */ unsigned int analysers; /* bit field indicating what to do on the buffer */
int analyse_exp; /* expiration date for current analysers (if set) */ int analyse_exp; /* expiration date for current analysers (if set) */
void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */ void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */

View File

@ -809,6 +809,17 @@ void uxst_process_session(struct task *t, int *next)
if (!s->req->analysers && !(s->req->flags & BF_HIJACK)) if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
s->req->send_max = s->req->l; s->req->send_max = s->req->l;
/* if noone is interested in analysing data, let's forward everything
* and only wake up every 1-2 MB. We still wake up when send_max is
* reached though.
*/
if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
!s->req->analysers && !(s->req->flags & BF_HIJACK)) {
if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
s->req->to_forward += FORWARD_DEFAULT_SIZE;
s->req->send_max = s->req->l;
}
/* reflect what the L7 analysers have seen last */ /* reflect what the L7 analysers have seen last */
rqf_last = s->req->flags; rqf_last = s->req->flags;
@ -879,9 +890,17 @@ void uxst_process_session(struct task *t, int *next)
resync = 1; resync = 1;
} }
/* if noone is interested in analysing data, let's forward everything */ /* if noone is interested in analysing data, let's forward everything
if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) * and only wake up every 1-2 MB. We still wake up when send_max is
* reached though.
*/
if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) {
s->rep->to_forward += FORWARD_DEFAULT_SIZE;
}
s->rep->send_max = s->rep->l; s->rep->send_max = s->rep->l;
}
/* reflect what the L7 analysers have seen last */ /* reflect what the L7 analysers have seen last */
rpf_last = s->rep->flags; rpf_last = s->rep->flags;

View File

@ -746,9 +746,16 @@ void process_session(struct task *t, int *next)
resync = 1; resync = 1;
} }
/* if noone is interested in analysing data, let's forward everything */ /* if noone is interested in analysing data, let's forward everything
if (!s->req->analysers && !(s->req->flags & BF_HIJACK)) * and only wake up every 1-2 MB. We still wake up when send_max is
* reached though.
*/
if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
!s->req->analysers && !(s->req->flags & BF_HIJACK)) {
if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
s->req->to_forward += FORWARD_DEFAULT_SIZE;
s->req->send_max = s->req->l; s->req->send_max = s->req->l;
}
/* reflect what the L7 analysers have seen last */ /* reflect what the L7 analysers have seen last */
rqf_last = s->req->flags; rqf_last = s->req->flags;
@ -855,9 +862,17 @@ void process_session(struct task *t, int *next)
resync = 1; resync = 1;
} }
/* if noone is interested in analysing data, let's forward everything */ /* if noone is interested in analysing data, let's forward everything
if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) * and only wake up every 1-2 MB. We still wake up when send_max is
* reached though.
*/
if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) {
s->rep->to_forward += FORWARD_DEFAULT_SIZE;
}
s->rep->send_max = s->rep->l; s->rep->send_max = s->rep->l;
}
/* reflect what the L7 analysers have seen last */ /* reflect what the L7 analysers have seen last */
rpf_last = s->rep->flags; rpf_last = s->rep->flags;
@ -870,7 +885,7 @@ void process_session(struct task *t, int *next)
* FIXME: this is probably where we should produce error responses. * FIXME: this is probably where we should produce error responses.
*/ */
/* first, let's check if the request buffer needs to shutdown(write) */ /* first, let's check if the response buffer needs to shutdown(write) */
if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) ==
(BF_EMPTY|BF_WRITE_ENA|BF_SHUTR))) (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)))
buffer_shutw_now(s->rep); buffer_shutw_now(s->rep);

View File

@ -116,8 +116,8 @@ int stream_sock_read(int fd) {
cur_read += ret; cur_read += ret;
/* if noone is interested in analysing data, let's forward everything */ /* if noone is interested in analysing data, let's forward everything */
if (!b->analysers) if (b->to_forward > b->send_max)
b->send_max += ret; b->send_max = MIN(b->to_forward, b->l);
if (fdtab[fd].state == FD_STCONN) if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY; fdtab[fd].state = FD_STREADY;
@ -251,10 +251,17 @@ int stream_sock_read(int fd) {
goto out_skip_wakeup; goto out_skip_wakeup;
out_wakeup: out_wakeup:
/* the consumer might be waiting for data */ /* the consumer might be waiting for data */
if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL)) if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL) && !(b->flags & BF_EMPTY))
b->cons->chk_snd(b->cons); b->cons->chk_snd(b->cons);
task_wakeup(si->owner, TASK_WOKEN_IO); /* we have to wake up if there is a special event or if we don't have
* any more data to forward.
*/
if ((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) ||
!b->to_forward ||
si->state != SI_ST_EST ||
b->cons->state != SI_ST_EST)
task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup: out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_IN; fdtab[fd].ev &= ~FD_POLL_IN;
@ -379,6 +386,13 @@ int stream_sock_write(int fd) {
b->l -= ret; b->l -= ret;
b->w += ret; b->w += ret;
b->send_max -= ret; b->send_max -= ret;
/* we can send up to send_max, we just want to know when
* to_forward has been reached.
*/
if ((signed)(b->to_forward - ret) >= 0)
b->to_forward -= ret;
else
b->to_forward = 0;
if (fdtab[fd].state == FD_STCONN) if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY; fdtab[fd].state = FD_STREADY;
@ -453,10 +467,17 @@ int stream_sock_write(int fd) {
goto out_skip_wakeup; goto out_skip_wakeup;
out_wakeup: out_wakeup:
/* the producer might be waiting for more room to store data */ /* the producer might be waiting for more room to store data */
if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL)) if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL) && !(b->flags & BF_FULL))
b->prod->chk_rcv(b->prod); b->prod->chk_rcv(b->prod);
task_wakeup(si->owner, TASK_WOKEN_IO); /* we have to wake up if there is a special event or if we don't have
* any more data to forward.
*/
if ((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
!b->to_forward ||
si->state != SI_ST_EST ||
b->prod->state != SI_ST_EST)
task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup: out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_OUT; fdtab[fd].ev &= ~FD_POLL_OUT;