mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-21 22:01:31 +02:00
MEDIUM: mux-quic: improve bidir STREAM frames sending
The current implementation of STREAM frames emission has some limitation. Most notably when we cannot sent all frames in a single qc_send run. In this case, frames are left in front of the MUX list. It will be re-send individually before other frames, possibly another frame from the same STREAM with new data. An opportunity to merge the frames is lost here. This method is now improved. If a frame cannot be send entirely, it is discarded. On the next qc_send run, we retry to send to this position. A new field qcs.sent_offset is used to remember this. A new frame list is used for each qc_send. The impact of this change is not precisely known. The most notable point is that it is a more logical method of emission. It might also improve performance as we do not keep old STREAM frames which might delay other streams.
This commit is contained in:
parent
54445d04e4
commit
6ccfa3c40f
@ -58,7 +58,6 @@ struct qcc {
|
|||||||
} rx;
|
} rx;
|
||||||
struct {
|
struct {
|
||||||
uint64_t max_data; /* Maximum number of bytes which may be sent */
|
uint64_t max_data; /* Maximum number of bytes which may be sent */
|
||||||
struct list frms; /* list of frames ready to be sent */
|
|
||||||
} tx;
|
} tx;
|
||||||
|
|
||||||
struct eb_root streams_by_id; /* all active streams by their ID */
|
struct eb_root streams_by_id; /* all active streams by their ID */
|
||||||
@ -92,7 +91,8 @@ struct qcs {
|
|||||||
struct buffer app_buf; /* receive buffer used by conn_stream layer */
|
struct buffer app_buf; /* receive buffer used by conn_stream layer */
|
||||||
} rx;
|
} rx;
|
||||||
struct {
|
struct {
|
||||||
uint64_t offset; /* the current offset of received data */
|
uint64_t offset; /* last offset of data ready to be sent */
|
||||||
|
uint64_t sent_offset; /* last offset sent by transport layer */
|
||||||
struct eb_root acked_frms; /* acked frames ordered by their offsets */
|
struct eb_root acked_frms; /* acked frames ordered by their offsets */
|
||||||
uint64_t ack_offset; /* last acked ordered byte offset */
|
uint64_t ack_offset; /* last acked ordered byte offset */
|
||||||
struct buffer buf; /* transmit buffer before sending via xprt */
|
struct buffer buf; /* transmit buffer before sending via xprt */
|
||||||
|
@ -41,6 +41,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
qcs->tx.buf = BUF_NULL;
|
qcs->tx.buf = BUF_NULL;
|
||||||
qcs->tx.xprt_buf = BUF_NULL;
|
qcs->tx.xprt_buf = BUF_NULL;
|
||||||
qcs->tx.offset = 0;
|
qcs->tx.offset = 0;
|
||||||
|
qcs->tx.sent_offset = 0;
|
||||||
qcs->tx.ack_offset = 0;
|
qcs->tx.ack_offset = 0;
|
||||||
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
|
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
|
||||||
|
|
||||||
@ -350,50 +351,72 @@ static void qc_release(struct qcc *qcc)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset,
|
static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
|
||||||
|
struct buffer *payload, int fin,
|
||||||
struct list *frm_list)
|
struct list *frm_list)
|
||||||
{
|
{
|
||||||
struct quic_frame *frm;
|
struct quic_frame *frm;
|
||||||
struct buffer *buf = &qcs->tx.xprt_buf;
|
int head, left, to_xfer;
|
||||||
int total = 0, to_xfer;
|
int total = 0;
|
||||||
unsigned char *btail;
|
|
||||||
|
|
||||||
fprintf(stderr, "%s\n", __func__);
|
fprintf(stderr, "%s\n", __func__);
|
||||||
|
|
||||||
qc_get_buf(qcs, buf);
|
qc_get_buf(qcs, out);
|
||||||
to_xfer = QUIC_MIN(b_data(payload), b_room(buf));
|
|
||||||
if (!to_xfer)
|
/*
|
||||||
|
* QCS out buffer diagram
|
||||||
|
* head left to_xfer
|
||||||
|
* -------------> ----------> ----->
|
||||||
|
* ==================================================
|
||||||
|
* |...............|xxxxxxxxxxx|<<<<<
|
||||||
|
* ==================================================
|
||||||
|
* ^ ack-off ^ sent-off ^ off
|
||||||
|
*
|
||||||
|
* STREAM frame
|
||||||
|
* ^ ^
|
||||||
|
* |xxxxxxxxxxxxxxxxx|
|
||||||
|
*/
|
||||||
|
|
||||||
|
BUG_ON_HOT(qcs->tx.sent_offset < qcs->tx.ack_offset);
|
||||||
|
BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
|
||||||
|
|
||||||
|
head = qcs->tx.sent_offset - qcs->tx.ack_offset;
|
||||||
|
left = qcs->tx.offset - qcs->tx.sent_offset;
|
||||||
|
to_xfer = QUIC_MIN(b_data(payload), b_room(out));
|
||||||
|
if (!left && !to_xfer)
|
||||||
goto out;
|
goto out;
|
||||||
|
|
||||||
frm = pool_zalloc(pool_head_quic_frame);
|
frm = pool_zalloc(pool_head_quic_frame);
|
||||||
if (!frm)
|
if (!frm)
|
||||||
goto err;
|
goto err;
|
||||||
|
|
||||||
/* store buffer end before transfering data for frm.stream.data */
|
total = b_force_xfer(out, payload, to_xfer);
|
||||||
btail = (unsigned char *)b_tail(buf);
|
|
||||||
total = b_force_xfer(buf, payload, to_xfer);
|
frm->type = QUIC_FT_STREAM_8;
|
||||||
|
frm->stream.qcs = (struct qcs *)qcs;
|
||||||
|
frm->stream.id = qcs->by_id.key;
|
||||||
|
frm->stream.buf = out;
|
||||||
|
frm->stream.data = (unsigned char *)b_peek(out, head);
|
||||||
|
|
||||||
/* FIN is positioned only when the buffer has been totally emptied. */
|
/* FIN is positioned only when the buffer has been totally emptied. */
|
||||||
fin = fin && !b_data(payload);
|
fin = fin && !b_data(payload);
|
||||||
frm->type = QUIC_FT_STREAM_8;
|
|
||||||
if (fin)
|
if (fin)
|
||||||
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
|
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
|
||||||
if (offset) {
|
|
||||||
|
if (qcs->tx.sent_offset) {
|
||||||
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
|
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
|
||||||
frm->stream.offset.key = offset;
|
frm->stream.offset.key = qcs->tx.sent_offset;
|
||||||
}
|
}
|
||||||
frm->stream.qcs = (struct qcs *)qcs;
|
|
||||||
frm->stream.buf = buf;
|
if (left + total) {
|
||||||
frm->stream.data = btail;
|
|
||||||
frm->stream.id = qcs->by_id.key;
|
|
||||||
if (total) {
|
|
||||||
frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
|
frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
|
||||||
frm->stream.len = total;
|
frm->stream.len = left + total;
|
||||||
}
|
}
|
||||||
|
|
||||||
LIST_APPEND(frm_list, &frm->list);
|
LIST_APPEND(frm_list, &frm->list);
|
||||||
out:
|
out:
|
||||||
fprintf(stderr, "%s: total=%d fin=%d id=%llu offset=%lu\n",
|
fprintf(stderr, "%s: sent=%lu total=%d fin=%d id=%llu offset=%lu\n",
|
||||||
__func__, total, fin, (ull)qcs->by_id.key, offset);
|
__func__, (long unsigned)b_data(out), total, fin, (ull)qcs->by_id.key, qcs->tx.sent_offset);
|
||||||
return total;
|
return total;
|
||||||
|
|
||||||
err:
|
err:
|
||||||
@ -406,11 +429,20 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint
|
|||||||
*/
|
*/
|
||||||
void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
|
void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
|
||||||
{
|
{
|
||||||
|
uint64_t diff = data;
|
||||||
|
|
||||||
|
BUG_ON(offset > qcs->tx.sent_offset);
|
||||||
|
|
||||||
/* check if the STREAM frame has already been notified. It can happen
|
/* check if the STREAM frame has already been notified. It can happen
|
||||||
* for retransmission.
|
* for retransmission.
|
||||||
*/
|
*/
|
||||||
if (offset + data <= qcs->tx.sent_offset)
|
if (offset + data <= qcs->tx.sent_offset)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
diff = offset + data - qcs->tx.sent_offset;
|
||||||
|
|
||||||
|
/* increase offset on stream */
|
||||||
|
qcs->tx.sent_offset += diff;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
|
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
|
||||||
@ -480,6 +512,7 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms)
|
|||||||
|
|
||||||
static int qc_send(struct qcc *qcc)
|
static int qc_send(struct qcc *qcc)
|
||||||
{
|
{
|
||||||
|
struct list frms = LIST_HEAD_INIT(frms);
|
||||||
struct eb64_node *node;
|
struct eb64_node *node;
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
@ -492,6 +525,7 @@ static int qc_send(struct qcc *qcc)
|
|||||||
while (node) {
|
while (node) {
|
||||||
struct qcs *qcs = container_of(node, struct qcs, by_id);
|
struct qcs *qcs = container_of(node, struct qcs, by_id);
|
||||||
struct buffer *buf = &qcs->tx.buf;
|
struct buffer *buf = &qcs->tx.buf;
|
||||||
|
struct buffer *out = &qcs->tx.xprt_buf;
|
||||||
|
|
||||||
/* TODO
|
/* TODO
|
||||||
* for the moment, unidirectional streams have their own
|
* for the moment, unidirectional streams have their own
|
||||||
@ -503,10 +537,9 @@ static int qc_send(struct qcc *qcc)
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (b_data(buf)) {
|
if (b_data(buf) || b_data(out)) {
|
||||||
char fin = qcs->flags & QC_SF_FIN_STREAM;
|
char fin = qcs->flags & QC_SF_FIN_STREAM;
|
||||||
ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset,
|
ret = qcs_push_frame(qcs, out, buf, fin, &frms);
|
||||||
&qcc->tx.frms);
|
|
||||||
BUG_ON(ret < 0); /* TODO handle this properly */
|
BUG_ON(ret < 0); /* TODO handle this properly */
|
||||||
|
|
||||||
if (ret > 0) {
|
if (ret > 0) {
|
||||||
@ -527,7 +560,7 @@ static int qc_send(struct qcc *qcc)
|
|||||||
node = eb64_next(node);
|
node = eb64_next(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
qc_send_frames(qcc, &qcc->tx.frms);
|
qc_send_frames(qcc, &frms);
|
||||||
/* TODO adjust ret if not all frames are sent. */
|
/* TODO adjust ret if not all frames are sent. */
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -672,7 +705,6 @@ static int qc_init(struct connection *conn, struct proxy *prx,
|
|||||||
|
|
||||||
qcc->rx.max_data = lparams->initial_max_data;
|
qcc->rx.max_data = lparams->initial_max_data;
|
||||||
qcc->tx.max_data = 0;
|
qcc->tx.max_data = 0;
|
||||||
LIST_INIT(&qcc->tx.frms);
|
|
||||||
|
|
||||||
/* Client initiated streams must respect the server flow control. */
|
/* Client initiated streams must respect the server flow control. */
|
||||||
qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;
|
qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user