[MEDIUM] i/o: rework ->to_forward and ->send_max

The way the buffers and stream interfaces handled ->to_forward was
really not handy for multiple reasons. Now we've moved its control
to the receive-side of the buffer, which is also responsible for
keeping send_max up to date. This makes more sense as it now becomes
possible to send some pre-formatted data followed by forwarded data.

The following explanation has also been added to buffer.h to clarify
the situation. Right now, tests show that the I/O is behaving extremely
well. Some work will have to be done to adapt existing splice code
though.

/* Note about the buffer structure

   The buffer contains two length indicators, one to_forward counter and one
   send_max limit. First, it must be understood that the buffer is in fact
   split in two parts :
     - the visible data (->data, for ->l bytes)
     - the invisible data, typically in kernel buffers forwarded directly from
       the source stream sock to the destination stream sock (->splice_len
       bytes). Those are used only during forward.

   In order not to mix data streams, the producer may only feed the invisible
   data with data to forward, and only when the visible buffer is empty. The
   consumer may not always be able to feed the invisible buffer due to platform
   limitations (lack of kernel support).

   Conversely, the consumer must always take data from the invisible data first
   before ever considering visible data. There is no limit to the size of data
   to consume from the invisible buffer, as platform-specific implementations
   will rarely leave enough control on this. So any byte fed into the invisible
   buffer is expected to reach the destination file descriptor, by any means.
   However, it's the consumer's responsibility to ensure that the invisible
   data has been entirely consumed before consuming visible data. This must be
   reflected by ->splice_len. This is very important as this and only this can
   ensure strict ordering of data between buffers.

   The producer is responsible for decreasing ->to_forward and increasing
   ->send_max. The ->to_forward parameter indicates how many bytes may be fed
   into either data buffer without waking the parent up. The ->send_max
   parameter says how many bytes may be read from the visible buffer. Thus it
   may never exceed ->l. This parameter is updated by any buffer_write() as
   well as any data forwarded through the visible buffer.

   The consumer is responsible for decreasing ->send_max when it sends data
   from the visible buffer, and ->splice_len when it sends data from the
   invisible buffer.

   A real-world example consists in part in an HTTP response waiting in a
   buffer to be forwarded. We know the header length (300) and the amount of
   data to forward (content-length=9000). The buffer already contains 1000
   bytes of data after the 300 bytes of headers. Thus the caller will set
   ->send_max to 300 indicating that it explicitly wants to send those data,
   and set ->to_forward to 9000 (content-length). This value must be normalised
   immediately after updating ->to_forward : since there are already 1300 bytes
   in the buffer, 300 of which are already counted in ->send_max, and that size
   is smaller than ->to_forward, we must update ->send_max to 1300 to flush the
   whole buffer, and reduce ->to_forward to 8000. After that, the producer may
   try to feed the additional data through the invisible buffer using a
   platform-specific method such as splice().
 */
This commit is contained in:
Willy Tarreau 2009-01-08 00:09:41 +01:00
parent 4d9b1dee9f
commit 0abebcc0fb
5 changed files with 89 additions and 28 deletions

View File

@ -88,8 +88,29 @@ static inline void buffer_check_timeouts(struct buffer *b)
b->flags |= BF_ANA_TIMEOUT; b->flags |= BF_ANA_TIMEOUT;
} }
/* flushes any content from buffer <buf> and adjusts flags /* Schedule <bytes> more bytes to be forwarded by the buffer without notifying
* accordingly. * the task. Any pending data in the buffer is scheduled to be sent as well,
* in the limit of the number of bytes to forward. This must be the only method
* to use to schedule bytes to be sent. Directly touching ->to_forward will
* cause lockups when send_max goes down to zero if nobody is ready to push the
* remaining data.
*/
static inline void buffer_forward(struct buffer *buf, unsigned int bytes)
{
unsigned int data_left;
buf->to_forward += bytes;
data_left = buf->l - buf->send_max;
if (data_left > buf->to_forward)
data_left = buf->to_forward;
buf->to_forward -= data_left;
buf->send_max += data_left;
}
/* Flush any content from buffer <buf> and adjusts flags accordingly. Note
* that any spliced data is not affected since we may not have any access to
* it.
*/ */
static inline void buffer_flush(struct buffer *buf) static inline void buffer_flush(struct buffer *buf)
{ {

View File

@ -130,8 +130,8 @@ struct buffer {
unsigned int splice_len; /* number of bytes remaining in splice, out of buffer */ unsigned int splice_len; /* number of bytes remaining in splice, out of buffer */
char *r, *w, *lr; /* read ptr, write ptr, last read */ char *r, *w, *lr; /* read ptr, write ptr, last read */
char *rlim; /* read limit, used for header rewriting */ char *rlim; /* read limit, used for header rewriting */
unsigned int send_max; /* number of bytes the sender can consume */ unsigned int send_max; /* number of bytes the sender can consume om this buffer, <= l */
unsigned int to_forward; /* number of bytes that can send without a wake-up, >= send_max */ unsigned int to_forward; /* number of bytes to forward after send_max without a wake-up */
unsigned int analysers; /* bit field indicating what to do on the buffer */ unsigned int analysers; /* bit field indicating what to do on the buffer */
int analyse_exp; /* expiration date for current analysers (if set) */ int analyse_exp; /* expiration date for current analysers (if set) */
void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */ void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */
@ -144,6 +144,56 @@ struct buffer {
}; };
/* Note about the buffer structure
The buffer contains two length indicators, one to_forward counter and one
send_max limit. First, it must be understood that the buffer is in fact
split in two parts :
- the visible data (->data, for ->l bytes)
- the invisible data, typically in kernel buffers forwarded directly from
the source stream sock to the destination stream sock (->splice_len
bytes). Those are used only during forward.
In order not to mix data streams, the producer may only feed the invisible
data with data to forward, and only when the visible buffer is empty. The
consumer may not always be able to feed the invisible buffer due to platform
limitations (lack of kernel support).
Conversely, the consumer must always take data from the invisible data first
before ever considering visible data. There is no limit to the size of data
to consume from the invisible buffer, as platform-specific implementations
will rarely leave enough control on this. So any byte fed into the invisible
buffer is expected to reach the destination file descriptor, by any means.
However, it's the consumer's responsibility to ensure that the invisible
data has been entirely consumed before consuming visible data. This must be
reflected by ->splice_len. This is very important as this and only this can
ensure strict ordering of data between buffers.
The producer is responsible for decreasing ->to_forward and increasing
->send_max. The ->to_forward parameter indicates how many bytes may be fed
into either data buffer without waking the parent up. The ->send_max
parameter says how many bytes may be read from the visible buffer. Thus it
may never exceed ->l. This parameter is updated by any buffer_write() as
well as any data forwarded through the visible buffer.
The consumer is responsible for decreasing ->send_max when it sends data
from the visible buffer, and ->splice_len when it sends data from the
invisible buffer.
A real-world example consists in part in an HTTP response waiting in a
buffer to be forwarded. We know the header length (300) and the amount of
data to forward (content-length=9000). The buffer already contains 1000
bytes of data after the 300 bytes of headers. Thus the caller will set
->send_max to 300 indicating that it explicitly wants to send those data,
and set ->to_forward to 9000 (content-length). This value must be normalised
immediately after updating ->to_forward : since there are already 1300 bytes
in the buffer, 300 of which are already counted in ->send_max, and that size
is smaller than ->to_forward, we must update ->send_max to 1300 to flush the
whole buffer, and reduce ->to_forward to 8000. After that, the producer may
try to feed the additional data through the invisible buffer using a
platform-specific method such as splice().
*/
#endif /* _TYPES_BUFFERS_H */ #endif /* _TYPES_BUFFERS_H */
/* /*

View File

@ -816,8 +816,7 @@ void uxst_process_session(struct task *t, int *next)
if (!s->req->send_max && s->req->prod->state >= SI_ST_EST && if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
!s->req->analysers && !(s->req->flags & BF_HIJACK)) { !s->req->analysers && !(s->req->flags & BF_HIJACK)) {
if (s->req->to_forward < FORWARD_DEFAULT_SIZE) if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
s->req->to_forward += FORWARD_DEFAULT_SIZE; buffer_forward(s->req, FORWARD_DEFAULT_SIZE);
s->req->send_max = s->req->l;
} }
/* reflect what the L7 analysers have seen last */ /* reflect what the L7 analysers have seen last */
@ -896,10 +895,8 @@ void uxst_process_session(struct task *t, int *next)
*/ */
if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST && if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) { !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) { if (s->rep->to_forward < FORWARD_DEFAULT_SIZE)
s->rep->to_forward += FORWARD_DEFAULT_SIZE; buffer_forward(s->rep, FORWARD_DEFAULT_SIZE);
}
s->rep->send_max = s->rep->l;
} }
/* reflect what the L7 analysers have seen last */ /* reflect what the L7 analysers have seen last */

View File

@ -753,8 +753,7 @@ resync_stream_interface:
if (!s->req->send_max && s->req->prod->state >= SI_ST_EST && if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
!s->req->analysers && !(s->req->flags & BF_HIJACK)) { !s->req->analysers && !(s->req->flags & BF_HIJACK)) {
if (s->req->to_forward < FORWARD_DEFAULT_SIZE) if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
s->req->to_forward += FORWARD_DEFAULT_SIZE; buffer_forward(s->req, FORWARD_DEFAULT_SIZE);
s->req->send_max = s->req->l;
} }
/* reflect what the L7 analysers have seen last */ /* reflect what the L7 analysers have seen last */
@ -868,10 +867,8 @@ resync_stream_interface:
*/ */
if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST && if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) { !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) { if (s->rep->to_forward < FORWARD_DEFAULT_SIZE)
s->rep->to_forward += FORWARD_DEFAULT_SIZE; buffer_forward(s->rep, FORWARD_DEFAULT_SIZE);
}
s->rep->send_max = s->rep->l;
} }
/* reflect what the L7 analysers have seen last */ /* reflect what the L7 analysers have seen last */

View File

@ -115,9 +115,12 @@ int stream_sock_read(int fd) {
b->l += ret; b->l += ret;
cur_read += ret; cur_read += ret;
/* if noone is interested in analysing data, let's forward everything */ /* if we're allowed to directly forward data, we must update send_max */
if (b->to_forward - b->splice_len > b->send_max) if (b->to_forward > 0) {
b->send_max = MIN(b->to_forward - b->splice_len, b->l); int fwd = MIN(b->to_forward, ret);
b->send_max += fwd;
b->to_forward -= fwd;
}
if (fdtab[fd].state == FD_STCONN) if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY; fdtab[fd].state = FD_STREADY;
@ -385,13 +388,6 @@ int stream_sock_write(int fd) {
b->l -= ret; b->l -= ret;
b->w += ret; b->w += ret;
b->send_max -= ret; b->send_max -= ret;
/* we can send up to send_max, we just want to know when
* to_forward has been reached.
*/
if ((signed)(b->to_forward - ret) >= 0)
b->to_forward -= ret;
else
b->to_forward = 0;
if (fdtab[fd].state == FD_STCONN) if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY; fdtab[fd].state = FD_STREADY;
@ -475,10 +471,10 @@ int stream_sock_write(int fd) {
b->prod->chk_rcv(b->prod); b->prod->chk_rcv(b->prod);
/* we have to wake up if there is a special event or if we don't have /* we have to wake up if there is a special event or if we don't have
* any more data to forward. * any more data to forward and it's not planned to send any more.
*/ */
if ((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) || if ((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
!b->to_forward || (!b->to_forward && !b->send_max && !b->splice_len) ||
si->state != SI_ST_EST || si->state != SI_ST_EST ||
b->prod->state != SI_ST_EST) b->prod->state != SI_ST_EST)
task_wakeup(si->owner, TASK_WOKEN_IO); task_wakeup(si->owner, TASK_WOKEN_IO);