diff --git a/src/stconn.c b/src/stconn.c index 8af16deb9..8a00b18cc 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -806,7 +806,7 @@ static void sc_app_chk_snd_conn(struct stconn *sc) !sc_ep_test(sc, SE_FL_WAIT_DATA)) /* not waiting for data */ return; - if (!(sc->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(sc_oc(sc))) + if (!(sc->wait_event.events & SUB_RETRY_SEND)) sc_conn_send(sc); if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING) || sc_is_conn_error(sc)) { @@ -1090,9 +1090,9 @@ static void sc_notify(struct stconn *sc) * alone because an HTTP parser might need more data to complete the * parsing. */ - if (!channel_is_empty(ic) && - sc_ep_test(sco, SE_FL_WAIT_DATA) && - (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || sc_ep_have_ff_data(sco))) { + if (sc_ep_have_ff_data(sc_opposite(sc)) || + (!channel_is_empty(ic) && sc_ep_test(sco, SE_FL_WAIT_DATA) && + (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0))) { int new_len, last_len; last_len = co_data(ic) + sc_ep_ff_data(sco); @@ -1258,14 +1258,51 @@ static int sc_conn_recv(struct stconn *sc) ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST); } - if (sc_ep_have_ff_data(sc_opposite(sc)) && ic->to_forward && - !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_FASTFWD)) { - /* don't break splicing by reading, but still call rcv_buf() - * to pass the flag. - */ - goto done_recv; +#if defined(USE_LINUX_SPLICE) + /* Detect if the splicing is possible depending on the stream policy */ + if ((global.tune.options & GTUNE_USE_SPLICE) && + (ic->to_forward >= MIN_SPLICE_FORWARD) && + ((!(sc->flags & SC_FL_ISBACK) && ((strm_fe(__sc_strm(sc))->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_REQ)) || + ((sc->flags & SC_FL_ISBACK) && ((strm_fe(__sc_strm(sc))->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_RTR)) || + ((ic->flags & CF_STREAMER_FAST) && ((strm_sess(__sc_strm(sc))->fe->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_AUT)))) + flags |= CO_RFL_MAY_SPLICE; +#endif + + /* First, let's see if we may fast-forward data from a side to the other + * one without using the channel buffer. + */ + if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) { + if (c_data(ic)) { + /* We're embarrassed, there are already data pending in + * the buffer and we don't want to have them at two + * locations at a time. Let's indicate we need some + * place and ask the consumer to hurry. + */ + flags |= CO_RFL_BUF_FLUSH; + goto abort_fastfwd; + } + ret = conn->mux->fastfwd(sc, ic->to_forward, flags); + if (ret < 0) + goto abort_fastfwd; + else if (ret > 0) { + if (ic->to_forward != CHN_INFINITE_FORWARD) + ic->to_forward -= ret; + ic->total += ret; + cur_read += ret; + ic->flags |= CF_READ_EVENT; + } + + if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR)) + goto end_recv; + + if (sc_ep_test(sc, SE_FL_WANT_ROOM)) + sc_need_room(sc, -1); + + if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) + goto done_recv; } + abort_fastfwd: /* now we'll need a input buffer for the stream */ if (!sc_alloc_ibuf(sc, &(__sc_strm(sc)->buffer_wait))) goto end_recv; @@ -1397,7 +1434,9 @@ static int sc_conn_recv(struct stconn *sc) } /* while !flags */ done_recv: - if (cur_read) { + if (!cur_read) + se_have_no_more_data(sc->sedesc); + else { if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && (cur_read <= ic->buf.size / 2)) { ic->xfer_large = 0; @@ -1461,7 +1500,8 @@ static int sc_conn_recv(struct stconn *sc) sc->flags |= SC_FL_ERROR; ret = 1; } - else if (!(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) && + else if (!cur_read && + !(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) && !(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))) { /* Subscribe to receive events if we're blocking on I/O */ conn->mux->subscribe(sc, SUB_RETRY_RECV, &sc->wait_event); @@ -1539,6 +1579,30 @@ static int sc_conn_send(struct stconn *sc) if (!conn->mux) return 0; + if (sc_ep_have_ff_data(sc)) { + unsigned int send_flag = 0; + + if ((!(sc->flags & (SC_FL_SND_ASAP|SC_FL_SND_NEVERWAIT)) && + ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) || + (sc->flags & SC_FL_SND_EXP_MORE) || + (IS_HTX_STRM(s) && + (!(sco->flags & (SC_FL_EOI|SC_FL_EOS|SC_FL_ABRT_DONE)) && htx_expect_more(htxbuf(&oc->buf)))))) || + ((oc->flags & CF_ISRESP) && + (oc->flags & CF_AUTO_CLOSE) && + (sc->flags & SC_FL_SHUT_WANTED))) + send_flag |= CO_SFL_MSG_MORE; + + if (oc->flags & CF_STREAMER) + send_flag |= CO_SFL_STREAMER; + + ret = conn->mux->resume_fastfwd(sc, send_flag); + if (ret > 0) + did_send = 1; + + if (sc_ep_have_ff_data(sc)) + goto end; + } + /* At this point, the pipe is empty, but we may still have data pending * in the normal buffer. */ @@ -1637,7 +1701,18 @@ static int sc_conn_send(struct stconn *sc) return 1; } - if (!channel_is_empty(oc)) { + /* FIXME: Must be reviewed for FF */ + if (channel_is_empty(oc)) { + /* If fast-forwarding is blocked, unblock it now to check for + * receive on the other side + */ + if (sc->sedesc->iobuf.flags & IOBUF_FL_FF_BLOCKED) { + sc->sedesc->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED; + sc_have_room(sco); + did_send = 1; + } + } + else { /* We couldn't send all of our data, let the mux know we'd like to send more */ conn->mux->subscribe(sc, SUB_RETRY_SEND, &sc->wait_event); } @@ -1777,7 +1852,7 @@ struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state) if (!sc_conn(sc)) return t; - if (!(sc->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(sc_oc(sc))) + if (!(sc->wait_event.events & SUB_RETRY_SEND) && (!channel_is_empty(sc_oc(sc)) || (sc->sedesc->iobuf.flags & IOBUF_FL_FF_BLOCKED))) ret = sc_conn_send(sc); if (!(sc->wait_event.events & SUB_RETRY_RECV)) ret |= sc_conn_recv(sc);