mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 07:37:02 +02:00
[MEDIUM] splice: make use of pipe pools
Using pipe pools makes pipe management a lot easier. It also allows us to remove quite a few #ifdefs in areas which depended on whether or not kernel splicing support was present. The buffer now holds a pointer to a pipe structure which is always NULL except when there is still data in the pipe. When it needs to use that pipe, it dynamically allocates it from the pipe pool. When the data is consumed, the pipe is immediately released. That way, there is no longer any need to care about pipe closure upon session termination, nor about pipe creation when trying to use splice(). Another immediate advantage of this method is that it considerably reduces the number of pipes needed to use splice(). Tests have shown that even with 0.2 pipes per connection, almost all sessions can use splice(), because the same pipe may be used by several consecutive calls to splice().
This commit is contained in:
parent
982b6e37e4
commit
3eba98aa57
@ -48,15 +48,12 @@ static inline void buffer_init(struct buffer *buf)
|
||||
buf->send_max = 0;
|
||||
buf->to_forward = 0;
|
||||
buf->l = buf->total = 0;
|
||||
buf->splice_len = 0;
|
||||
buf->pipe = NULL;
|
||||
buf->analysers = 0;
|
||||
buf->cons = NULL;
|
||||
buf->flags = BF_EMPTY;
|
||||
buf->r = buf->lr = buf->w = buf->data;
|
||||
buf->max_len = BUFSIZE;
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
buf->splice.prod = buf->splice.cons = -1; /* closed */
|
||||
#endif
|
||||
}
|
||||
|
||||
/* returns 1 if the buffer is empty, 0 otherwise */
|
||||
|
@ -128,7 +128,6 @@ struct buffer {
|
||||
int wto; /* write timeout, in ticks */
|
||||
int cto; /* connect timeout, in ticks */
|
||||
unsigned int l; /* data length */
|
||||
unsigned int splice_len; /* number of bytes remaining in splice, out of buffer */
|
||||
char *r, *w, *lr; /* read ptr, write ptr, last read */
|
||||
unsigned int max_len; /* read limit, used to keep room for header rewriting */
|
||||
unsigned int send_max; /* number of bytes the sender can consume from this buffer, <= l */
|
||||
@ -141,12 +140,7 @@ struct buffer {
|
||||
unsigned long long total; /* total data read */
|
||||
struct stream_interface *prod; /* producer attached to this buffer */
|
||||
struct stream_interface *cons; /* consumer attached to this buffer */
|
||||
struct {
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
int prod; /* -1 or fd of the pipe's end towards the producer */
|
||||
int cons; /* -1 or fd of the pipe's end towards the consumer */
|
||||
#endif
|
||||
} splice;
|
||||
struct pipe *pipe; /* non-NULL only when data present */
|
||||
char data[BUFSIZE];
|
||||
};
|
||||
|
||||
@ -158,7 +152,7 @@ struct buffer {
|
||||
split in two parts :
|
||||
- the visible data (->data, for ->l bytes)
|
||||
- the invisible data, typically in kernel buffers forwarded directly from
|
||||
the source stream sock to the destination stream sock (->splice_len
|
||||
the source stream sock to the destination stream sock (->pipe->data
|
||||
bytes). Those are used only during forward.
|
||||
|
||||
In order not to mix data streams, the producer may only feed the invisible
|
||||
@ -173,7 +167,7 @@ struct buffer {
|
||||
buffer is expected to reach the destination file descriptor, by any means.
|
||||
However, it's the consumer's responsibility to ensure that the invisible
|
||||
data has been entirely consumed before consuming visible data. This must be
|
||||
reflected by ->splice_len. This is very important as this and only this can
|
||||
reflected by ->pipe->data. This is very important as this and only this can
|
||||
ensure strict ordering of data between buffers.
|
||||
|
||||
The producer is responsible for decreasing ->to_forward and increasing
|
||||
@ -184,7 +178,7 @@ struct buffer {
|
||||
well as any data forwarded through the visible buffer.
|
||||
|
||||
The consumer is responsible for decreasing ->send_max when it sends data
|
||||
from the visible buffer, and ->splice_len when it sends data from the
|
||||
from the visible buffer, and ->pipe->data when it sends data from the
|
||||
invisible buffer.
|
||||
|
||||
A real-world example consists in part in an HTTP response waiting in a
|
||||
|
@ -75,7 +75,6 @@ extern char *progname; /* program name */
|
||||
extern int pid; /* current process id */
|
||||
extern int relative_pid; /* process id starting at 1 */
|
||||
extern int actconn; /* # of active sessions */
|
||||
extern int usedpipes; /* # of used pipes */
|
||||
extern int listeners;
|
||||
extern char trash[BUFSIZE];
|
||||
extern const int zero;
|
||||
|
3
src/fd.c
3
src/fd.c
@ -18,15 +18,12 @@
|
||||
#include <common/compat.h>
|
||||
#include <common/config.h>
|
||||
|
||||
//#include <types/global.h>
|
||||
|
||||
#include <proto/fd.h>
|
||||
|
||||
struct fdtab *fdtab = NULL; /* array of all the file descriptors */
|
||||
int maxfd; /* # of the highest fd + 1 */
|
||||
int totalconn; /* total # of terminated sessions */
|
||||
int actconn; /* # of active sessions */
|
||||
int usedpipes; /* # of pipes in use (2 fds each) */
|
||||
|
||||
int cfg_polling_mechanism = 0; /* POLL_USE_{SELECT|POLL|EPOLL} */
|
||||
|
||||
|
@ -393,7 +393,7 @@ void init(int argc, char **argv)
|
||||
* Initialize the previously static variables.
|
||||
*/
|
||||
|
||||
usedpipes = totalconn = actconn = maxfd = listeners = stopping = 0;
|
||||
totalconn = actconn = maxfd = listeners = stopping = 0;
|
||||
|
||||
|
||||
#ifdef HAPROXY_MEMMAX
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include <proto/hdr_idx.h>
|
||||
#include <proto/log.h>
|
||||
#include <proto/session.h>
|
||||
#include <proto/pipe.h>
|
||||
#include <proto/proto_http.h>
|
||||
#include <proto/proto_tcp.h>
|
||||
#include <proto/queue.h>
|
||||
@ -64,23 +65,11 @@ void session_free(struct session *s)
|
||||
sess_change_server(s, NULL);
|
||||
}
|
||||
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
if (s->req->splice.prod >= 0)
|
||||
close(s->req->splice.prod);
|
||||
if (s->req->splice.cons >= 0)
|
||||
close(s->req->splice.cons);
|
||||
|
||||
if (s->req->splice.prod >= 0 || s->req->splice.cons >= 0)
|
||||
usedpipes--;
|
||||
if (s->req->pipe)
|
||||
put_pipe(s->req->pipe);
|
||||
|
||||
if (s->rep->splice.prod >= 0)
|
||||
close(s->rep->splice.prod);
|
||||
if (s->rep->splice.cons >= 0)
|
||||
close(s->rep->splice.cons);
|
||||
|
||||
if (s->rep->splice.prod >= 0 || s->rep->splice.cons >= 0)
|
||||
usedpipes--;
|
||||
#endif
|
||||
if (s->rep->pipe)
|
||||
put_pipe(s->rep->pipe);
|
||||
|
||||
pool_free2(pool2_buffer, s->req);
|
||||
pool_free2(pool2_buffer, s->rep);
|
||||
@ -772,7 +761,7 @@ void process_session(struct task *t, int *next)
|
||||
!s->req->analysers && !(s->req->flags & BF_HIJACK)) {
|
||||
/* check if it is wise to enable kernel splicing on the request buffer */
|
||||
if (!(s->req->flags & BF_KERN_SPLICING) &&
|
||||
(usedpipes < global.maxpipes) &&
|
||||
(pipes_used < global.maxpipes) &&
|
||||
(((s->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
|
||||
(((s->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
|
||||
(s->req->flags & BF_STREAMER_FAST))))
|
||||
@ -895,7 +884,7 @@ void process_session(struct task *t, int *next)
|
||||
!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
|
||||
/* check if it is wise to enable kernel splicing on the response buffer */
|
||||
if (!(s->rep->flags & BF_KERN_SPLICING) &&
|
||||
(usedpipes < global.maxpipes) &&
|
||||
(pipes_used < global.maxpipes) &&
|
||||
(((s->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
|
||||
(((s->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
|
||||
(s->rep->flags & BF_STREAMER_FAST))))
|
||||
|
@ -1,7 +1,7 @@
|
||||
/*
|
||||
* Functions operating on SOCK_STREAM and buffers.
|
||||
*
|
||||
* Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
|
||||
* Copyright 2000-2009 Willy Tarreau <w@1wt.eu>
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
@ -30,6 +30,7 @@
|
||||
#include <proto/buffers.h>
|
||||
#include <proto/client.h>
|
||||
#include <proto/fd.h>
|
||||
#include <proto/pipe.h>
|
||||
#include <proto/stream_sock.h>
|
||||
#include <proto/task.h>
|
||||
|
||||
@ -95,6 +96,9 @@ _syscall6(int, splice, int, fdin, loff_t *, off_in, int, fdout, loff_t *, off_ou
|
||||
* SI_FL_ERR
|
||||
* SI_FL_WAIT_ROOM
|
||||
* (SI_FL_WAIT_RECV)
|
||||
*
|
||||
* This function automatically allocates a pipe from the pipe pool. It also
|
||||
* takes care to clear b->pipe whenever it leaves the pipe empty.
|
||||
*/
|
||||
static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
|
||||
{
|
||||
@ -121,17 +125,15 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (unlikely(b->splice.prod == -1)) {
|
||||
int pipefd[2];
|
||||
if (usedpipes >= global.maxpipes || pipe(pipefd) < 0) {
|
||||
if (unlikely(b->pipe == NULL)) {
|
||||
if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
|
||||
b->flags &= ~BF_KERN_SPLICING;
|
||||
return -1;
|
||||
}
|
||||
usedpipes++;
|
||||
b->splice.prod = pipefd[1];
|
||||
b->splice.cons = pipefd[0];
|
||||
}
|
||||
|
||||
/* At this point, b->pipe is valid */
|
||||
|
||||
while (1) {
|
||||
max = b->to_forward;
|
||||
if (max <= 0) {
|
||||
@ -144,7 +146,7 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
|
||||
break;
|
||||
}
|
||||
|
||||
ret = splice(fd, NULL, b->splice.prod, NULL, max,
|
||||
ret = splice(fd, NULL, b->pipe->prod, NULL, max,
|
||||
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
|
||||
|
||||
if (ret <= 0) {
|
||||
@ -167,7 +169,7 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
|
||||
* will almost always fill/empty the pipe.
|
||||
*/
|
||||
|
||||
if (b->splice_len > 0) {
|
||||
if (b->pipe->data) {
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
retval = 1;
|
||||
break;
|
||||
@ -191,17 +193,22 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
|
||||
b->to_forward -= ret;
|
||||
total += ret;
|
||||
b->total += ret;
|
||||
b->splice_len += ret;
|
||||
b->pipe->data += ret;
|
||||
b->flags |= BF_READ_PARTIAL;
|
||||
b->flags &= ~BF_EMPTY; /* to prevent shutdowns */
|
||||
|
||||
if (b->splice_len >= SPLICE_FULL_HINT) {
|
||||
if (b->pipe->data >= SPLICE_FULL_HINT) {
|
||||
/* We've read enough of it for this time. */
|
||||
retval = 1;
|
||||
break;
|
||||
}
|
||||
} /* while */
|
||||
|
||||
if (unlikely(!b->pipe->data)) {
|
||||
put_pipe(b->pipe);
|
||||
b->pipe = NULL;
|
||||
}
|
||||
|
||||
return retval;
|
||||
}
|
||||
|
||||
@ -431,13 +438,14 @@ int stream_sock_read(int fd) {
|
||||
|
||||
out_wakeup:
|
||||
/* We might have some data the consumer is waiting for */
|
||||
if ((b->send_max || b->splice_len) && (b->cons->flags & SI_FL_WAIT_DATA)) {
|
||||
int last_len = b->splice_len;
|
||||
if ((b->send_max || b->pipe) && (b->cons->flags & SI_FL_WAIT_DATA)) {
|
||||
int last_len = b->pipe ? b->pipe->data : 0;
|
||||
|
||||
b->cons->chk_snd(b->cons);
|
||||
|
||||
/* check if the consumer has freed some space */
|
||||
if (!(b->flags & BF_FULL) && (!last_len || b->splice_len < last_len))
|
||||
if (!(b->flags & BF_FULL) &&
|
||||
(!last_len || !b->pipe || b->pipe->data < last_len))
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
}
|
||||
|
||||
@ -487,7 +495,8 @@ int stream_sock_read(int fd) {
|
||||
/*
|
||||
* This function is called to send buffer data to a stream socket.
|
||||
* It returns -1 in case of unrecoverable error, 0 if the caller needs to poll
|
||||
* before calling it again, otherwise 1.
|
||||
* before calling it again, otherwise 1. If a pipe was associated with the
|
||||
* buffer and it empties it, it releases it as well.
|
||||
*/
|
||||
static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
|
||||
{
|
||||
@ -496,8 +505,8 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
|
||||
int ret, max;
|
||||
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
while (b->splice_len) {
|
||||
ret = splice(b->splice.cons, NULL, si->fd, NULL, b->splice_len,
|
||||
while (b->pipe) {
|
||||
ret = splice(b->pipe->cons, NULL, si->fd, NULL, b->pipe->data,
|
||||
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
|
||||
if (ret <= 0) {
|
||||
if (ret == 0 || errno == EAGAIN) {
|
||||
@ -510,10 +519,13 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
|
||||
}
|
||||
|
||||
b->flags |= BF_WRITE_PARTIAL;
|
||||
b->splice_len -= ret;
|
||||
b->pipe->data -= ret;
|
||||
|
||||
if (!b->splice_len)
|
||||
if (!b->pipe->data) {
|
||||
put_pipe(b->pipe);
|
||||
b->pipe = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
if (--write_poll <= 0)
|
||||
return retval;
|
||||
@ -575,7 +587,7 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
|
||||
if (likely(!b->l)) {
|
||||
/* optimize data alignment in the buffer */
|
||||
b->r = b->w = b->lr = b->data;
|
||||
if (likely(!b->splice_len))
|
||||
if (likely(!b->pipe))
|
||||
b->flags |= BF_EMPTY;
|
||||
}
|
||||
|
||||
@ -669,7 +681,7 @@ int stream_sock_write(int fd)
|
||||
*/
|
||||
}
|
||||
|
||||
if (!b->splice_len && !b->send_max) {
|
||||
if (!b->pipe && !b->send_max) {
|
||||
/* the connection is established but we can't write. Either the
|
||||
* buffer is empty, or we just refrain from sending because the
|
||||
* send_max limit was reached. Maybe we just wrote the last
|
||||
@ -692,7 +704,7 @@ int stream_sock_write(int fd)
|
||||
out_may_wakeup:
|
||||
if (b->flags & BF_WRITE_ACTIVITY) {
|
||||
/* update timeout if we have written something */
|
||||
if ((b->send_max || b->splice_len) &&
|
||||
if ((b->send_max || b->pipe) &&
|
||||
(b->flags & (BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
|
||||
b->wex = tick_add_ifset(now_ms, b->wto);
|
||||
|
||||
@ -716,7 +728,7 @@ int stream_sock_write(int fd)
|
||||
* any more data to forward and it's not planned to send any more.
|
||||
*/
|
||||
if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
|
||||
(!b->to_forward && !b->send_max && !b->splice_len) ||
|
||||
(!b->to_forward && !b->send_max && !b->pipe) ||
|
||||
si->state != SI_ST_EST ||
|
||||
b->prod->state != SI_ST_EST))
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
@ -850,7 +862,7 @@ void stream_sock_data_finish(struct stream_interface *si)
|
||||
/* Check if we need to close the write side */
|
||||
if (!(ob->flags & BF_SHUTW)) {
|
||||
/* Write not closed, update FD status and timeout for writes */
|
||||
if ((ob->send_max == 0 && ob->splice_len == 0) ||
|
||||
if ((ob->send_max == 0 && !ob->pipe) ||
|
||||
(ob->flags & BF_EMPTY) ||
|
||||
(ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
|
||||
/* stop writing */
|
||||
@ -938,7 +950,7 @@ void stream_sock_chk_snd(struct stream_interface *si)
|
||||
|
||||
if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
|
||||
(fdtab[si->fd].ev & FD_POLL_OUT) || /* we'll be called anyway */
|
||||
!(ob->send_max || ob->splice_len) || /* called with nothing to send ! */
|
||||
!(ob->send_max || ob->pipe) || /* called with nothing to send ! */
|
||||
!(ob->flags & (BF_HIJACK|BF_WRITE_ENA))) /* we may not write */
|
||||
return;
|
||||
|
||||
@ -953,7 +965,7 @@ void stream_sock_chk_snd(struct stream_interface *si)
|
||||
goto out_wakeup;
|
||||
}
|
||||
|
||||
if (retval > 0 || (ob->send_max == 0 && ob->splice_len == 0)) {
|
||||
if (retval > 0 || (ob->send_max == 0 && !ob->pipe)) {
|
||||
/* the connection is established but we can't write. Either the
|
||||
* buffer is empty, or we just refrain from sending because the
|
||||
* send_max limit was reached. Maybe we just wrote the last
|
||||
@ -980,7 +992,7 @@ void stream_sock_chk_snd(struct stream_interface *si)
|
||||
* have to notify the task.
|
||||
*/
|
||||
if (likely((ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
|
||||
(!ob->to_forward && !ob->send_max && !ob->splice_len) ||
|
||||
(!ob->to_forward && !ob->send_max && !ob->pipe) ||
|
||||
si->state != SI_ST_EST)) {
|
||||
out_wakeup:
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
|
Loading…
Reference in New Issue
Block a user