diff --git a/include/haproxy/channel-t.h b/include/haproxy/channel-t.h index f876d9101..8629f5f1e 100644 --- a/include/haproxy/channel-t.h +++ b/include/haproxy/channel-t.h @@ -202,7 +202,6 @@ struct channel { unsigned int flags; /* CF_* */ unsigned int analysers; /* bit field indicating what to do on the channel */ struct buffer buf; /* buffer attached to the channel, always present but may move */ - struct pipe *pipe; /* non-NULL only when data present */ size_t output; /* part of buffer which is to be forwarded */ unsigned int to_forward; /* number of bytes to forward after out without a wake-up */ unsigned short last_read; /* 16 lower bits of last read date (max pause=65s) */ diff --git a/include/haproxy/channel.h b/include/haproxy/channel.h index 36199b172..154017bdf 100644 --- a/include/haproxy/channel.h +++ b/include/haproxy/channel.h @@ -323,7 +323,6 @@ static inline void channel_init(struct channel *chn) chn->last_read = now_ms; chn->xfer_small = chn->xfer_large = 0; chn->total = 0; - chn->pipe = NULL; chn->analysers = 0; chn->flags = 0; chn->output = 0; @@ -404,13 +403,13 @@ static inline void channel_htx_forward_forever(struct channel *chn, struct htx * /*********************************************************************/ /* Reports non-zero if the channel is empty, which means both its - * buffer and pipe are empty. The construct looks strange but is - * jump-less and much more efficient on both 32 and 64-bit than - * the boolean test. + * buffer and pipe on the opposite SE are empty. The construct looks + * strange but is jump-less and much more efficient on both 32 and + * 64-bit than the boolean test. */ static inline unsigned int channel_is_empty(const struct channel *c) { - return !(co_data(c) | (long)c->pipe); + return !(co_data(c) | (long)chn_cons(c)->sedesc->iobuf.pipe); } /* Returns non-zero if the channel is rewritable, which means that the buffer diff --git a/include/haproxy/stconn-t.h b/include/haproxy/stconn-t.h index e507e1e94..1d9888dc5 100644 --- a/include/haproxy/stconn-t.h +++ b/include/haproxy/stconn-t.h @@ -24,9 +24,19 @@ #include #include +#include #include #include +enum iobuf_flags { + IOBUF_FL_NONE = 0x00000000, /* For initialization purposes */ +}; + +struct iobuf { + struct pipe *pipe; /* non-NULL only when data present */ + unsigned int flags; +}; + /* Stream Endpoint Flags. * Please also update the se_show_flags() function below in case of changes. */ @@ -246,11 +256,13 @@ struct stconn; * should be updated when the first send of a series is blocked and reset * when a successful send is reported. + * */ struct sedesc { void *se; /* the stream endpoint, i.e. the mux stream or the appctx */ struct connection *conn; /* the connection for connection-based streams */ struct stconn *sc; /* the stream connector we're attached to, or NULL */ + struct iobuf iobuf; /* contains data forwarded by the other side and that must be sent by the stream endpoint */ unsigned int flags; /* SE_FL_* */ unsigned int lra; /* the last read activity */ unsigned int fsb; /* the first send blocked */ diff --git a/src/applet.c b/src/applet.c index cdcbc2556..95fc90323 100644 --- a/src/applet.c +++ b/src/applet.c @@ -383,7 +383,7 @@ int appctx_buf_available(void *arg) sc_have_buff(sc); /* was already allocated another way ? if so, don't take this one */ - if (c_size(sc_ic(sc)) || sc_ic(sc)->pipe) + if (c_size(sc_ic(sc)) || sc_opposite(sc)->sedesc->iobuf.pipe) return 0; /* allocation possible now ? */ diff --git a/src/stconn.c b/src/stconn.c index fbfebc7a5..89face267 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -97,6 +97,9 @@ void sedesc_init(struct sedesc *sedesc) sedesc->fsb = TICK_ETERNITY; sedesc->xref.peer = NULL; se_fl_setall(sedesc, SE_FL_NONE); + + sedesc->iobuf.pipe = NULL; + sedesc->iobuf.flags = IOBUF_FL_NONE; } /* Tries to alloc an endpoint and initialize it. Returns NULL on failure. */ @@ -117,7 +120,11 @@ struct sedesc *sedesc_new() */ void sedesc_free(struct sedesc *sedesc) { - pool_free(pool_head_sedesc, sedesc); + if (sedesc) { + if (sedesc->iobuf.pipe) + put_pipe(sedesc->iobuf.pipe); + pool_free(pool_head_sedesc, sedesc); + } } /* Tries to allocate a new stconn and initialize its main fields. On @@ -622,9 +629,7 @@ static void sc_app_shut(struct stconn *sc) /* default chk_rcv function for scheduled tasks */ static void sc_app_chk_rcv(struct stconn *sc) { - struct channel *ic = sc_ic(sc); - - if (ic->pipe) { + if (sc_opposite(sc)->sedesc->iobuf.pipe) { /* stop reading */ sc_need_room(sc, -1); } @@ -795,7 +800,7 @@ static void sc_app_chk_snd_conn(struct stconn *sc) if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */ return; - if (!oc->pipe && /* spliced data wants to be forwarded ASAP */ + if (!sc->sedesc->iobuf.pipe && /* spliced data wants to be forwarded ASAP */ !sc_ep_test(sc, SE_FL_WAIT_DATA)) /* not waiting for data */ return; @@ -939,11 +944,9 @@ static void sc_app_shut_applet(struct stconn *sc) /* chk_rcv function for applets */ static void sc_app_chk_rcv_applet(struct stconn *sc) { - struct channel *ic = sc_ic(sc); - BUG_ON(!sc_appctx(sc)); - if (!ic->pipe) { + if (!sc_opposite(sc)->sedesc->iobuf.pipe) { /* (re)start reading */ appctx_wakeup(__sc_appctx(sc)); } @@ -1087,18 +1090,18 @@ static void sc_notify(struct stconn *sc) */ if (!channel_is_empty(ic) && sc_ep_test(sco, SE_FL_WAIT_DATA) && - (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) { + (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || sco->sedesc->iobuf.pipe)) { int new_len, last_len; last_len = co_data(ic); - if (ic->pipe) - last_len += ic->pipe->data; + if (sco->sedesc->iobuf.pipe) + last_len += sco->sedesc->iobuf.pipe->data; sc_chk_snd(sco); new_len = co_data(ic); - if (ic->pipe) - new_len += ic->pipe->data; + if (sco->sedesc->iobuf.pipe) + new_len += sco->sedesc->iobuf.pipe->data; /* check if the consumer has freed some space either in the * buffer or in the pipe. @@ -1263,7 +1266,7 @@ static int sc_conn_recv(struct stconn *sc) * using a buffer. */ if (sc_ep_test(sc, SE_FL_MAY_SPLICE) && - (ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) && + (sc_opposite(sc)->sedesc->iobuf.pipe || ic->to_forward >= MIN_SPLICE_FORWARD) && ic->flags & CF_KERN_SPLICING) { if (c_data(ic)) { /* We're embarrassed, there are already data pending in @@ -1275,14 +1278,14 @@ static int sc_conn_recv(struct stconn *sc) goto abort_splice; } - if (unlikely(ic->pipe == NULL)) { - if (pipes_used >= global.maxpipes || !(ic->pipe = get_pipe())) { + if (unlikely(sc_opposite(sc)->sedesc->iobuf.pipe == NULL)) { + if (pipes_used >= global.maxpipes || !(sc_opposite(sc)->sedesc->iobuf.pipe = get_pipe())) { ic->flags &= ~CF_KERN_SPLICING; goto abort_splice; } } - ret = conn->mux->rcv_pipe(sc, ic->pipe, ic->to_forward); + ret = conn->mux->rcv_pipe(sc, sc_opposite(sc)->sedesc->iobuf.pipe, ic->to_forward); if (ret < 0) { /* splice not supported on this end, let's disable it */ ic->flags &= ~CF_KERN_SPLICING; @@ -1312,12 +1315,13 @@ static int sc_conn_recv(struct stconn *sc) } abort_splice: - if (ic->pipe && unlikely(!ic->pipe->data)) { - put_pipe(ic->pipe); - ic->pipe = NULL; + if (sc_opposite(sc)->sedesc->iobuf.pipe && unlikely(!sc_opposite(sc)->sedesc->iobuf.pipe->data)) { + put_pipe(sc_opposite(sc)->sedesc->iobuf.pipe); + sc_opposite(sc)->sedesc->iobuf.pipe = NULL; } - if (ic->pipe && ic->to_forward && !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) { + if (sc_opposite(sc)->sedesc->iobuf.pipe && ic->to_forward && + !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) { /* don't break splicing by reading, but still call rcv_buf() * to pass the flag. */ @@ -1597,17 +1601,17 @@ static int sc_conn_send(struct stconn *sc) if (!conn->mux) return 0; - if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) { - ret = conn->mux->snd_pipe(sc, oc->pipe); + if (sc->sedesc->iobuf.pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) { + ret = conn->mux->snd_pipe(sc, sc->sedesc->iobuf.pipe); if (ret > 0) did_send = 1; - if (!oc->pipe->data) { - put_pipe(oc->pipe); - oc->pipe = NULL; + if (!sc->sedesc->iobuf.pipe->data) { + put_pipe(sc->sedesc->iobuf.pipe); + sc->sedesc->iobuf.pipe = NULL; } - if (oc->pipe) + if (sc->sedesc->iobuf.pipe) goto end; } diff --git a/src/stream.c b/src/stream.c index 45a0ad766..4c300a336 100644 --- a/src/stream.c +++ b/src/stream.c @@ -320,10 +320,10 @@ int stream_buf_available(void *arg) { struct stream *s = arg; - if (!s->req.buf.size && !s->req.pipe && s->scf->flags & SC_FL_NEED_BUFF && + if (!s->req.buf.size && !s->scb->sedesc->iobuf.pipe && s->scf->flags & SC_FL_NEED_BUFF && b_alloc(&s->req.buf)) sc_have_buff(s->scf); - else if (!s->res.buf.size && !s->res.pipe && s->scb->flags & SC_FL_NEED_BUFF && + else if (!s->res.buf.size && !s->scf->sedesc->iobuf.pipe && s->scb->flags & SC_FL_NEED_BUFF && b_alloc(&s->res.buf)) sc_have_buff(s->scb); else @@ -631,12 +631,6 @@ void stream_free(struct stream *s) sess_change_server(s, NULL); } - if (s->req.pipe) - put_pipe(s->req.pipe); - - if (s->res.pipe) - put_pipe(s->res.pipe); - /* We may still be present in the buffer wait queue */ if (LIST_INLIST(&s->buffer_wait.list)) LIST_DEL_INIT(&s->buffer_wait.list); @@ -3419,7 +3413,7 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch pfx, &strm->req, strm->req.flags, strm->req.analysers, - strm->req.pipe ? strm->req.pipe->data : 0, + strm->scb->sedesc->iobuf.pipe ? strm->scb->sedesc->iobuf.pipe->data : 0, strm->req.to_forward, strm->req.total, pfx, strm->req.analyse_exp ? @@ -3452,7 +3446,7 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch pfx, &strm->res, strm->res.flags, strm->res.analysers, - strm->res.pipe ? strm->res.pipe->data : 0, + strm->scf->sedesc->iobuf.pipe ? strm->scf->sedesc->iobuf.pipe->data : 0, strm->res.to_forward, strm->res.total, pfx, strm->res.analyse_exp ?