MINOR: stconn: Start to introduce mux-to-mux fast-forwarding notion

Instead of talking about kernel splicing at stconn/sedesc level, we now try
to talk about mux-to-mux fast-forwarding. To do so, 2 functions were added
to know if there are fast-forwarded data and to retrieve this amount of
data. Of course, for now, the only fast-forwarded data are those stored in a pipe.

In addition, some flags were renamed to reflect this notion. Note that the
channel's documentation has not been updated yet.
This commit is contained in:
Christopher Faulet 2023-08-03 09:45:09 +02:00
parent 8bee0dcd7d
commit e52519ac83
9 changed files with 71 additions and 57 deletions

View File

@ -409,7 +409,7 @@ static inline void channel_htx_forward_forever(struct channel *chn, struct htx *
*/
static inline unsigned int channel_is_empty(const struct channel *c)
{
return !(co_data(c) | (long)chn_cons(c)->sedesc->iobuf.pipe);
return (!co_data(c) && !sc_ep_have_ff_data(chn_cons(c)));
}
/* Returns non-zero if the channel is rewritable, which means that the buffer

View File

@ -45,7 +45,7 @@
#define H1C_F_SILENT_SHUT 0x00000800 /* if H1C is closed, silent (or dirty) shutdown must be performed */
#define H1C_F_ABRT_PENDING 0x00001000 /* An error must be sent (previous attempt failed) and H1 connection must be closed ASAP */
#define H1C_F_ABRTED 0x00002000 /* An error must be sent (previous attempt failed) and H1 connection must be closed ASAP */
#define H1C_F_WANT_SPLICE 0x00004000 /* Don't read into a buffer because we want to use or we are using splicing */
#define H1C_F_WANT_FASTFWD 0x00004000 /* Don't read into a buffer because we want to fast forward data */
#define H1C_F_WAIT_NEXT_REQ 0x00008000 /* waiting for the next request to start, use keep-alive timeout */
#define H1C_F_UPG_H2C 0x00010000 /* set if an upgrade to h2 should be done */
#define H1C_F_CO_MSG_MORE 0x00020000 /* set if CO_SFL_MSG_MORE must be set when calling xprt->snd_buf() */
@ -69,7 +69,7 @@ static forceinline char *h1c_show_flags(char *buf, size_t len, const char *delim
_(H1C_F_IN_ALLOC, _(H1C_F_IN_FULL, _(H1C_F_IN_SALLOC,
_(H1C_F_EOS, _(H1C_F_ERR_PENDING, _(H1C_F_ERROR,
_(H1C_F_SILENT_SHUT, _(H1C_F_ABRT_PENDING, _(H1C_F_ABRTED,
_(H1C_F_WANT_SPLICE, _(H1C_F_WAIT_NEXT_REQ, _(H1C_F_UPG_H2C, _(H1C_F_CO_MSG_MORE,
_(H1C_F_WANT_FASTFWD, _(H1C_F_WAIT_NEXT_REQ, _(H1C_F_UPG_H2C, _(H1C_F_CO_MSG_MORE,
_(H1C_F_CO_STREAMER, _(H1C_F_IS_BACK)))))))))))))))));
/* epilogue */
_(~0U);

View File

@ -75,7 +75,7 @@ enum se_flags {
SE_FL_ERROR = 0x00010000, /* a fatal error was reported */
/* Transient flags */
SE_FL_ERR_PENDING= 0x00020000, /* An error is pending, but there's still data to be read */
SE_FL_MAY_SPLICE = 0x00040000, /* The endpoint may use the kernel splicing to forward data to the other side (implies SE_FL_CAN_SPLICE) */
SE_FL_MAY_FASTFWD= 0x00040000, /* The endpoint may fast-forward data to the other side */
SE_FL_RCV_MORE = 0x00080000, /* Endpoint may have more bytes to transfer */
SE_FL_WANT_ROOM = 0x00100000, /* More bytes to transfer, but not enough room */
SE_FL_EXP_NO_DATA= 0x00200000, /* No data expected by the endpoint */
@ -108,7 +108,7 @@ static forceinline char *se_show_flags(char *buf, size_t len, const char *delim,
_(SE_FL_T_MUX, _(SE_FL_T_APPLET, _(SE_FL_DETACHED, _(SE_FL_ORPHAN,
_(SE_FL_SHRD, _(SE_FL_SHRR, _(SE_FL_SHWN, _(SE_FL_SHWS,
_(SE_FL_NOT_FIRST, _(SE_FL_WEBSOCKET, _(SE_FL_EOI, _(SE_FL_EOS,
_(SE_FL_ERROR, _(SE_FL_ERR_PENDING, _(SE_FL_MAY_SPLICE,
_(SE_FL_ERROR, _(SE_FL_ERR_PENDING, _(SE_FL_MAY_FASTFWD,
_(SE_FL_RCV_MORE, _(SE_FL_WANT_ROOM, _(SE_FL_EXP_NO_DATA,
_(SE_FL_WAIT_FOR_HS, _(SE_FL_KILL_CONN, _(SE_FL_WAIT_DATA,
_(SE_FL_WONT_CONSUME, _(SE_FL_HAVE_NO_DATA, _(SE_FL_APPLET_NEED_CONN))))))))))))))))))))))));

View File

@ -122,6 +122,17 @@ static inline void se_expect_data(struct sedesc *se)
se_fl_clr(se, SE_FL_EXP_NO_DATA);
}
static inline unsigned int se_have_ff_data(struct sedesc *se)
{
return ((long)se->iobuf.pipe);
}
static inline size_t se_ff_data(struct sedesc *se)
{
return ((se->iobuf.pipe ? se->iobuf.pipe->data : 0));
}
/* stream connector version */
static forceinline void sc_ep_zero(struct stconn *sc)
{
@ -190,6 +201,16 @@ static forceinline void sc_ep_report_send_activity(struct stconn *sc)
sc_ep_report_read_activity(sc);
}
static forceinline unsigned int sc_ep_have_ff_data(struct stconn *sc)
{
return se_have_ff_data(sc->sedesc);
}
static forceinline size_t sc_ep_ff_data(struct stconn *sc)
{
return se_ff_data(sc->sedesc);
}
static forceinline int sc_ep_rcv_ex(const struct stconn *sc)
{
return (tick_isset(sc->sedesc->lra)

View File

@ -383,7 +383,7 @@ int appctx_buf_available(void *arg)
sc_have_buff(sc);
/* was already allocated another way ? if so, don't take this one */
if (c_size(sc_ic(sc)) || sc_opposite(sc)->sedesc->iobuf.pipe)
if (c_size(sc_ic(sc)) || sc_ep_have_ff_data(sc_opposite(sc)))
return 0;
/* allocation possible now ? */

View File

@ -855,7 +855,7 @@ static void h1s_destroy(struct h1s *h1s)
h1_release_buf(h1c, &h1s->rxbuf);
h1c->flags &= ~(H1C_F_WANT_SPLICE|
h1c->flags &= ~(H1C_F_WANT_FASTFWD|
H1C_F_OUT_FULL|H1C_F_OUT_ALLOC|H1C_F_IN_SALLOC|
H1C_F_CO_MSG_MORE|H1C_F_CO_STREAMER);
@ -1908,13 +1908,13 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count
/* Here h1s_sc(h1s) is always defined */
if ((!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) &&
(h1m->state == H1_MSG_DATA || h1m->state == H1_MSG_TUNNEL)) {
TRACE_STATE("notify the mux can use splicing", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
se_fl_set(h1s->sd, SE_FL_MAY_SPLICE);
TRACE_STATE("notify the mux can use fast-forward", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
se_fl_set(h1s->sd, SE_FL_MAY_FASTFWD);
}
else {
TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
se_fl_clr(h1s->sd, SE_FL_MAY_SPLICE);
h1c->flags &= ~H1C_F_WANT_SPLICE;
TRACE_STATE("notify the mux can't use fast-forward anymore", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
h1c->flags &= ~H1C_F_WANT_FASTFWD;
}
/* Set EOI on stream connector in DONE state iff:
@ -2576,8 +2576,8 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf,
/* The message is chunked. We need to check if we must
* emit the chunk size, the CRLF marking the end of the
* current chunk and eventually the CRLF marking the end
* of the previous chunk (because of the kernel
* splicing). If it is the end of the message, we must
* of the previous chunk (because of fast-forwarding).
* If it is the end of the message, we must
* also emit the last chunk.
*
* We have at least the size of the struct htx to write
@ -3320,8 +3320,8 @@ static int h1_recv(struct h1c *h1c)
return (b_data(&h1c->ibuf));
}
if ((h1c->flags & H1C_F_WANT_SPLICE) || !h1_recv_allowed(h1c)) {
TRACE_DEVEL("leaving on (want_splice|!recv_allowed)", H1_EV_H1C_RECV, h1c->conn);
if ((h1c->flags & H1C_F_WANT_FASTFWD) || !h1_recv_allowed(h1c)) {
TRACE_DEVEL("leaving on (want_fastfwd|!recv_allowed)", H1_EV_H1C_RECV, h1c->conn);
return 1;
}
@ -3655,8 +3655,8 @@ static int h1_process(struct h1c * h1c)
}
}
if (h1c->state == H1_CS_RUNNING && (h1c->flags & H1C_F_WANT_SPLICE) && !h1s_data_pending(h1c->h1s)) {
TRACE_DEVEL("xprt rcv_buf blocked (want_splice), notify h1s for recv", H1_EV_H1C_RECV, h1c->conn);
if (h1c->state == H1_CS_RUNNING && (h1c->flags & H1C_F_WANT_FASTFWD) && !h1s_data_pending(h1c->h1s)) {
TRACE_DEVEL("xprt rcv_buf blocked (want_fastfwd), notify h1s for recv", H1_EV_H1C_RECV, h1c->conn);
h1_wake_stream_for_recv(h1c->h1s);
}
@ -4182,8 +4182,8 @@ static int h1_subscribe(struct stconn *sc, int event_type, struct wait_event *es
* The caller is responsible for defragmenting <buf> if necessary. But <flags>
* must be tested to know the calling context. If CO_RFL_BUF_FLUSH is set, it
* means the caller wants to flush input data (from the mux buffer and the
* channel buffer) to be able to use kernel splicing or any kind of mux-to-mux
* xfer. If CO_RFL_KEEP_RECV is set, the mux must always subscribe for read
* channel buffer) to be able to use fast-forwarding.
* If CO_RFL_KEEP_RECV is set, the mux must always subscribe for read
* events before giving back. CO_RFL_BUF_WET is set if <buf> is congested with
* data scheduled for leaving soon. CO_RFL_BUF_NOT_STUCK is set to instruct the
* mux it may optimize the data copy to <buf> if necessary. Otherwise, it should
@ -4209,9 +4209,9 @@ static size_t h1_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, in
else
TRACE_DEVEL("h1c ibuf not allocated", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
if ((flags & CO_RFL_BUF_FLUSH) && se_fl_test(h1s->sd, SE_FL_MAY_SPLICE)) {
h1c->flags |= H1C_F_WANT_SPLICE;
TRACE_STATE("Block xprt rcv_buf to flush stream's buffer (want_splice)", H1_EV_STRM_RECV, h1c->conn, h1s);
if ((flags & CO_RFL_BUF_FLUSH) && se_fl_test(h1s->sd, SE_FL_MAY_FASTFWD)) {
h1c->flags |= H1C_F_WANT_FASTFWD;
TRACE_STATE("Block xprt rcv_buf to flush stream's buffer (want_fastfwd)", H1_EV_STRM_RECV, h1c->conn, h1s);
}
else {
if (((flags & CO_RFL_KEEP_RECV) || (h1m->state != H1_MSG_DONE)) && !(h1c->wait_event.events & SUB_RETRY_RECV))
@ -4307,12 +4307,12 @@ static int h1_rcv_pipe(struct stconn *sc, struct pipe *pipe, unsigned int count)
TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count});
if (h1m->state != H1_MSG_DATA && h1m->state != H1_MSG_TUNNEL) {
h1c->flags &= ~H1C_F_WANT_SPLICE;
h1c->flags &= ~H1C_F_WANT_FASTFWD;
TRACE_STATE("Allow xprt rcv_buf on !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, h1c->conn, h1s);
goto end;
}
h1c->flags |= H1C_F_WANT_SPLICE;
h1c->flags |= H1C_F_WANT_FASTFWD;
if (h1s_data_pending(h1s)) {
TRACE_STATE("flush input buffer before splicing", H1_EV_STRM_RECV, h1c->conn, h1s);
goto end;
@ -4341,7 +4341,7 @@ static int h1_rcv_pipe(struct stconn *sc, struct pipe *pipe, unsigned int count)
h1m->state = H1_MSG_DONE;
else
h1m->state = H1_MSG_CHUNK_CRLF;
h1c->flags &= ~H1C_F_WANT_SPLICE;
h1c->flags &= ~H1C_F_WANT_FASTFWD;
if (!(h1c->flags & H1C_F_IS_BACK)) {
/* The request was fully received. It means the H1S now
@ -4369,22 +4369,22 @@ static int h1_rcv_pipe(struct stconn *sc, struct pipe *pipe, unsigned int count)
}
else {
se_fl_set(h1s->sd, SE_FL_ERROR);
h1c->flags = (h1c->flags & ~H1C_F_WANT_SPLICE) | H1C_F_ERROR;
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR;
TRACE_ERROR("message aborted, set error on SC", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s);
}
h1c->flags = (h1c->flags & ~H1C_F_WANT_SPLICE) | H1C_F_EOS;
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_EOS;
TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, h1c->conn, h1s);
}
end:
if (h1c->conn->flags & CO_FL_ERROR) {
se_fl_set(h1s->sd, SE_FL_ERROR);
h1c->flags = (h1c->flags & ~H1C_F_WANT_SPLICE) | H1C_F_ERROR;
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR;
TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
}
if (!(h1c->flags & H1C_F_WANT_SPLICE)) {
TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_STRM_RECV, h1c->conn, h1s);
se_fl_clr(h1s->sd, SE_FL_MAY_SPLICE);
if (!(h1c->flags & H1C_F_WANT_FASTFWD)) {
TRACE_STATE("notify the mux can't use fast-forward anymore", H1_EV_STRM_RECV, h1c->conn, h1s);
se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, h1c->conn, h1s);
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
@ -4435,7 +4435,7 @@ static int h1_snd_pipe(struct stconn *sc, struct pipe *pipe)
end:
if (h1c->conn->flags & CO_FL_ERROR) {
h1c->flags = (h1c->flags & ~H1C_F_WANT_SPLICE) | H1C_F_ERR_PENDING;
h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING;
if (h1c->flags & H1C_F_EOS)
h1c->flags |= H1C_F_ERROR;
else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {

View File

@ -323,7 +323,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
conn->ctx = ctx;
se_fl_set(ctx->sd, SE_FL_RCV_MORE);
if (global.tune.options & GTUNE_USE_SPLICE)
se_fl_set(ctx->sd, SE_FL_MAY_SPLICE);
se_fl_set(ctx->sd, SE_FL_MAY_FASTFWD);
TRACE_LEAVE(PT_EV_CONN_NEW, conn);
return 0;

View File

@ -629,7 +629,7 @@ static void sc_app_shut(struct stconn *sc)
/* default chk_rcv function for scheduled tasks */
static void sc_app_chk_rcv(struct stconn *sc)
{
if (sc_opposite(sc)->sedesc->iobuf.pipe) {
if (sc_ep_have_ff_data(sc_opposite(sc))) {
/* stop reading */
sc_need_room(sc, -1);
}
@ -800,7 +800,7 @@ static void sc_app_chk_snd_conn(struct stconn *sc)
if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
return;
if (!sc->sedesc->iobuf.pipe && /* spliced data wants to be forwarded ASAP */
if (!sc_ep_have_ff_data(sc) && /* data wants to be fast-forwarded ASAP */
!sc_ep_test(sc, SE_FL_WAIT_DATA)) /* not waiting for data */
return;
@ -946,7 +946,7 @@ static void sc_app_chk_rcv_applet(struct stconn *sc)
{
BUG_ON(!sc_appctx(sc));
if (!sc_opposite(sc)->sedesc->iobuf.pipe) {
if (!sc_ep_have_ff_data(sc_opposite(sc))) {
/* (re)start reading */
appctx_wakeup(__sc_appctx(sc));
}
@ -1090,18 +1090,12 @@ static void sc_notify(struct stconn *sc)
*/
if (!channel_is_empty(ic) &&
sc_ep_test(sco, SE_FL_WAIT_DATA) &&
(!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || sco->sedesc->iobuf.pipe)) {
(!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || sc_ep_have_ff_data(sco))) {
int new_len, last_len;
last_len = co_data(ic);
if (sco->sedesc->iobuf.pipe)
last_len += sco->sedesc->iobuf.pipe->data;
last_len = co_data(ic) + sc_ep_ff_data(sco);
sc_chk_snd(sco);
new_len = co_data(ic);
if (sco->sedesc->iobuf.pipe)
new_len += sco->sedesc->iobuf.pipe->data;
new_len = co_data(ic) + sc_ep_ff_data(sco);
/* check if the consumer has freed some space either in the
* buffer or in the pipe.
@ -1265,8 +1259,8 @@ static int sc_conn_recv(struct stconn *sc)
/* First, let's see if we may splice data across the channel without
* using a buffer.
*/
if (sc_ep_test(sc, SE_FL_MAY_SPLICE) &&
(sc_opposite(sc)->sedesc->iobuf.pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) &&
(sc_ep_have_ff_data(sc_opposite(sc)) || ic->to_forward >= MIN_SPLICE_FORWARD) &&
ic->flags & CF_KERN_SPLICING) {
if (c_data(ic)) {
/* We're embarrassed, there are already data pending in
@ -1315,13 +1309,13 @@ static int sc_conn_recv(struct stconn *sc)
}
abort_splice:
if (sc_opposite(sc)->sedesc->iobuf.pipe && unlikely(!sc_opposite(sc)->sedesc->iobuf.pipe->data)) {
if (sc_ep_have_ff_data(sc_opposite(sc)) && unlikely(!sc_ep_ff_data(sc_opposite(sc)))) {
put_pipe(sc_opposite(sc)->sedesc->iobuf.pipe);
sc_opposite(sc)->sedesc->iobuf.pipe = NULL;
}
if (sc_opposite(sc)->sedesc->iobuf.pipe && ic->to_forward &&
!(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) {
if (sc_ep_have_ff_data(sc_opposite(sc)) && ic->to_forward &&
!(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_FASTFWD)) {
/* don't break splicing by reading, but still call rcv_buf()
* to pass the flag.
*/
@ -1601,17 +1595,16 @@ static int sc_conn_send(struct stconn *sc)
if (!conn->mux)
return 0;
if (sc->sedesc->iobuf.pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
if (sc_ep_have_ff_data(sc) && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
ret = conn->mux->snd_pipe(sc, sc->sedesc->iobuf.pipe);
if (ret > 0)
did_send = 1;
if (!sc->sedesc->iobuf.pipe->data) {
if (!sc_ep_ff_data(sc)) {
put_pipe(sc->sedesc->iobuf.pipe);
sc->sedesc->iobuf.pipe = NULL;
}
if (sc->sedesc->iobuf.pipe)
else
goto end;
}

View File

@ -320,10 +320,10 @@ int stream_buf_available(void *arg)
{
struct stream *s = arg;
if (!s->req.buf.size && !s->scb->sedesc->iobuf.pipe && s->scf->flags & SC_FL_NEED_BUFF &&
if (!s->req.buf.size && !sc_ep_have_ff_data(s->scb) && s->scf->flags & SC_FL_NEED_BUFF &&
b_alloc(&s->req.buf))
sc_have_buff(s->scf);
else if (!s->res.buf.size && !s->scf->sedesc->iobuf.pipe && s->scb->flags & SC_FL_NEED_BUFF &&
else if (!s->res.buf.size && !sc_ep_have_ff_data(s->scf) && s->scb->flags & SC_FL_NEED_BUFF &&
b_alloc(&s->res.buf))
sc_have_buff(s->scb);
else