mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-04-03 10:01:03 +02:00
MEDIUM: mux-quic: implement QMux send
This patchs implement mux-quic reception for the new QMux protocol. This is performed via the new function qcc_qstrm_send_frames(). Its interface is similar to the QUIC equivalent : it takes a list of frames and encodes them in a buffer before sending it via snd_buf. Contrary to QUIC, a check on CO_FL_ERROR flag is performed prior to every qcc_qstrm_send_frames() invokation to interrupt emission. This is necessary as the transport layer may set it during snd_buf. This is not the case currently for quic_conn layer, but maybe a similar mechanism should be implemented as well for QUIC in the future.
This commit is contained in:
parent
0f0574ee96
commit
e8d9eb4f7a
@ -5,4 +5,6 @@
|
||||
|
||||
int qcc_qstrm_recv(struct qcc *qcc);
|
||||
|
||||
int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms);
|
||||
|
||||
#endif /* _HAPROXY_MUX_QUIC_QSTRM_H */
|
||||
|
||||
@ -2567,24 +2567,13 @@ static int qcc_subscribe_send(struct qcc *qcc)
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
|
||||
* connection <qcc>.
|
||||
*
|
||||
* Returns 0 if all data sent with success. On fatal error, a negative error
|
||||
* code is returned. A positive 1 is used if emission should be paced.
|
||||
*/
|
||||
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
|
||||
static int qcc_quic_send_frames(struct qcc *qcc, struct list *frms, int stream)
|
||||
{
|
||||
enum quic_tx_err ret;
|
||||
struct quic_pacer *pacer = NULL;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
|
||||
|
||||
if (LIST_ISEMPTY(frms)) {
|
||||
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (stream && qcc_is_pacing_active(qcc->conn))
|
||||
pacer = &qcc->tx.pacer;
|
||||
|
||||
@ -2612,6 +2601,23 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
|
||||
* connection <qcc>.
|
||||
*
|
||||
* Returns 0 if all data sent with success. On fatal error, a negative error
|
||||
* code is returned. A positive 1 is used if emission should be paced.
|
||||
*/
|
||||
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
|
||||
{
|
||||
if (LIST_ISEMPTY(frms)) {
|
||||
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return conn_is_quic(qcc->conn) ? qcc_quic_send_frames(qcc, frms, stream) :
|
||||
qcc_qstrm_send_frames(qcc, frms);
|
||||
}
|
||||
|
||||
/* Emit a RESET_STREAM on <qcs>.
|
||||
*
|
||||
* Returns 0 if the frame has been successfully sent else non-zero.
|
||||
@ -3072,6 +3078,12 @@ static int qcc_io_send(struct qcc *qcc)
|
||||
* flow-control limit reached.
|
||||
*/
|
||||
while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
|
||||
/* TODO should this check also be performed for QUIC ? */
|
||||
if (!conn_is_quic(qcc->conn) && (qcc->conn->flags & CO_FL_ERROR)) {
|
||||
TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
|
||||
goto out;
|
||||
}
|
||||
|
||||
window_conn = qfctl_rcap(&qcc->tx.fc);
|
||||
resent = 0;
|
||||
|
||||
@ -3083,7 +3095,8 @@ static int qcc_io_send(struct qcc *qcc)
|
||||
* new qc_stream_desc should be present in send_list as
|
||||
* long as transport layer can handle all data.
|
||||
*/
|
||||
BUG_ON(qcs->tx.stream->buf && !qfctl_rblocked(&qcs->tx.fc));
|
||||
BUG_ON((!conn_is_quic(qcc->conn) || qcs->tx.stream->buf) &&
|
||||
!qfctl_rblocked(&qcs->tx.fc));
|
||||
|
||||
/* Total sent bytes must not exceed connection window. */
|
||||
BUG_ON(resent > window_conn);
|
||||
|
||||
@ -147,3 +147,88 @@ int qcc_qstrm_recv(struct qcc *qcc)
|
||||
err:
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Sends <frms> list of frames for <qcc> connection.
|
||||
*
|
||||
* Returns 0 if all data are emitted or a positive value if sending should be
|
||||
* retry later. A negative error code is used for a fatal failure.
|
||||
*/
|
||||
int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms)
|
||||
{
|
||||
struct connection *conn = qcc->conn;
|
||||
struct quic_frame *frm, *frm_old;
|
||||
struct quic_frame *split_frm, *orig_frm;
|
||||
unsigned char *pos, *old, *end;
|
||||
size_t ret;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
|
||||
list_for_each_entry_safe(frm, frm_old, frms, list) {
|
||||
loop:
|
||||
split_frm = NULL;
|
||||
b_reset(&trash);
|
||||
old = pos = (unsigned char *)b_orig(&trash);
|
||||
end = (unsigned char *)b_wrap(&trash);
|
||||
|
||||
BUG_ON(!frm);
|
||||
TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QMUX_EV_QCC_SEND, qcc->conn, 0, 0, 0,
|
||||
"frm type %02llx", (ullong)frm->type);
|
||||
|
||||
if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) {
|
||||
size_t flen, split_size;
|
||||
|
||||
flen = quic_strm_frm_fillbuf(end - pos, frm, &split_size);
|
||||
if (!flen)
|
||||
continue;
|
||||
|
||||
if (split_size) {
|
||||
split_frm = quic_strm_frm_split(frm, split_size);
|
||||
if (!split_frm) {
|
||||
ABORT_NOW();
|
||||
continue;
|
||||
}
|
||||
|
||||
orig_frm = frm;
|
||||
frm = split_frm;
|
||||
}
|
||||
}
|
||||
|
||||
qc_build_frm(frm, &pos, end, NULL);
|
||||
BUG_ON(pos - old > global.tune.bufsize);
|
||||
BUG_ON(pos == old);
|
||||
b_add(&trash, pos - old);
|
||||
|
||||
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &trash, b_data(&trash), NULL, 0, 0);
|
||||
if (!ret) {
|
||||
TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn);
|
||||
if (split_frm)
|
||||
LIST_INSERT(frms, &split_frm->list);
|
||||
break;
|
||||
}
|
||||
|
||||
if (ret != b_data(&trash)) {
|
||||
/* TODO */
|
||||
ABORT_NOW();
|
||||
}
|
||||
|
||||
if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F)
|
||||
/* TODO notify MUX */
|
||||
|
||||
LIST_DEL_INIT(&frm->list);
|
||||
if (split_frm) {
|
||||
frm = orig_frm;
|
||||
goto loop;
|
||||
}
|
||||
}
|
||||
|
||||
if (conn->flags & CO_FL_ERROR) {
|
||||
/* TODO */
|
||||
//ABORT_NOW();
|
||||
}
|
||||
else if (!LIST_ISEMPTY(frms) && !(qcc->wait_event.events & SUB_RETRY_SEND)) {
|
||||
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event);
|
||||
return 1;
|
||||
}
|
||||
|
||||
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user