mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-04-04 02:21:53 +02:00
MEDIUM: haterm: Add support for 0-copy data forwading and option to disable it
The support for the zero-copy data forwarding was added and enabled by default. The command line option '-dZ' was also added to disable the feature. Concretely, when haterm pushes the response payload, if the zero-copy forwarding is supported, a dedicated function is used to do so. hstream_ff_snd() will rely on se_nego_ff() to know how many data can send and at the end, on se_done_ff() to really send data. hstream_add_ff_data() function was added to perform the raw copy of the payload in the sedesc I/O buffer.
This commit is contained in:
parent
2a1afcf39d
commit
ecf36f2ca8
117
src/haterm.c
117
src/haterm.c
@ -5,6 +5,7 @@
|
||||
#include <haproxy/hstream-t.h>
|
||||
#include <haproxy/http_htx.h>
|
||||
#include <haproxy/http.h>
|
||||
#include <haproxy/istbuf.h>
|
||||
#include <haproxy/pool.h>
|
||||
#include <haproxy/proxy-t.h>
|
||||
#include <haproxy/sc_strm.h>
|
||||
@ -61,6 +62,7 @@ static char common_chunk_resp[RESPSIZE];
|
||||
static char *random_resp;
|
||||
static int random_resp_len = RESPSIZE;
|
||||
|
||||
static size_t hstream_add_ff_data(struct hstream *hs, struct sedesc *sd, unsigned long long len);
|
||||
static size_t hstream_add_htx_data(struct hstream *hs, struct htx *htx, unsigned long long len);
|
||||
|
||||
#define TRACE_SOURCE &trace_haterm
|
||||
@ -298,6 +300,55 @@ static int hstream_htx_buf_rcv(struct connection *conn, struct hstream *hs)
|
||||
goto leave;
|
||||
}
|
||||
|
||||
static int hstream_ff_snd(struct connection *conn, struct hstream *hs)
|
||||
{
|
||||
size_t len;
|
||||
unsigned int nego_flags = NEGO_FF_FL_NONE;
|
||||
struct sedesc *sd = hs->sc->sedesc;
|
||||
int ret = 0;
|
||||
|
||||
/* First try to resume FF*/
|
||||
if (se_have_ff_data(sd)) {
|
||||
ret = CALL_MUX_WITH_RET(conn->mux, resume_fastfwd(hs->sc, 0));
|
||||
if (ret > 0)
|
||||
sd->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;
|
||||
}
|
||||
|
||||
nego_flags |= NEGO_FF_FL_EXACT_SIZE;
|
||||
len = se_nego_ff(sd, &BUF_NULL, hs->to_write, nego_flags);
|
||||
if (sd->iobuf.flags & IOBUF_FL_NO_FF) {
|
||||
TRACE_DEVEL("Fast-forwarding not supported by endpoint, disable it", HS_EV_HSTRM_RESP, hs);
|
||||
goto abort;
|
||||
}
|
||||
if (sd->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
|
||||
TRACE_DEVEL("Fast-forwarding blocked", HS_EV_HSTRM_RESP, hs);
|
||||
goto out;
|
||||
}
|
||||
|
||||
hs->to_write -= hstream_add_ff_data(hs, sd, len);
|
||||
if (!hs->to_write)
|
||||
sd->iobuf.flags |= IOBUF_FL_EOI;
|
||||
|
||||
if (se_done_ff(sd) != 0 || !(sd->iobuf.flags & (IOBUF_FL_FF_BLOCKED|IOBUF_FL_FF_WANT_ROOM))) {
|
||||
/* Something was forwarding or the consumer states it is not
|
||||
* blocked anyore, don't reclaim more room */
|
||||
}
|
||||
|
||||
if (se_have_ff_data(sd)) {
|
||||
TRACE_DEVEL("data not fully sent, wait", HS_EV_HSTRM_SEND, hs);
|
||||
conn->mux->subscribe(hs->sc, SUB_RETRY_SEND, &hs->sc->wait_event);
|
||||
}
|
||||
else if (hs->to_write) {
|
||||
TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
|
||||
task_wakeup(hs->task, TASK_WOKEN_IO);
|
||||
}
|
||||
out:
|
||||
return ret;
|
||||
|
||||
abort:
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Send HTX data prepared for <hs> haterm stream from <conn> connection */
|
||||
static int hstream_htx_buf_snd(struct connection *conn, struct hstream *hs)
|
||||
{
|
||||
@ -449,6 +500,44 @@ err:
|
||||
goto leave;
|
||||
}
|
||||
|
||||
static size_t hstream_add_ff_data(struct hstream *hs, struct sedesc *sd, unsigned long long len)
|
||||
{
|
||||
size_t ret;
|
||||
char *data_ptr;
|
||||
unsigned int offset;
|
||||
char *buffer;
|
||||
size_t buffer_len;
|
||||
int modulo;
|
||||
|
||||
TRACE_ENTER(HS_EV_HSTRM_ADD_DATA, hs);
|
||||
b_add(sd->iobuf.buf, sd->iobuf.offset);
|
||||
|
||||
if (hs->req_random) {
|
||||
buffer = random_resp;
|
||||
buffer_len = random_resp_len;
|
||||
modulo = random_resp_len;
|
||||
}
|
||||
else {
|
||||
buffer = common_response;
|
||||
buffer_len = sizeof(common_response);
|
||||
modulo = HS_COMMON_RESPONSE_LINE_SZ;
|
||||
}
|
||||
|
||||
offset = (hs->req_size - len) % modulo;
|
||||
data_ptr = buffer + offset;
|
||||
if (len > (unsigned long long)(buffer_len - offset))
|
||||
len = (unsigned long long)(buffer_len - offset);
|
||||
|
||||
ret = b_putist(sd->iobuf.buf, ist2(data_ptr, len));
|
||||
if (!ret)
|
||||
TRACE_STATE("unable to fast-forward payload", HS_EV_HSTRM_ADD_DATA, hs);
|
||||
|
||||
b_sub(sd->iobuf.buf, sd->iobuf.offset);
|
||||
sd->iobuf.data += ret;
|
||||
TRACE_LEAVE(HS_EV_HSTRM_ADD_DATA, hs);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Add data to HTX response buffer from pre-built responses */
|
||||
static size_t hstream_add_htx_data(struct hstream *hs, struct htx *htx, unsigned long long len)
|
||||
{
|
||||
@ -758,6 +847,14 @@ static inline int hstream_must_drain(struct hstream *hs)
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline int hstream_is_fastfwd_supported(struct hstream *hs)
|
||||
{
|
||||
return (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) &&
|
||||
sc_ep_test(hs->sc, SE_FL_MAY_FASTFWD_CONS) &&
|
||||
!(hs->sc->sedesc->iobuf.flags & IOBUF_FL_NO_FF) &&
|
||||
!hs->req_chunked && hs->to_write);
|
||||
}
|
||||
|
||||
/* haterm stream processing task */
|
||||
static struct task *process_hstream(struct task *t, void *context, unsigned int state)
|
||||
{
|
||||
@ -854,6 +951,7 @@ static struct task *process_hstream(struct task *t, void *context, unsigned int
|
||||
else {
|
||||
struct buffer *buf;
|
||||
struct htx *htx;
|
||||
int ret = 0;
|
||||
|
||||
/* HTX RX part */
|
||||
if (hstream_must_drain(hs)) {
|
||||
@ -875,7 +973,19 @@ static struct task *process_hstream(struct task *t, void *context, unsigned int
|
||||
if (!hstream_sl_hdrs_htx_buf_snd(hs, conn))
|
||||
goto err;
|
||||
|
||||
/* HTX TX part */
|
||||
/* TX part */
|
||||
if (hstream_is_fastfwd_supported(hs)) {
|
||||
if (!htx_is_empty(htxbuf(&hs->res)))
|
||||
goto flush_res_buf;
|
||||
if (!hs->to_write && !se_have_ff_data(hs->sc->sedesc))
|
||||
goto out;
|
||||
|
||||
ret = hstream_ff_snd(conn, hs);
|
||||
if (ret >= 0)
|
||||
goto send_done;
|
||||
/* fallback to regular send */
|
||||
}
|
||||
|
||||
if (!hs->to_write && htx_is_empty(htxbuf(&hs->res)))
|
||||
goto out;
|
||||
|
||||
@ -891,8 +1001,11 @@ static struct task *process_hstream(struct task *t, void *context, unsigned int
|
||||
if (hs->to_write <= 0)
|
||||
htx->flags |= HTX_FL_EOM;
|
||||
htx_to_buf(htx, &hs->res);
|
||||
|
||||
flush_res_buf:
|
||||
hstream_htx_buf_snd(conn, hs);
|
||||
|
||||
send_done:
|
||||
if (hs->req_body && hs->req_after_res && !hs->to_write) {
|
||||
/* Response sending has just complete. The body will be drained upon
|
||||
* next wakeup.
|
||||
@ -904,7 +1017,7 @@ static struct task *process_hstream(struct task *t, void *context, unsigned int
|
||||
}
|
||||
|
||||
out:
|
||||
if (!hs->to_write && !hs->req_body && htx_is_empty(htxbuf(&hs->res))) {
|
||||
if (!hs->to_write && !hs->req_body && htx_is_empty(htxbuf(&hs->res)) && !se_have_ff_data(hs->sc->sedesc)) {
|
||||
TRACE_DEVEL("shutting down stream", HS_EV_HSTRM_SEND, hs);
|
||||
CALL_MUX_NO_RET(conn->mux, shut(hs->sc, SE_SHW_SILENT|SE_SHW_NORMAL, NULL));
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ static void haterm_usage(char *name)
|
||||
" -c <curves> : ECSDA curves (ex: \"P-256\", \"P-384\"...)\n"
|
||||
" -v : shows version\n"
|
||||
" -d : enable the traces for all http protocols\n"
|
||||
" -dZ : disable zero-copy forwarding\n"
|
||||
" --" QUIC_BIND_LONG_OPT " <opts> : append options to QUIC \"bind\" lines\n"
|
||||
" --" TCP_BIND_LONG_OPT " <opts> : append options to TCP \"bind\" lines\n"
|
||||
, name);
|
||||
@ -242,6 +243,9 @@ void haproxy_init_args(int argc, char **argv)
|
||||
else
|
||||
haterm_usage(progname);
|
||||
}
|
||||
else if (*opt == 'd' && *(opt+1) == 'Z') {
|
||||
global.tune.no_zero_copy_fwd |= NO_ZERO_COPY_FWD;
|
||||
}
|
||||
else if (*opt == 'd') {
|
||||
/* empty option */
|
||||
if (*(opt + 1))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user