diff --git a/include/proto/buffers.h b/include/proto/buffers.h
index edb3973a6..b0aad624f 100644
--- a/include/proto/buffers.h
+++ b/include/proto/buffers.h
@@ -88,8 +88,43 @@ static inline void buffer_check_timeouts(struct buffer *b)
 		b->flags |= BF_ANA_TIMEOUT;
 }
 
-/* flushes any content from buffer and adjusts flags
- * accordingly.
+/* Schedule more bytes to be forwarded by the buffer without notifying
+ * the task. Any pending data in the buffer is scheduled to be sent as well,
+ * in the limit of the number of bytes to forward. This must be the only method
+ * to use to schedule bytes to be sent. Directly touching ->to_forward will
+ * cause lockups when send_max goes down to zero if nobody is ready to push the
+ * remaining data.
+ *
+ * The sum of ->to_forward and the requested byte count saturates at the
+ * largest unsigned int instead of wrapping around, so a very large pending
+ * budget is never truncated. Nothing is returned; only ->to_forward and
+ * ->send_max are updated.
+ */
+static inline void buffer_forward(struct buffer *buf, unsigned int bytes)
+{
+	unsigned int budget;
+	unsigned int data_left;
+
+	/* Unsigned addition is modular: without this check, a large pending
+	 * ->to_forward plus the new bytes could wrap to a small value and
+	 * silently drop most of the forwarding budget.
+	 */
+	budget = buf->to_forward + bytes;
+	if (budget < bytes)
+		budget = ~0U;
+
+	/* data held in the buffer beyond what ->send_max already covers */
+	data_left = buf->l - buf->send_max;
+	if (data_left > budget)
+		data_left = budget;
+
+	/* move the covered part of the budget from ->to_forward to ->send_max */
+	buf->to_forward = budget - data_left;
+	buf->send_max += data_left;
+}
+
+/* Flush any content from buffer and adjust flags accordingly. Note
+ * that any spliced data is not affected since we may not have any access to
+ * it.
*/ static inline void buffer_flush(struct buffer *buf) { diff --git a/include/types/buffers.h b/include/types/buffers.h index b310aeda8..1ceb9bc03 100644 --- a/include/types/buffers.h +++ b/include/types/buffers.h @@ -130,8 +130,8 @@ struct buffer { unsigned int splice_len; /* number of bytes remaining in splice, out of buffer */ char *r, *w, *lr; /* read ptr, write ptr, last read */ char *rlim; /* read limit, used for header rewriting */ - unsigned int send_max; /* number of bytes the sender can consume */ - unsigned int to_forward; /* number of bytes that can send without a wake-up, >= send_max */ + unsigned int send_max; /* number of bytes the sender can consume from this buffer, <= l */ + unsigned int to_forward; /* number of bytes to forward after send_max without a wake-up */ unsigned int analysers; /* bit field indicating what to do on the buffer */ int analyse_exp; /* expiration date for current analysers (if set) */ void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */ @@ -144,6 +144,56 @@ struct buffer { }; +/* Note about the buffer structure + + The buffer contains two length indicators, one to_forward counter and one + send_max limit. First, it must be understood that the buffer is in fact + split in two parts : + - the visible data (->data, for ->l bytes) + - the invisible data, typically in kernel buffers forwarded directly from + the source stream sock to the destination stream sock (->splice_len + bytes). Those are used only during forward. + + In order not to mix data streams, the producer may only feed the invisible + data with data to forward, and only when the visible buffer is empty. The + producer may not always be able to feed the invisible buffer due to platform + limitations (lack of kernel support). + + Conversely, the consumer must always take data from the invisible data first + before ever considering visible data. 
There is no limit to the size of data + to consume from the invisible buffer, as platform-specific implementations + will rarely leave enough control over this. So any byte fed into the invisible + buffer is expected to reach the destination file descriptor, by any means. + However, it's the consumer's responsibility to ensure that the invisible + data has been entirely consumed before consuming visible data. This must be + reflected by ->splice_len. This is very important as this and only this can + ensure strict ordering of data between buffers. + + The producer is responsible for decreasing ->to_forward and increasing + ->send_max. The ->to_forward parameter indicates how many bytes may be fed + into either data buffer without waking the parent up. The ->send_max + parameter says how many bytes may be read from the visible buffer. Thus it + may never exceed ->l. This parameter is updated by any buffer_write() as + well as any data forwarded through the visible buffer. + + The consumer is responsible for decreasing ->send_max when it sends data + from the visible buffer, and ->splice_len when it sends data from the + invisible buffer. + + A real-world example is an HTTP response waiting in a + buffer to be forwarded. We know the header length (300) and the amount of + data to forward (content-length=9000). The buffer already contains 1000 + bytes of data after the 300 bytes of headers. Thus the caller will set + ->send_max to 300 indicating that it explicitly wants to send those data, + and set ->to_forward to 9000 (content-length). This value must be normalised + immediately after updating ->to_forward : since there are already 1300 bytes + in the buffer, 300 of which are already counted in ->send_max, and that size + is smaller than ->to_forward, we must update ->send_max to 1300 to flush the + whole buffer, and reduce ->to_forward to 8000. 
After that, the producer may + try to feed the additional data through the invisible buffer using a + platform-specific method such as splice(). + */ + #endif /* _TYPES_BUFFERS_H */ /* diff --git a/src/proto_uxst.c b/src/proto_uxst.c index a6eed1332..f07fa158b 100644 --- a/src/proto_uxst.c +++ b/src/proto_uxst.c @@ -816,8 +816,7 @@ void uxst_process_session(struct task *t, int *next) if (!s->req->send_max && s->req->prod->state >= SI_ST_EST && !s->req->analysers && !(s->req->flags & BF_HIJACK)) { if (s->req->to_forward < FORWARD_DEFAULT_SIZE) - s->req->to_forward += FORWARD_DEFAULT_SIZE; - s->req->send_max = s->req->l; + buffer_forward(s->req, FORWARD_DEFAULT_SIZE); } /* reflect what the L7 analysers have seen last */ @@ -896,10 +895,8 @@ void uxst_process_session(struct task *t, int *next) */ if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST && !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) { - if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) { - s->rep->to_forward += FORWARD_DEFAULT_SIZE; - } - s->rep->send_max = s->rep->l; + if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) + buffer_forward(s->rep, FORWARD_DEFAULT_SIZE); } /* reflect what the L7 analysers have seen last */ diff --git a/src/session.c b/src/session.c index 97ce822a1..3443c46b6 100644 --- a/src/session.c +++ b/src/session.c @@ -753,8 +753,7 @@ resync_stream_interface: if (!s->req->send_max && s->req->prod->state >= SI_ST_EST && !s->req->analysers && !(s->req->flags & BF_HIJACK)) { if (s->req->to_forward < FORWARD_DEFAULT_SIZE) - s->req->to_forward += FORWARD_DEFAULT_SIZE; - s->req->send_max = s->req->l; + buffer_forward(s->req, FORWARD_DEFAULT_SIZE); } /* reflect what the L7 analysers have seen last */ @@ -868,10 +867,8 @@ resync_stream_interface: */ if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST && !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) { - if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) { - s->rep->to_forward += FORWARD_DEFAULT_SIZE; - } - s->rep->send_max = 
s->rep->l; + if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) + buffer_forward(s->rep, FORWARD_DEFAULT_SIZE); } /* reflect what the L7 analysers have seen last */ diff --git a/src/stream_sock.c b/src/stream_sock.c index 7b684f4e3..ca2fdee73 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -115,9 +115,12 @@ int stream_sock_read(int fd) { b->l += ret; cur_read += ret; - /* if noone is interested in analysing data, let's forward everything */ - if (b->to_forward - b->splice_len > b->send_max) - b->send_max = MIN(b->to_forward - b->splice_len, b->l); + /* if we're allowed to directly forward data, we must update send_max */ + if (b->to_forward > 0) { + int fwd = MIN(b->to_forward, ret); + b->send_max += fwd; + b->to_forward -= fwd; + } if (fdtab[fd].state == FD_STCONN) fdtab[fd].state = FD_STREADY; @@ -385,13 +388,6 @@ int stream_sock_write(int fd) { b->l -= ret; b->w += ret; b->send_max -= ret; - /* we can send up to send_max, we just want to know when - * to_forward has been reached. - */ - if ((signed)(b->to_forward - ret) >= 0) - b->to_forward -= ret; - else - b->to_forward = 0; if (fdtab[fd].state == FD_STCONN) fdtab[fd].state = FD_STREADY; @@ -475,10 +471,10 @@ int stream_sock_write(int fd) { b->prod->chk_rcv(b->prod); /* we have to wake up if there is a special event or if we don't have - * any more data to forward. + * any more data to forward and it's not planned to send any more. */ if ((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) || - !b->to_forward || + (!b->to_forward && !b->send_max && !b->splice_len) || si->state != SI_ST_EST || b->prod->state != SI_ST_EST) task_wakeup(si->owner, TASK_WOKEN_IO);