mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-21 13:51:26 +02:00
MEDIUM: stconn/channel: Move pipes used for the splicing in the SE descriptors
The pipes used to put data when the kernel splicing is in used are moved in the SE descriptors. For now, it is just a simple remplacement but there is a major difference with the pipes in the channel. The data are pushed in the consumer's pipe while it was pushed in the producer's pipe. So it means the request data are now pushed in the pipe of the backend SE descriptor and response data are pushed in the pipe of the frontend SE descriptor. The idea is to hide the pipe from the channel/SC side and to be able to handle fast-forwading in pipe but also in buffer. To do so, the pipe is inside a new entity, called iobuf. This entity will be extended.
This commit is contained in:
parent
1fdfa4f9ba
commit
8bee0dcd7d
@ -202,7 +202,6 @@ struct channel {
|
|||||||
unsigned int flags; /* CF_* */
|
unsigned int flags; /* CF_* */
|
||||||
unsigned int analysers; /* bit field indicating what to do on the channel */
|
unsigned int analysers; /* bit field indicating what to do on the channel */
|
||||||
struct buffer buf; /* buffer attached to the channel, always present but may move */
|
struct buffer buf; /* buffer attached to the channel, always present but may move */
|
||||||
struct pipe *pipe; /* non-NULL only when data present */
|
|
||||||
size_t output; /* part of buffer which is to be forwarded */
|
size_t output; /* part of buffer which is to be forwarded */
|
||||||
unsigned int to_forward; /* number of bytes to forward after out without a wake-up */
|
unsigned int to_forward; /* number of bytes to forward after out without a wake-up */
|
||||||
unsigned short last_read; /* 16 lower bits of last read date (max pause=65s) */
|
unsigned short last_read; /* 16 lower bits of last read date (max pause=65s) */
|
||||||
|
@ -323,7 +323,6 @@ static inline void channel_init(struct channel *chn)
|
|||||||
chn->last_read = now_ms;
|
chn->last_read = now_ms;
|
||||||
chn->xfer_small = chn->xfer_large = 0;
|
chn->xfer_small = chn->xfer_large = 0;
|
||||||
chn->total = 0;
|
chn->total = 0;
|
||||||
chn->pipe = NULL;
|
|
||||||
chn->analysers = 0;
|
chn->analysers = 0;
|
||||||
chn->flags = 0;
|
chn->flags = 0;
|
||||||
chn->output = 0;
|
chn->output = 0;
|
||||||
@ -404,13 +403,13 @@ static inline void channel_htx_forward_forever(struct channel *chn, struct htx *
|
|||||||
/*********************************************************************/
|
/*********************************************************************/
|
||||||
|
|
||||||
/* Reports non-zero if the channel is empty, which means both its
|
/* Reports non-zero if the channel is empty, which means both its
|
||||||
* buffer and pipe are empty. The construct looks strange but is
|
* buffer and pipe on the opposite SE are empty. The construct looks
|
||||||
* jump-less and much more efficient on both 32 and 64-bit than
|
* strange but is jump-less and much more efficient on both 32 and
|
||||||
* the boolean test.
|
* 64-bit than the boolean test.
|
||||||
*/
|
*/
|
||||||
static inline unsigned int channel_is_empty(const struct channel *c)
|
static inline unsigned int channel_is_empty(const struct channel *c)
|
||||||
{
|
{
|
||||||
return !(co_data(c) | (long)c->pipe);
|
return !(co_data(c) | (long)chn_cons(c)->sedesc->iobuf.pipe);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns non-zero if the channel is rewritable, which means that the buffer
|
/* Returns non-zero if the channel is rewritable, which means that the buffer
|
||||||
|
@ -24,9 +24,19 @@
|
|||||||
|
|
||||||
#include <haproxy/obj_type-t.h>
|
#include <haproxy/obj_type-t.h>
|
||||||
#include <haproxy/connection-t.h>
|
#include <haproxy/connection-t.h>
|
||||||
|
#include <haproxy/pipe-t.h>
|
||||||
#include <haproxy/show_flags-t.h>
|
#include <haproxy/show_flags-t.h>
|
||||||
#include <haproxy/xref-t.h>
|
#include <haproxy/xref-t.h>
|
||||||
|
|
||||||
|
enum iobuf_flags {
|
||||||
|
IOBUF_FL_NONE = 0x00000000, /* For initialization purposes */
|
||||||
|
};
|
||||||
|
|
||||||
|
struct iobuf {
|
||||||
|
struct pipe *pipe; /* non-NULL only when data present */
|
||||||
|
unsigned int flags;
|
||||||
|
};
|
||||||
|
|
||||||
/* Stream Endpoint Flags.
|
/* Stream Endpoint Flags.
|
||||||
* Please also update the se_show_flags() function below in case of changes.
|
* Please also update the se_show_flags() function below in case of changes.
|
||||||
*/
|
*/
|
||||||
@ -246,11 +256,13 @@ struct stconn;
|
|||||||
|
|
||||||
* <fsb> should be updated when the first send of a series is blocked and reset
|
* <fsb> should be updated when the first send of a series is blocked and reset
|
||||||
* when a successful send is reported.
|
* when a successful send is reported.
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
struct sedesc {
|
struct sedesc {
|
||||||
void *se; /* the stream endpoint, i.e. the mux stream or the appctx */
|
void *se; /* the stream endpoint, i.e. the mux stream or the appctx */
|
||||||
struct connection *conn; /* the connection for connection-based streams */
|
struct connection *conn; /* the connection for connection-based streams */
|
||||||
struct stconn *sc; /* the stream connector we're attached to, or NULL */
|
struct stconn *sc; /* the stream connector we're attached to, or NULL */
|
||||||
|
struct iobuf iobuf; /* contains data forwarded by the other side and that must be sent by the stream endpoint */
|
||||||
unsigned int flags; /* SE_FL_* */
|
unsigned int flags; /* SE_FL_* */
|
||||||
unsigned int lra; /* the last read activity */
|
unsigned int lra; /* the last read activity */
|
||||||
unsigned int fsb; /* the first send blocked */
|
unsigned int fsb; /* the first send blocked */
|
||||||
|
@ -383,7 +383,7 @@ int appctx_buf_available(void *arg)
|
|||||||
sc_have_buff(sc);
|
sc_have_buff(sc);
|
||||||
|
|
||||||
/* was already allocated another way ? if so, don't take this one */
|
/* was already allocated another way ? if so, don't take this one */
|
||||||
if (c_size(sc_ic(sc)) || sc_ic(sc)->pipe)
|
if (c_size(sc_ic(sc)) || sc_opposite(sc)->sedesc->iobuf.pipe)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
/* allocation possible now ? */
|
/* allocation possible now ? */
|
||||||
|
56
src/stconn.c
56
src/stconn.c
@ -97,6 +97,9 @@ void sedesc_init(struct sedesc *sedesc)
|
|||||||
sedesc->fsb = TICK_ETERNITY;
|
sedesc->fsb = TICK_ETERNITY;
|
||||||
sedesc->xref.peer = NULL;
|
sedesc->xref.peer = NULL;
|
||||||
se_fl_setall(sedesc, SE_FL_NONE);
|
se_fl_setall(sedesc, SE_FL_NONE);
|
||||||
|
|
||||||
|
sedesc->iobuf.pipe = NULL;
|
||||||
|
sedesc->iobuf.flags = IOBUF_FL_NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Tries to alloc an endpoint and initialize it. Returns NULL on failure. */
|
/* Tries to alloc an endpoint and initialize it. Returns NULL on failure. */
|
||||||
@ -117,7 +120,11 @@ struct sedesc *sedesc_new()
|
|||||||
*/
|
*/
|
||||||
void sedesc_free(struct sedesc *sedesc)
|
void sedesc_free(struct sedesc *sedesc)
|
||||||
{
|
{
|
||||||
|
if (sedesc) {
|
||||||
|
if (sedesc->iobuf.pipe)
|
||||||
|
put_pipe(sedesc->iobuf.pipe);
|
||||||
pool_free(pool_head_sedesc, sedesc);
|
pool_free(pool_head_sedesc, sedesc);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Tries to allocate a new stconn and initialize its main fields. On
|
/* Tries to allocate a new stconn and initialize its main fields. On
|
||||||
@ -622,9 +629,7 @@ static void sc_app_shut(struct stconn *sc)
|
|||||||
/* default chk_rcv function for scheduled tasks */
|
/* default chk_rcv function for scheduled tasks */
|
||||||
static void sc_app_chk_rcv(struct stconn *sc)
|
static void sc_app_chk_rcv(struct stconn *sc)
|
||||||
{
|
{
|
||||||
struct channel *ic = sc_ic(sc);
|
if (sc_opposite(sc)->sedesc->iobuf.pipe) {
|
||||||
|
|
||||||
if (ic->pipe) {
|
|
||||||
/* stop reading */
|
/* stop reading */
|
||||||
sc_need_room(sc, -1);
|
sc_need_room(sc, -1);
|
||||||
}
|
}
|
||||||
@ -795,7 +800,7 @@ static void sc_app_chk_snd_conn(struct stconn *sc)
|
|||||||
if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
|
if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (!oc->pipe && /* spliced data wants to be forwarded ASAP */
|
if (!sc->sedesc->iobuf.pipe && /* spliced data wants to be forwarded ASAP */
|
||||||
!sc_ep_test(sc, SE_FL_WAIT_DATA)) /* not waiting for data */
|
!sc_ep_test(sc, SE_FL_WAIT_DATA)) /* not waiting for data */
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -939,11 +944,9 @@ static void sc_app_shut_applet(struct stconn *sc)
|
|||||||
/* chk_rcv function for applets */
|
/* chk_rcv function for applets */
|
||||||
static void sc_app_chk_rcv_applet(struct stconn *sc)
|
static void sc_app_chk_rcv_applet(struct stconn *sc)
|
||||||
{
|
{
|
||||||
struct channel *ic = sc_ic(sc);
|
|
||||||
|
|
||||||
BUG_ON(!sc_appctx(sc));
|
BUG_ON(!sc_appctx(sc));
|
||||||
|
|
||||||
if (!ic->pipe) {
|
if (!sc_opposite(sc)->sedesc->iobuf.pipe) {
|
||||||
/* (re)start reading */
|
/* (re)start reading */
|
||||||
appctx_wakeup(__sc_appctx(sc));
|
appctx_wakeup(__sc_appctx(sc));
|
||||||
}
|
}
|
||||||
@ -1087,18 +1090,18 @@ static void sc_notify(struct stconn *sc)
|
|||||||
*/
|
*/
|
||||||
if (!channel_is_empty(ic) &&
|
if (!channel_is_empty(ic) &&
|
||||||
sc_ep_test(sco, SE_FL_WAIT_DATA) &&
|
sc_ep_test(sco, SE_FL_WAIT_DATA) &&
|
||||||
(!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
|
(!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || sco->sedesc->iobuf.pipe)) {
|
||||||
int new_len, last_len;
|
int new_len, last_len;
|
||||||
|
|
||||||
last_len = co_data(ic);
|
last_len = co_data(ic);
|
||||||
if (ic->pipe)
|
if (sco->sedesc->iobuf.pipe)
|
||||||
last_len += ic->pipe->data;
|
last_len += sco->sedesc->iobuf.pipe->data;
|
||||||
|
|
||||||
sc_chk_snd(sco);
|
sc_chk_snd(sco);
|
||||||
|
|
||||||
new_len = co_data(ic);
|
new_len = co_data(ic);
|
||||||
if (ic->pipe)
|
if (sco->sedesc->iobuf.pipe)
|
||||||
new_len += ic->pipe->data;
|
new_len += sco->sedesc->iobuf.pipe->data;
|
||||||
|
|
||||||
/* check if the consumer has freed some space either in the
|
/* check if the consumer has freed some space either in the
|
||||||
* buffer or in the pipe.
|
* buffer or in the pipe.
|
||||||
@ -1263,7 +1266,7 @@ static int sc_conn_recv(struct stconn *sc)
|
|||||||
* using a buffer.
|
* using a buffer.
|
||||||
*/
|
*/
|
||||||
if (sc_ep_test(sc, SE_FL_MAY_SPLICE) &&
|
if (sc_ep_test(sc, SE_FL_MAY_SPLICE) &&
|
||||||
(ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
|
(sc_opposite(sc)->sedesc->iobuf.pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
|
||||||
ic->flags & CF_KERN_SPLICING) {
|
ic->flags & CF_KERN_SPLICING) {
|
||||||
if (c_data(ic)) {
|
if (c_data(ic)) {
|
||||||
/* We're embarrassed, there are already data pending in
|
/* We're embarrassed, there are already data pending in
|
||||||
@ -1275,14 +1278,14 @@ static int sc_conn_recv(struct stconn *sc)
|
|||||||
goto abort_splice;
|
goto abort_splice;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unlikely(ic->pipe == NULL)) {
|
if (unlikely(sc_opposite(sc)->sedesc->iobuf.pipe == NULL)) {
|
||||||
if (pipes_used >= global.maxpipes || !(ic->pipe = get_pipe())) {
|
if (pipes_used >= global.maxpipes || !(sc_opposite(sc)->sedesc->iobuf.pipe = get_pipe())) {
|
||||||
ic->flags &= ~CF_KERN_SPLICING;
|
ic->flags &= ~CF_KERN_SPLICING;
|
||||||
goto abort_splice;
|
goto abort_splice;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = conn->mux->rcv_pipe(sc, ic->pipe, ic->to_forward);
|
ret = conn->mux->rcv_pipe(sc, sc_opposite(sc)->sedesc->iobuf.pipe, ic->to_forward);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
/* splice not supported on this end, let's disable it */
|
/* splice not supported on this end, let's disable it */
|
||||||
ic->flags &= ~CF_KERN_SPLICING;
|
ic->flags &= ~CF_KERN_SPLICING;
|
||||||
@ -1312,12 +1315,13 @@ static int sc_conn_recv(struct stconn *sc)
|
|||||||
}
|
}
|
||||||
|
|
||||||
abort_splice:
|
abort_splice:
|
||||||
if (ic->pipe && unlikely(!ic->pipe->data)) {
|
if (sc_opposite(sc)->sedesc->iobuf.pipe && unlikely(!sc_opposite(sc)->sedesc->iobuf.pipe->data)) {
|
||||||
put_pipe(ic->pipe);
|
put_pipe(sc_opposite(sc)->sedesc->iobuf.pipe);
|
||||||
ic->pipe = NULL;
|
sc_opposite(sc)->sedesc->iobuf.pipe = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ic->pipe && ic->to_forward && !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) {
|
if (sc_opposite(sc)->sedesc->iobuf.pipe && ic->to_forward &&
|
||||||
|
!(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) {
|
||||||
/* don't break splicing by reading, but still call rcv_buf()
|
/* don't break splicing by reading, but still call rcv_buf()
|
||||||
* to pass the flag.
|
* to pass the flag.
|
||||||
*/
|
*/
|
||||||
@ -1597,17 +1601,17 @@ static int sc_conn_send(struct stconn *sc)
|
|||||||
if (!conn->mux)
|
if (!conn->mux)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
|
if (sc->sedesc->iobuf.pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
|
||||||
ret = conn->mux->snd_pipe(sc, oc->pipe);
|
ret = conn->mux->snd_pipe(sc, sc->sedesc->iobuf.pipe);
|
||||||
if (ret > 0)
|
if (ret > 0)
|
||||||
did_send = 1;
|
did_send = 1;
|
||||||
|
|
||||||
if (!oc->pipe->data) {
|
if (!sc->sedesc->iobuf.pipe->data) {
|
||||||
put_pipe(oc->pipe);
|
put_pipe(sc->sedesc->iobuf.pipe);
|
||||||
oc->pipe = NULL;
|
sc->sedesc->iobuf.pipe = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oc->pipe)
|
if (sc->sedesc->iobuf.pipe)
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
14
src/stream.c
14
src/stream.c
@ -320,10 +320,10 @@ int stream_buf_available(void *arg)
|
|||||||
{
|
{
|
||||||
struct stream *s = arg;
|
struct stream *s = arg;
|
||||||
|
|
||||||
if (!s->req.buf.size && !s->req.pipe && s->scf->flags & SC_FL_NEED_BUFF &&
|
if (!s->req.buf.size && !s->scb->sedesc->iobuf.pipe && s->scf->flags & SC_FL_NEED_BUFF &&
|
||||||
b_alloc(&s->req.buf))
|
b_alloc(&s->req.buf))
|
||||||
sc_have_buff(s->scf);
|
sc_have_buff(s->scf);
|
||||||
else if (!s->res.buf.size && !s->res.pipe && s->scb->flags & SC_FL_NEED_BUFF &&
|
else if (!s->res.buf.size && !s->scf->sedesc->iobuf.pipe && s->scb->flags & SC_FL_NEED_BUFF &&
|
||||||
b_alloc(&s->res.buf))
|
b_alloc(&s->res.buf))
|
||||||
sc_have_buff(s->scb);
|
sc_have_buff(s->scb);
|
||||||
else
|
else
|
||||||
@ -631,12 +631,6 @@ void stream_free(struct stream *s)
|
|||||||
sess_change_server(s, NULL);
|
sess_change_server(s, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s->req.pipe)
|
|
||||||
put_pipe(s->req.pipe);
|
|
||||||
|
|
||||||
if (s->res.pipe)
|
|
||||||
put_pipe(s->res.pipe);
|
|
||||||
|
|
||||||
/* We may still be present in the buffer wait queue */
|
/* We may still be present in the buffer wait queue */
|
||||||
if (LIST_INLIST(&s->buffer_wait.list))
|
if (LIST_INLIST(&s->buffer_wait.list))
|
||||||
LIST_DEL_INIT(&s->buffer_wait.list);
|
LIST_DEL_INIT(&s->buffer_wait.list);
|
||||||
@ -3419,7 +3413,7 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch
|
|||||||
pfx,
|
pfx,
|
||||||
&strm->req,
|
&strm->req,
|
||||||
strm->req.flags, strm->req.analysers,
|
strm->req.flags, strm->req.analysers,
|
||||||
strm->req.pipe ? strm->req.pipe->data : 0,
|
strm->scb->sedesc->iobuf.pipe ? strm->scb->sedesc->iobuf.pipe->data : 0,
|
||||||
strm->req.to_forward, strm->req.total,
|
strm->req.to_forward, strm->req.total,
|
||||||
pfx,
|
pfx,
|
||||||
strm->req.analyse_exp ?
|
strm->req.analyse_exp ?
|
||||||
@ -3452,7 +3446,7 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch
|
|||||||
pfx,
|
pfx,
|
||||||
&strm->res,
|
&strm->res,
|
||||||
strm->res.flags, strm->res.analysers,
|
strm->res.flags, strm->res.analysers,
|
||||||
strm->res.pipe ? strm->res.pipe->data : 0,
|
strm->scf->sedesc->iobuf.pipe ? strm->scf->sedesc->iobuf.pipe->data : 0,
|
||||||
strm->res.to_forward, strm->res.total,
|
strm->res.to_forward, strm->res.total,
|
||||||
pfx,
|
pfx,
|
||||||
strm->res.analyse_exp ?
|
strm->res.analyse_exp ?
|
||||||
|
Loading…
x
Reference in New Issue
Block a user