mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-09 16:47:18 +02:00
MINOR: mux-quic: implement MAX_STREAM_DATA emission
Send MAX_STREAM_DATA frames when at least half of the allocated flow-control has been demuxed, frame and cleared. This is necessary to support QUIC STREAM with received data greater than a buffer. Transcoders must use the new function qcc_consume_qcs() to empty the QCS buffer. This will allow to monitor current flow-control level and generate a MAX_STREAM_DATA frame if required. This frame will be emitted via qc_io_cb().
This commit is contained in:
parent
c985cb167d
commit
a977355aa1
@ -105,7 +105,8 @@ struct qcs {
|
|||||||
uint64_t offset; /* absolute current base offset of ncbuf */
|
uint64_t offset; /* absolute current base offset of ncbuf */
|
||||||
struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
|
struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
|
||||||
struct buffer app_buf; /* receive buffer used by conn_stream layer */
|
struct buffer app_buf; /* receive buffer used by conn_stream layer */
|
||||||
uint64_t msd; /* fctl bytes limit to enforce */
|
uint64_t msd; /* current max-stream-data limit to enforce */
|
||||||
|
uint64_t msd_init; /* initial max-stream-data */
|
||||||
} rx;
|
} rx;
|
||||||
struct {
|
struct {
|
||||||
uint64_t offset; /* last offset of data ready to be sent */
|
uint64_t offset; /* last offset of data ready to be sent */
|
||||||
|
@ -23,6 +23,7 @@ struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf);
|
|||||||
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
|
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
|
||||||
void qcs_notify_recv(struct qcs *qcs);
|
void qcs_notify_recv(struct qcs *qcs);
|
||||||
void qcs_notify_send(struct qcs *qcs);
|
void qcs_notify_send(struct qcs *qcs);
|
||||||
|
void qcs_consume(struct qcs *qcs, uint64_t bytes);
|
||||||
|
|
||||||
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
||||||
char fin, char *data);
|
char fin, char *data);
|
||||||
|
15
src/h3.c
15
src/h3.c
@ -288,10 +288,9 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
|||||||
h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
|
h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
|
||||||
__func__, ftype, flen);
|
__func__, ftype, flen);
|
||||||
|
|
||||||
ncb_advance(rxbuf, hlen);
|
|
||||||
h3s->demux_frame_type = ftype;
|
h3s->demux_frame_type = ftype;
|
||||||
h3s->demux_frame_len = flen;
|
h3s->demux_frame_len = flen;
|
||||||
qcs->rx.offset += hlen;
|
qcs_consume(qcs, hlen);
|
||||||
}
|
}
|
||||||
|
|
||||||
flen = h3s->demux_frame_len;
|
flen = h3s->demux_frame_len;
|
||||||
@ -327,10 +326,9 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
ncb_advance(rxbuf, ret);
|
|
||||||
BUG_ON(h3s->demux_frame_len < ret);
|
BUG_ON(h3s->demux_frame_len < ret);
|
||||||
h3s->demux_frame_len -= ret;
|
h3s->demux_frame_len -= ret;
|
||||||
qcs->rx.offset += ret;
|
qcs_consume(qcs, ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -410,8 +408,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
|
|||||||
if (flen > b_data(&b))
|
if (flen > b_data(&b))
|
||||||
break;
|
break;
|
||||||
|
|
||||||
ncb_advance(rxbuf, hlen);
|
qcs_consume(h3_uqs->qcs, hlen);
|
||||||
h3_uqs->qcs->rx.offset += hlen;
|
|
||||||
/* From here, a frame must not be truncated */
|
/* From here, a frame must not be truncated */
|
||||||
switch (ftype) {
|
switch (ftype) {
|
||||||
case H3_FT_CANCEL_PUSH:
|
case H3_FT_CANCEL_PUSH:
|
||||||
@ -435,8 +432,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
|
|||||||
h3->err = H3_FRAME_UNEXPECTED;
|
h3->err = H3_FRAME_UNEXPECTED;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
ncb_advance(rxbuf, flen);
|
qcs_consume(h3_uqs->qcs, flen);
|
||||||
h3_uqs->qcs->rx.offset += flen;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Handle the case where remaining data are present in the buffer. This
|
/* Handle the case where remaining data are present in the buffer. This
|
||||||
@ -796,8 +792,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
|
|||||||
if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX)
|
if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
ncb_advance(rxbuf, len);
|
qcs_consume(qcs, len);
|
||||||
qcs->rx.offset += len;
|
|
||||||
|
|
||||||
/* Note that for all the uni-streams below, this is an error to receive two times the
|
/* Note that for all the uni-streams below, this is an error to receive two times the
|
||||||
* same type of uni-stream (even for Push stream which is not supported at this time.
|
* same type of uni-stream (even for Push stream which is not supported at this time.
|
||||||
|
@ -76,9 +76,7 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
|||||||
if (!cs)
|
if (!cs)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
|
qcs_consume(qcs, ncb_data(rxbuf, 0));
|
||||||
qcs->rx.offset += ncb_data(rxbuf, 0);
|
|
||||||
ncb_advance(rxbuf, ncb_data(rxbuf, 0));
|
|
||||||
b_free(&htx_buf);
|
b_free(&htx_buf);
|
||||||
|
|
||||||
if (fin)
|
if (fin)
|
||||||
|
@ -160,6 +160,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
/* TODO use uni limit for unidirectional streams */
|
/* TODO use uni limit for unidirectional streams */
|
||||||
qcs->rx.msd = quic_stream_is_local(qcc, id) ? qcc->lfctl.msd_bidi_l :
|
qcs->rx.msd = quic_stream_is_local(qcc, id) ? qcc->lfctl.msd_bidi_l :
|
||||||
qcc->lfctl.msd_bidi_r;
|
qcc->lfctl.msd_bidi_r;
|
||||||
|
qcs->rx.msd_init = qcs->rx.msd;
|
||||||
|
|
||||||
qcs->tx.buf = BUF_NULL;
|
qcs->tx.buf = BUF_NULL;
|
||||||
qcs->tx.offset = 0;
|
qcs->tx.offset = 0;
|
||||||
@ -283,6 +284,38 @@ void qcs_notify_send(struct qcs *qcs)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
|
||||||
|
* after STREAM parsing. Flow-control for received offsets may be allocated for
|
||||||
|
* the peer if needed.
|
||||||
|
*/
|
||||||
|
void qcs_consume(struct qcs *qcs, uint64_t bytes)
|
||||||
|
{
|
||||||
|
struct qcc *qcc = qcs->qcc;
|
||||||
|
struct quic_frame *frm;
|
||||||
|
enum ncb_ret ret;
|
||||||
|
|
||||||
|
ret = ncb_advance(&qcs->rx.ncbuf, bytes);
|
||||||
|
if (ret) {
|
||||||
|
ABORT_NOW(); /* should not happens because removal only in data */
|
||||||
|
}
|
||||||
|
|
||||||
|
qcs->rx.offset += bytes;
|
||||||
|
if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
|
||||||
|
frm = pool_zalloc(pool_head_quic_frame);
|
||||||
|
BUG_ON(!frm); /* TODO handle this properly */
|
||||||
|
|
||||||
|
qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
|
||||||
|
|
||||||
|
LIST_INIT(&frm->reflist);
|
||||||
|
frm->type = QUIC_FT_MAX_STREAM_DATA;
|
||||||
|
frm->max_stream_data.id = qcs->id;
|
||||||
|
frm->max_stream_data.max_stream_data = qcs->rx.msd;
|
||||||
|
|
||||||
|
LIST_APPEND(&qcc->lfctl.frms, &frm->list);
|
||||||
|
tasklet_wakeup(qcc->wait_event.tasklet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
|
/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
|
||||||
* several streams, depending on the already open ones.
|
* several streams, depending on the already open ones.
|
||||||
* Return this node if succeeded, NULL if not.
|
* Return this node if succeeded, NULL if not.
|
||||||
@ -449,9 +482,6 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
|
|
||||||
BUG_ON(offset + len == qcs->rx.msd);
|
|
||||||
|
|
||||||
if (fin)
|
if (fin)
|
||||||
qcs->flags |= QC_SF_FIN_RECV;
|
qcs->flags |= QC_SF_FIN_RECV;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user