diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h index dc9224b79..2f84a1168 100644 --- a/include/haproxy/applet.h +++ b/include/haproxy/applet.h @@ -51,6 +51,7 @@ void appctx_free(struct appctx *appctx); size_t appctx_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags); size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags); +int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags); static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc) { diff --git a/src/applet.c b/src/applet.c index 4ddb0da3b..28341838c 100644 --- a/src/applet.c +++ b/src/applet.c @@ -23,6 +23,7 @@ #include #include #include +#include unsigned int nb_applets = 0; @@ -594,6 +595,75 @@ size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsig return ret; } +int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) +{ + struct appctx *appctx = __sc_appctx(sc); + struct xref *peer; + struct sedesc *sdo = NULL; + unsigned int len; + int ret = 0; + + TRACE_ENTER(APPLET_EV_RECV, appctx); + + /* TODO: outbuf must be empty. Find a better way to handle that but for now just return -1 */ + if (b_data(&appctx->outbuf)) { + TRACE_STATE("Output buffer not empty, cannot fast-forward data", APPLET_EV_RECV, appctx); + return -1; + } + + peer = xref_get_peer_and_lock(&appctx->sedesc->xref); + if (!peer) { + TRACE_STATE("Opposite endpoint not available yet", APPLET_EV_RECV, appctx); + goto end; + } + sdo = container_of(peer, struct sedesc, xref); + xref_unlock(&appctx->sedesc->xref, peer); + + if (appctx->to_forward && count > appctx->to_forward) + count = appctx->to_forward; + + len = se_nego_ff(sdo, &BUF_NULL, count, 0); + if (sdo->iobuf.flags & IOBUF_FL_NO_FF) { + sc_ep_clr(sc, SE_FL_MAY_FASTFWD); + TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", APPLET_EV_RECV, appctx); + goto end; + } + if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) { + sc_ep_set(sc, /* SE_FL_RCV_MORE | */SE_FL_WANT_ROOM); + TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + goto end; + } + + b_add(sdo->iobuf.buf, sdo->iobuf.offset); + ret = appctx->applet->fastfwd(appctx, sdo->iobuf.buf, len, 0); + b_sub(sdo->iobuf.buf, sdo->iobuf.offset); + sdo->iobuf.data += ret; + + if (applet_fl_test(appctx, APPCTX_FL_EOI)) { + se_fl_set(appctx->sedesc, SE_FL_EOI); + sdo->iobuf.flags |= IOBUF_FL_EOI; /* TODO: it may be good to have a flag to be sure we can + * forward the EOI the to consumer side + */ + TRACE_STATE("report EOI to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + if (applet_fl_test(appctx, APPCTX_FL_EOS)) { + se_fl_set(appctx->sedesc, SE_FL_EOS); + TRACE_STATE("report EOS to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + if (applet_fl_test(appctx, APPCTX_FL_ERROR)) { + se_fl_set(appctx->sedesc, SE_FL_ERROR); + TRACE_STATE("report ERROR to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + /* else */ + /* applet_have_more_data(appctx); */ + + se_done_ff(sdo); + +end: + TRACE_LEAVE(APPLET_EV_RECV, appctx); + return ret; +} + /* Default applet handler */ struct task *task_run_applet(struct task *t, void *context, unsigned int state) { diff --git a/src/stconn.c b/src/stconn.c index 729003602..3817e7b4d 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -1904,8 +1904,42 @@ int sc_applet_recv(struct stconn *sc) channel_check_idletimer(ic); - /* TODO: Handle fastfwd here be implement callback function first ! */ + /* First, let's see if we may fast-forward data from a side to the other + * one without using the channel buffer. + */ + if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) && + sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) { + if (channel_data(ic)) { + /* We're embarrassed, there are already data pending in + * the buffer and we don't want to have them at two + * locations at a time. Let's indicate we need some + * place and ask the consumer to hurry. + */ + flags |= CO_RFL_BUF_FLUSH; + goto abort_fastfwd; + } + ret = appctx_fastfwd(sc, ic->to_forward, flags); + if (ret < 0) + goto abort_fastfwd; + else if (ret > 0) { + if (ic->to_forward != CHN_INFINITE_FORWARD) + ic->to_forward -= ret; + ic->total += ret; + cur_read += ret; + ic->flags |= CF_READ_EVENT; + } + if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR)) + goto end_recv; + + if (sc_ep_test(sc, SE_FL_WANT_ROOM)) + sc_need_room(sc, -1); + + if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) + goto done_recv; + } + + abort_fastfwd: if (!sc_alloc_ibuf(sc, &appctx->buffer_wait)) goto end_recv; @@ -2108,6 +2142,9 @@ int sc_applet_send(struct stconn *sc) if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) return 0; + /* TODO: Splicing is not supported, so it is not possible to have FF data stuck into the I/O buf */ + BUG_ON(sc_ep_have_ff_data(sc)); + if (co_data(oc)) { ret = appctx->applet->snd_buf(sc, &oc->buf, co_data(oc), 0); if (ret > 0) {