diff --git a/include/haproxy/sc_strm.h b/include/haproxy/sc_strm.h index 722662eaa..8d4d6fbf2 100644 --- a/include/haproxy/sc_strm.h +++ b/include/haproxy/sc_strm.h @@ -40,6 +40,9 @@ struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state); int sc_conn_sync_recv(struct stconn *sc); void sc_conn_sync_send(struct stconn *sc); +int sc_applet_sync_recv(struct stconn *sc); +void sc_applet_sync_send(struct stconn *sc); + /* returns the channel which receives data from this stream connector (input channel) */ static inline struct channel *sc_ic(const struct stconn *sc) diff --git a/src/stconn.c b/src/stconn.c index c6d61a40d..729003602 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -1864,6 +1864,313 @@ static void sc_applet_eos(struct stconn *sc) return sc_app_shut_applet(sc); } +/* + * This is the callback which is called by the applet layer to receive data into + * the buffer from the appctx. It iterates over the applet's rcv_buf + * function. Please do not statify this function, it's often present in + * backtraces, it's useful to recognize it. + */ +int sc_applet_recv(struct stconn *sc) +{ + struct appctx *appctx = __sc_appctx(sc); + struct channel *ic = sc_ic(sc); + int ret, max, cur_read = 0; + int read_poll = MAX_READ_POLL_LOOPS; + int flags = 0; + + + /* If another call to sc_applet_recv() failed, give up now. + */ + if (sc_waiting_room(sc)) + return 0; + + /* maybe we were called immediately after an asynchronous abort */ + if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) + return 1; + + /* We must wait because the applet is not fully initialized */ + if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) + return 0; + + /* stop immediately on errors. */ + if (!sc_ep_test(sc, SE_FL_RCV_MORE)) { + // TODO: be sure SE_FL_RCV_MORE may be set for applet ? + if (sc_ep_test(sc, SE_FL_ERROR)) + goto end_recv; + } + + /* prepare to detect if the mux needs more room */ + sc_ep_clr(sc, SE_FL_WANT_ROOM); + + channel_check_idletimer(ic); + + /* TODO: Handle fastfwd here be implement callback function first ! */ + + if (!sc_alloc_ibuf(sc, &appctx->buffer_wait)) + goto end_recv; + + /* For an HTX stream, if the buffer is stuck (no output data with some + * input data) and if the HTX message is fragmented or if its free space + * wraps, we force an HTX deframentation. It is a way to have a + * contiguous free space nad to let the mux to copy as much data as + * possible. + * + * NOTE: A possible optim may be to let the mux decides if defrag is + * required or not, depending on amount of data to be xferred. + */ + if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) { + struct htx *htx = htxbuf(&ic->buf); + + if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx))) + htx_defrag(htx, NULL, 0); + } + + /* Compute transient CO_RFL_* flags */ + if (co_data(ic)) { + flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK); + } + + /* may be null. This is the mux responsibility to set + * SE_FL_RCV_MORE on the SC if more space is needed. + */ + max = channel_recv_max(ic); + ret = appctx->applet->rcv_buf(sc, &ic->buf, max, flags); + if (sc_ep_test(sc, SE_FL_WANT_ROOM)) { + /* SE_FL_WANT_ROOM must not be reported if the channel's + * buffer is empty. + */ + BUG_ON(c_empty(ic)); + + sc_need_room(sc, channel_recv_max(ic) + 1); + /* Add READ_PARTIAL because some data are pending but + * cannot be xferred to the channel + */ + ic->flags |= CF_READ_EVENT; + sc_ep_report_read_activity(sc); + } + + if (ret <= 0) { + /* if we refrained from reading because we asked for a flush to + * satisfy rcv_pipe(), report that there's not enough room here + * to proceed. + */ + if (flags & CO_RFL_BUF_FLUSH) + sc_need_room(sc, -1); + goto done_recv; + } + + cur_read += ret; + + /* if we're allowed to directly forward data, we must update ->o */ + if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) { + unsigned long fwd = ret; + if (ic->to_forward != CHN_INFINITE_FORWARD) { + if (fwd > ic->to_forward) + fwd = ic->to_forward; + ic->to_forward -= fwd; + } + c_adv(ic, fwd); + } + + ic->flags |= CF_READ_EVENT; + ic->total += ret; + + /* End-of-input reached, we can leave. In this case, it is + * important to break the loop to not block the SC because of + * the channel's policies.This way, we are still able to receive + * shutdowns. + */ + if (sc_ep_test(sc, SE_FL_EOI)) + goto done_recv; + + if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) { + /* we don't expect to read more data */ + sc_wont_read(sc); + goto done_recv; + } + + /* if too many bytes were missing from last read, it means that + * it's pointless trying to read again because the system does + * not have them in buffers. + */ + if (ret < max) { + /* if a streamer has read few data, it may be because we + * have exhausted system buffers. It's not worth trying + * again. + */ + if (ic->flags & CF_STREAMER) { + /* we're stopped by the channel's policy */ + sc_wont_read(sc); + goto done_recv; + } + + /* if we read a large block smaller than what we requested, + * it's almost certain we'll never get anything more. + */ + if (ret >= global.tune.recv_enough) { + /* we're stopped by the channel's policy */ + sc_wont_read(sc); + } + } + + done_recv: + if (!cur_read) + se_have_no_more_data(sc->sedesc); + else { + channel_check_xfer(ic, cur_read); + sc_ep_report_read_activity(sc); + } + + end_recv: + ret = (cur_read != 0); + + /* Report EOI on the channel if it was reached from the mux point of + * view. */ + if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) { + sc_ep_report_read_activity(sc); + sc->flags |= SC_FL_EOI; + ic->flags |= CF_READ_EVENT; + ret = 1; + } + + if (sc_ep_test(sc, SE_FL_EOS)) { + /* we received a shutdown */ + if (ic->flags & CF_AUTO_CLOSE) + sc_schedule_shutdown(sc_opposite(sc)); + sc_applet_eos(sc); + ret = 1; + } + + if (sc_ep_test(sc, SE_FL_ERROR)) { + sc->flags |= SC_FL_ERROR; + ret = 1; + } + else if (!cur_read && + !(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) && + !(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))) { + se_have_no_more_data(sc->sedesc); + } + else { + se_have_more_data(sc->sedesc); + ret = 1; + } + + return ret; +} + +/* This tries to perform a synchronous receive on the stream connector to + * try to collect last arrived data. In practice it's only implemented on + * stconns. Returns 0 if nothing was done, non-zero if new data or a + * shutdown were collected. This may result on some delayed receive calls + * to be programmed and performed later, though it doesn't provide any + * such guarantee. + */ +int sc_applet_sync_recv(struct stconn *sc) +{ + if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST)) + return 0; + + if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) + return 0; + + if (!sc_is_recv_allowed(sc)) + return 0; // already failed + + return sc_applet_recv(sc); +} + +/* + * This function is called to send buffer data to an applet. It calls the + * applet's snd_buf function. Please do not statify this function, it's often + * present in backtraces, it's useful to recognize it. + */ +int sc_applet_send(struct stconn *sc) +{ + struct appctx *appctx = __sc_appctx(sc); + struct stconn *sco = sc_opposite(sc); + struct channel *oc = sc_oc(sc); + size_t ret; + int did_send = 0; + + if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) { + BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING)); + return 1; + } + + if (sc_ep_test(sc, SE_FL_WONT_CONSUME)) + return 0; + + /* we might have been called just after an asynchronous shutw */ + if (sc->flags & SC_FL_SHUT_DONE) + return 1; + + /* We must wait because the applet is not fully initialized */ + if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) + return 0; + + if (co_data(oc)) { + ret = appctx->applet->snd_buf(sc, &oc->buf, co_data(oc), 0); + if (ret > 0) { + did_send = 1; + c_rew(oc, ret); + c_realign_if_empty(oc); + + if (!co_data(oc)) { + /* Always clear both flags once everything has been sent, they're one-shot */ + sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE); + } + /* if some data remain in the buffer, it's only because the + * system buffers are full, we will try next time. + */ + } + } + + if (did_send) + oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA; + + if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed))) + sc_have_room(sco); + + if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) { + oc->flags |= CF_WRITE_EVENT; + BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING)); + if (sc_ep_test(sc, SE_FL_ERROR)) + sc->flags |= SC_FL_ERROR; + return 1; + } + + if (!co_data(oc)) { + if (did_send) + sc_ep_report_send_activity(sc); + } + else { + sc_ep_report_blocked_send(sc, did_send); + } + + return did_send; +} + +void sc_applet_sync_send(struct stconn *sc) +{ + struct channel *oc = sc_oc(sc); + + oc->flags &= ~CF_WRITE_EVENT; + + if (sc->flags & SC_FL_SHUT_DONE) + return; + + if (!co_data(oc)) + return; + + if (!sc_state_in(sc->state, SC_SB_EST)) + return; + + if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) + return; + + sc_applet_send(sc); +} + /* Callback to be used by applet handlers upon completion. It updates the stream * (which may or may not take this opportunity to try to forward data), then * may re-enable the applet's based on the channels and stream connector's final