[MEDIUM] add a send limit to a buffer

For keep-alive, line-mode protocols and splicing, we will need to
limit the sender to process a certain amount of bytes. The limit
is automatically set to the buffer size when analysers are detached
from the buffer.
This commit is contained in:
Willy Tarreau 2008-12-13 21:12:26 +01:00
parent 05cb29bcd0
commit f890dc9003
6 changed files with 36 additions and 4 deletions

View File

@ -45,6 +45,7 @@ int init_buffer();
*/
static inline void buffer_init(struct buffer *buf)
{
buf->send_max = 0;
buf->l = buf->total = 0;
buf->analysers = 0;
buf->cons = NULL;
@ -87,6 +88,7 @@ static inline void buffer_check_timeouts(struct buffer *b)
*/
static inline void buffer_flush(struct buffer *buf)
{
buf->send_max = 0;
buf->r = buf->lr = buf->w = buf->data;
buf->l = 0;
buf->flags |= BF_EMPTY | BF_FULL;

View File

@ -129,6 +129,7 @@ struct buffer {
unsigned int l; /* data length */
char *r, *w, *lr; /* read ptr, write ptr, last read */
char *rlim; /* read limit, used for header rewriting */
unsigned int send_max; /* number of bytes the sender can consume */
unsigned int analysers; /* bit field indicating what to do on the buffer */
int analyse_exp; /* expiration date for current analysers (if set) */
void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */

View File

@ -30,7 +30,8 @@ int init_buffer()
/* writes <len> bytes from message <msg> to buffer <buf>. Returns -1 in case of
* success, or the number of bytes available otherwise.
* success, or the number of bytes available otherwise. The send limit is
* automatically adjusted with the amount of data written.
* FIXME-20060521: handle unaligned data.
*/
int buffer_write(struct buffer *buf, const char *msg, int len)
@ -44,6 +45,7 @@ int buffer_write(struct buffer *buf, const char *msg, int len)
memcpy(buf->r, msg, len);
buf->l += len;
buf->send_max += len;
buf->r += len;
buf->total += len;
if (buf->r == buf->data + BUFSIZE)
@ -60,7 +62,8 @@ int buffer_write(struct buffer *buf, const char *msg, int len)
/* writes the chunk <chunk> to buffer <buf>. Returns -1 in case of
* success, or the number of bytes available otherwise. If the chunk
* has been written, its size is automatically reset to zero.
* has been written, its size is automatically reset to zero. The send limit is
* automatically adjusted with the amount of data written.
*/
int buffer_write_chunk(struct buffer *buf, struct chunk *chunk)
{
@ -76,6 +79,7 @@ int buffer_write_chunk(struct buffer *buf, struct chunk *chunk)
memcpy(buf->r, chunk->str, chunk->len);
buf->l += chunk->len;
buf->send_max += chunk->len;
buf->r += chunk->len;
buf->total += chunk->len;
if (buf->r == buf->data + BUFSIZE)
@ -133,7 +137,7 @@ int buffer_replace(struct buffer *b, char *pos, char *end, const char *str)
/*
* same except that the string length is given, which allows str to be NULL if
* len is 0.
* len is 0. The send limit is *not* adjusted.
*/
int buffer_replace2(struct buffer *b, char *pos, char *end, const char *str, int len)
{
@ -178,7 +182,7 @@ int buffer_replace2(struct buffer *b, char *pos, char *end, const char *str, int
* argument informs about the length of string <str> so that we don't have to
* measure it. It does not include the "\r\n". If <str> is NULL, then the buffer
* is only opened for len+2 bytes but nothing is copied in. It may be useful in
* some circumstances.
* some circumstances. The send limit is *not* adjusted.
*
* The number of bytes added is returned on success. 0 is returned on failure.
*/

View File

@ -799,6 +799,10 @@ void uxst_process_session(struct task *t, int *next)
resync = 1;
}
/* if noone is interested in analysing data, let's forward everything */
if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
s->req->send_max = s->req->l;
/* reflect what the L7 analysers have seen last */
rqf_last = s->req->flags;
@ -869,6 +873,10 @@ void uxst_process_session(struct task *t, int *next)
resync = 1;
}
/* if noone is interested in analysing data, let's forward everything */
if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK))
s->rep->send_max = s->rep->l;
/* reflect what the L7 analysers have seen last */
rpf_last = s->rep->flags;

View File

@ -744,6 +744,10 @@ void process_session(struct task *t, int *next)
resync = 1;
}
/* if noone is interested in analysing data, let's forward everything */
if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
s->req->send_max = s->req->l;
/* reflect what the L7 analysers have seen last */
rqf_last = s->req->flags;
@ -849,6 +853,10 @@ void process_session(struct task *t, int *next)
resync = 1;
}
/* if noone is interested in analysing data, let's forward everything */
if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK))
s->rep->send_max = s->rep->l;
/* reflect what the L7 analysers have seen last */
rpf_last = s->rep->flags;

View File

@ -114,6 +114,10 @@ int stream_sock_read(int fd) {
b->l += ret;
cur_read += ret;
/* if noone is interested in analysing data, let's forward everything */
if (!b->analysers)
b->send_max += ret;
if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY;
@ -306,6 +310,10 @@ int stream_sock_write(int fd) {
max = b->data + BUFSIZE - b->w;
}
/* limit the amount of outgoing data if required */
if (max > b->send_max)
max = b->send_max;
if (max == 0) {
/* may be we have received a connection acknowledgement in TCP mode without data */
if (likely(fdtab[fd].state == FD_STCONN)) {
@ -363,6 +371,7 @@ int stream_sock_write(int fd) {
if (ret > 0) {
b->l -= ret;
b->w += ret;
b->send_max -= ret;
if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY;