mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 22:31:28 +02:00
MEDIUM: mux-quic/h3/hq-interop: use ncbuf for bidir streams
Add a ncbuf for data reception on qcs. Thanks to this, the MUX is able to buffered all received frame directly into the buffer. Flow control parameters will be used to ensure there is never an overflow. This change will simplify Rx path with the future deletion of acked frames tree previously used for frames out of order.
This commit is contained in:
parent
06749f3d6f
commit
1290f1ebfb
@ -11,6 +11,7 @@
|
|||||||
#include <haproxy/buf-t.h>
|
#include <haproxy/buf-t.h>
|
||||||
#include <haproxy/connection-t.h>
|
#include <haproxy/connection-t.h>
|
||||||
#include <haproxy/list-t.h>
|
#include <haproxy/list-t.h>
|
||||||
|
#include <haproxy/ncbuf-t.h>
|
||||||
#include <haproxy/quic_stream-t.h>
|
#include <haproxy/quic_stream-t.h>
|
||||||
#include <haproxy/conn_stream-t.h>
|
#include <haproxy/conn_stream-t.h>
|
||||||
|
|
||||||
@ -101,8 +102,9 @@ struct qcs {
|
|||||||
|
|
||||||
struct {
|
struct {
|
||||||
struct eb_root frms; /* received frames ordered by their offsets */
|
struct eb_root frms; /* received frames ordered by their offsets */
|
||||||
uint64_t offset; /* the current offset of received data */
|
uint64_t offset; /* absolute current base offset of ncbuf */
|
||||||
struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
|
struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
|
||||||
|
struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
|
||||||
struct buffer app_buf; /* receive buffer used by conn_stream layer */
|
struct buffer app_buf; /* receive buffer used by conn_stream layer */
|
||||||
uint64_t msd; /* fctl bytes limit to enforce */
|
uint64_t msd; /* fctl bytes limit to enforce */
|
||||||
} rx;
|
} rx;
|
||||||
|
@ -18,13 +18,14 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type);
|
|||||||
void qcs_free(struct qcs *qcs);
|
void qcs_free(struct qcs *qcs);
|
||||||
|
|
||||||
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
|
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
|
||||||
|
struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf);
|
||||||
|
|
||||||
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
|
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
|
||||||
void qcs_notify_recv(struct qcs *qcs);
|
void qcs_notify_recv(struct qcs *qcs);
|
||||||
void qcs_notify_send(struct qcs *qcs);
|
void qcs_notify_send(struct qcs *qcs);
|
||||||
|
|
||||||
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
||||||
char fin, char *data, struct qcs **out_qcs, size_t *done);
|
char fin, char *data, struct qcs **out_qcs);
|
||||||
int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
|
int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
|
||||||
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max);
|
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max);
|
||||||
int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs);
|
int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs);
|
||||||
|
51
src/h3.c
51
src/h3.c
@ -25,6 +25,7 @@
|
|||||||
#include <haproxy/htx.h>
|
#include <haproxy/htx.h>
|
||||||
#include <haproxy/istbuf.h>
|
#include <haproxy/istbuf.h>
|
||||||
#include <haproxy/mux_quic.h>
|
#include <haproxy/mux_quic.h>
|
||||||
|
#include <haproxy/ncbuf.h>
|
||||||
#include <haproxy/pool.h>
|
#include <haproxy/pool.h>
|
||||||
#include <haproxy/qpack-dec.h>
|
#include <haproxy/qpack-dec.h>
|
||||||
#include <haproxy/qpack-enc.h>
|
#include <haproxy/qpack-enc.h>
|
||||||
@ -76,9 +77,9 @@ struct h3s {
|
|||||||
DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s));
|
DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s));
|
||||||
|
|
||||||
/* Simple function to duplicate a buffer */
|
/* Simple function to duplicate a buffer */
|
||||||
static inline struct buffer h3_b_dup(struct buffer *b)
|
static inline struct buffer h3_b_dup(struct ncbuf *b)
|
||||||
{
|
{
|
||||||
return b_make(b->area, b->size, b->head, b->data);
|
return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Decode a h3 frame header made of two QUIC varints from <b> buffer.
|
/* Decode a h3 frame header made of two QUIC varints from <b> buffer.
|
||||||
@ -104,7 +105,7 @@ static inline size_t h3_decode_frm_header(uint64_t *ftype, uint64_t *flen,
|
|||||||
*
|
*
|
||||||
* Returns the number of bytes handled or a negative error code.
|
* Returns the number of bytes handled or a negative error code.
|
||||||
*/
|
*/
|
||||||
static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
|
static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
|
||||||
char fin)
|
char fin)
|
||||||
{
|
{
|
||||||
struct buffer htx_buf = BUF_NULL;
|
struct buffer htx_buf = BUF_NULL;
|
||||||
@ -119,8 +120,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
|
|||||||
int hdr_idx;
|
int hdr_idx;
|
||||||
|
|
||||||
/* TODO support buffer wrapping */
|
/* TODO support buffer wrapping */
|
||||||
BUG_ON(b_contig_data(buf, 0) != b_data(buf));
|
BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf));
|
||||||
if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0)
|
if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
qc_get_buf(qcs, &htx_buf);
|
qc_get_buf(qcs, &htx_buf);
|
||||||
@ -200,12 +201,12 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
|
|||||||
*
|
*
|
||||||
* Returns the number of bytes handled or a negative error code.
|
* Returns the number of bytes handled or a negative error code.
|
||||||
*/
|
*/
|
||||||
static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
|
static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
|
||||||
char fin)
|
char fin)
|
||||||
{
|
{
|
||||||
struct buffer *appbuf;
|
struct buffer *appbuf;
|
||||||
struct htx *htx = NULL;
|
struct htx *htx = NULL;
|
||||||
size_t contig = 0, htx_sent = 0;
|
size_t htx_sent = 0;
|
||||||
int htx_space;
|
int htx_space;
|
||||||
char *head;
|
char *head;
|
||||||
|
|
||||||
@ -213,12 +214,12 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
|
|||||||
BUG_ON(!appbuf);
|
BUG_ON(!appbuf);
|
||||||
htx = htx_from_buf(appbuf);
|
htx = htx_from_buf(appbuf);
|
||||||
|
|
||||||
if (len > b_data(buf)) {
|
if (len > ncb_data(buf, 0)) {
|
||||||
len = b_data(buf);
|
len = ncb_data(buf, 0);
|
||||||
fin = 0;
|
fin = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
head = b_head(buf);
|
head = ncb_head(buf);
|
||||||
retry:
|
retry:
|
||||||
htx_space = htx_free_data_space(htx);
|
htx_space = htx_free_data_space(htx);
|
||||||
if (!htx_space) {
|
if (!htx_space) {
|
||||||
@ -231,10 +232,10 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
|
|||||||
fin = 0;
|
fin = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
contig = b_contig_data(buf, contig);
|
if (head + len > ncb_wrap(buf)) {
|
||||||
if (len > contig) {
|
size_t contig = ncb_wrap(buf) - head;
|
||||||
htx_sent = htx_add_data(htx, ist2(b_head(buf), contig));
|
htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig));
|
||||||
head = b_orig(buf);
|
head = ncb_orig(buf);
|
||||||
len -= contig;
|
len -= contig;
|
||||||
goto retry;
|
goto retry;
|
||||||
}
|
}
|
||||||
@ -256,15 +257,15 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
|
|||||||
*/
|
*/
|
||||||
static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
||||||
{
|
{
|
||||||
struct buffer *rxbuf = &qcs->rx.buf;
|
struct ncbuf *rxbuf = &qcs->rx.ncbuf;
|
||||||
struct h3s *h3s = qcs->ctx;
|
struct h3s *h3s = qcs->ctx;
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
|
|
||||||
h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id);
|
h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id);
|
||||||
if (!b_data(rxbuf))
|
if (!ncb_data(rxbuf, 0))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
while (b_data(rxbuf) && !(qcs->flags & QC_SF_DEM_FULL)) {
|
while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) {
|
||||||
uint64_t ftype, flen;
|
uint64_t ftype, flen;
|
||||||
struct buffer b;
|
struct buffer b;
|
||||||
char last_stream_frame = 0;
|
char last_stream_frame = 0;
|
||||||
@ -279,16 +280,17 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
|||||||
h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
|
h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
|
||||||
__func__, ftype, flen);
|
__func__, ftype, flen);
|
||||||
|
|
||||||
b_del(rxbuf, hlen);
|
ncb_advance(rxbuf, hlen);
|
||||||
h3s->demux_frame_type = ftype;
|
h3s->demux_frame_type = ftype;
|
||||||
h3s->demux_frame_len = flen;
|
h3s->demux_frame_len = flen;
|
||||||
|
qcs->rx.offset += hlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
flen = h3s->demux_frame_len;
|
flen = h3s->demux_frame_len;
|
||||||
ftype = h3s->demux_frame_type;
|
ftype = h3s->demux_frame_type;
|
||||||
if (flen > b_data(&b) && !b_full(rxbuf))
|
if (flen > b_data(&b) && !ncb_is_full(rxbuf))
|
||||||
break;
|
break;
|
||||||
last_stream_frame = (fin && flen == b_data(rxbuf));
|
last_stream_frame = (fin && flen == ncb_total_data(rxbuf));
|
||||||
|
|
||||||
switch (ftype) {
|
switch (ftype) {
|
||||||
case H3_FT_DATA:
|
case H3_FT_DATA:
|
||||||
@ -303,20 +305,21 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
|||||||
break;
|
break;
|
||||||
case H3_FT_PUSH_PROMISE:
|
case H3_FT_PUSH_PROMISE:
|
||||||
/* Not supported */
|
/* Not supported */
|
||||||
ret = MIN(b_data(rxbuf), flen);
|
ret = MIN(ncb_data(rxbuf, 0), flen);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
/* draft-ietf-quic-http34 9. Extensions to HTTP/3
|
/* draft-ietf-quic-http34 9. Extensions to HTTP/3
|
||||||
* unknown frame types MUST be ignored
|
* unknown frame types MUST be ignored
|
||||||
*/
|
*/
|
||||||
h3_debug_printf(stderr, "ignore unknown frame type 0x%lx\n", ftype);
|
h3_debug_printf(stderr, "ignore unknown frame type 0x%lx\n", ftype);
|
||||||
ret = MIN(b_data(rxbuf), flen);
|
ret = MIN(ncb_data(rxbuf, 0), flen);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
b_del(rxbuf, ret);
|
ncb_advance(rxbuf, ret);
|
||||||
BUG_ON(h3s->demux_frame_len < ret);
|
BUG_ON(h3s->demux_frame_len < ret);
|
||||||
h3s->demux_frame_len -= ret;
|
h3s->demux_frame_len -= ret;
|
||||||
|
qcs->rx.offset += ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -386,7 +389,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
|
|||||||
struct buffer b;
|
struct buffer b;
|
||||||
|
|
||||||
/* Work on a copy of <rxbuf> */
|
/* Work on a copy of <rxbuf> */
|
||||||
b = h3_b_dup(rxbuf);
|
b = b_make(rxbuf->area, rxbuf->size, rxbuf->head, rxbuf->data);
|
||||||
hlen = h3_decode_frm_header(&ftype, &flen, &b);
|
hlen = h3_decode_frm_header(&ftype, &flen, &b);
|
||||||
if (!hlen)
|
if (!hlen)
|
||||||
break;
|
break;
|
||||||
|
@ -7,19 +7,20 @@
|
|||||||
#include <haproxy/htx.h>
|
#include <haproxy/htx.h>
|
||||||
#include <haproxy/http.h>
|
#include <haproxy/http.h>
|
||||||
#include <haproxy/mux_quic.h>
|
#include <haproxy/mux_quic.h>
|
||||||
|
#include <haproxy/ncbuf.h>
|
||||||
|
|
||||||
static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
||||||
{
|
{
|
||||||
struct buffer *rxbuf = &qcs->rx.buf;
|
struct ncbuf *rxbuf = &qcs->rx.ncbuf;
|
||||||
struct htx *htx;
|
struct htx *htx;
|
||||||
struct htx_sl *sl;
|
struct htx_sl *sl;
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
struct buffer htx_buf = BUF_NULL;
|
struct buffer htx_buf = BUF_NULL;
|
||||||
struct ist path;
|
struct ist path;
|
||||||
char *ptr = b_head(rxbuf);
|
char *ptr = ncb_head(rxbuf);
|
||||||
char *end = b_wrap(rxbuf);
|
char *end = ncb_wrap(rxbuf);
|
||||||
size_t size = b_size(rxbuf);
|
size_t size = ncb_size(rxbuf);
|
||||||
size_t data = b_data(rxbuf);
|
size_t data = ncb_data(rxbuf, 0);
|
||||||
|
|
||||||
b_alloc(&htx_buf);
|
b_alloc(&htx_buf);
|
||||||
htx = htx_from_buf(&htx_buf);
|
htx = htx_from_buf(&htx_buf);
|
||||||
@ -76,7 +77,8 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
|||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
|
|
||||||
b_del(rxbuf, b_data(rxbuf));
|
qcs->rx.offset += ncb_data(rxbuf, 0);
|
||||||
|
ncb_advance(rxbuf, ncb_data(rxbuf, 0));
|
||||||
b_free(&htx_buf);
|
b_free(&htx_buf);
|
||||||
|
|
||||||
if (fin)
|
if (fin)
|
||||||
|
@ -8,6 +8,7 @@
|
|||||||
#include <haproxy/dynbuf.h>
|
#include <haproxy/dynbuf.h>
|
||||||
#include <haproxy/htx.h>
|
#include <haproxy/htx.h>
|
||||||
#include <haproxy/list.h>
|
#include <haproxy/list.h>
|
||||||
|
#include <haproxy/ncbuf.h>
|
||||||
#include <haproxy/pool.h>
|
#include <haproxy/pool.h>
|
||||||
#include <haproxy/quic_stream.h>
|
#include <haproxy/quic_stream.h>
|
||||||
#include <haproxy/sink.h>
|
#include <haproxy/sink.h>
|
||||||
@ -153,6 +154,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
qcc->rfctl.msd_bidi_l;
|
qcc->rfctl.msd_bidi_l;
|
||||||
|
|
||||||
qcs->rx.buf = BUF_NULL;
|
qcs->rx.buf = BUF_NULL;
|
||||||
|
qcs->rx.ncbuf = NCBUF_NULL;
|
||||||
qcs->rx.app_buf = BUF_NULL;
|
qcs->rx.app_buf = BUF_NULL;
|
||||||
qcs->rx.offset = 0;
|
qcs->rx.offset = 0;
|
||||||
qcs->rx.frms = EB_ROOT_UNIQUE;
|
qcs->rx.frms = EB_ROOT_UNIQUE;
|
||||||
@ -184,6 +186,16 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void qc_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
|
||||||
|
{
|
||||||
|
struct buffer buf;
|
||||||
|
|
||||||
|
buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
|
||||||
|
b_free(&buf);
|
||||||
|
|
||||||
|
*ncbuf = NCBUF_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Free a qcs. This function must only be done to remove a stream on allocation
|
/* Free a qcs. This function must only be done to remove a stream on allocation
|
||||||
* error or connection shutdown. Else use qcs_destroy which handle all the
|
* error or connection shutdown. Else use qcs_destroy which handle all the
|
||||||
* QUIC connection mechanism.
|
* QUIC connection mechanism.
|
||||||
@ -191,6 +203,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
void qcs_free(struct qcs *qcs)
|
void qcs_free(struct qcs *qcs)
|
||||||
{
|
{
|
||||||
b_free(&qcs->rx.buf);
|
b_free(&qcs->rx.buf);
|
||||||
|
qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
|
||||||
b_free(&qcs->tx.buf);
|
b_free(&qcs->tx.buf);
|
||||||
|
|
||||||
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams);
|
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams);
|
||||||
@ -215,6 +228,21 @@ struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr)
|
|||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
|
||||||
|
{
|
||||||
|
struct buffer buf = BUF_NULL;
|
||||||
|
|
||||||
|
if (ncb_is_null(ncbuf)) {
|
||||||
|
b_alloc(&buf);
|
||||||
|
BUG_ON(b_is_null(&buf));
|
||||||
|
|
||||||
|
*ncbuf = ncb_make(buf.area, buf.size, 0);
|
||||||
|
ncb_init(ncbuf, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ncbuf;
|
||||||
|
}
|
||||||
|
|
||||||
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
|
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
|
||||||
{
|
{
|
||||||
struct qcc *qcc = qcs->qcc;
|
struct qcc *qcc = qcs->qcc;
|
||||||
@ -344,22 +372,17 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
|
|||||||
* <out_qcs>. In case of success, the caller can immediatly call qcc_decode_qcs
|
* <out_qcs>. In case of success, the caller can immediatly call qcc_decode_qcs
|
||||||
* to process the frame content.
|
* to process the frame content.
|
||||||
*
|
*
|
||||||
* Returns a code indicating how the frame was handled.
|
* Returns 0 on success else non-zero.
|
||||||
* - 0: frame received completely and can be dropped.
|
|
||||||
* - 1: frame not received but can be dropped.
|
|
||||||
* - 2: frame cannot be handled, either partially or not at all. <done>
|
|
||||||
* indicated the number of bytes handled. The rest should be buffered.
|
|
||||||
*/
|
*/
|
||||||
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
||||||
char fin, char *data, struct qcs **out_qcs, size_t *done)
|
char fin, char *data, struct qcs **out_qcs)
|
||||||
{
|
{
|
||||||
struct qcs *qcs;
|
struct qcs *qcs;
|
||||||
size_t total, diff;
|
enum ncb_ret ret;
|
||||||
|
|
||||||
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
|
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
|
||||||
|
|
||||||
*out_qcs = NULL;
|
*out_qcs = NULL;
|
||||||
*done = 0;
|
|
||||||
|
|
||||||
qcs = qcc_get_qcs(qcc, id);
|
qcs = qcc_get_qcs(qcc, id);
|
||||||
if (!qcs) {
|
if (!qcs) {
|
||||||
@ -375,44 +398,46 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
|||||||
|
|
||||||
*out_qcs = qcs;
|
*out_qcs = qcs;
|
||||||
|
|
||||||
if (offset > qcs->rx.offset)
|
|
||||||
return 2;
|
|
||||||
|
|
||||||
if (offset + len <= qcs->rx.offset) {
|
if (offset + len <= qcs->rx.offset) {
|
||||||
TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Last frame already handled for this stream. */
|
/* TODO if last frame already received, stream size must not change.
|
||||||
BUG_ON(qcs->flags & QC_SF_FIN_RECV);
|
* Else send FINAL_SIZE_ERROR.
|
||||||
|
*/
|
||||||
|
|
||||||
/* TODO initial max-stream-data overflow. Implement FLOW_CONTROL_ERROR emission. */
|
/* TODO initial max-stream-data overflow. Implement FLOW_CONTROL_ERROR emission. */
|
||||||
BUG_ON(offset + len > qcs->rx.msd);
|
BUG_ON(offset + len > qcs->rx.msd);
|
||||||
|
|
||||||
if (!qc_get_buf(qcs, &qcs->rx.buf) || b_full(&qcs->rx.buf)) {
|
if (!qc_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) {
|
||||||
/* TODO should mark qcs as full */
|
/* TODO should mark qcs as full */
|
||||||
return 2;
|
ABORT_NOW();
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TRACE_DEVEL("newly received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
TRACE_DEVEL("newly received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||||
diff = qcs->rx.offset - offset;
|
if (offset < qcs->rx.offset) {
|
||||||
|
len -= qcs->rx.offset - offset;
|
||||||
|
offset = qcs->rx.offset;
|
||||||
|
}
|
||||||
|
|
||||||
len -= diff;
|
ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
|
||||||
data += diff;
|
if (ret != NCB_RET_OK) {
|
||||||
|
if (ret == NCB_RET_DATA_REJ) {
|
||||||
/* TODO handle STREAM frames larger than RX buffer. */
|
/* TODO generate PROTOCOL_VIOLATION error */
|
||||||
BUG_ON(len > b_size(&qcs->rx.buf));
|
TRACE_DEVEL("leaving on data rejected", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
|
||||||
|
qcc->conn, qcs);
|
||||||
total = b_putblk(&qcs->rx.buf, data, len);
|
}
|
||||||
qcs->rx.offset += total;
|
else if (ret == NCB_RET_GAP_SIZE) {
|
||||||
*done = total;
|
TRACE_DEVEL("cannot bufferize frame due to gap size limit", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
|
||||||
|
qcc->conn, qcs);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
/* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
|
/* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
|
||||||
BUG_ON(qcs->rx.offset == qcs->rx.msd);
|
BUG_ON(offset + len == qcs->rx.msd);
|
||||||
|
|
||||||
if (total < len) {
|
|
||||||
TRACE_DEVEL("leaving on partially received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fin)
|
if (fin)
|
||||||
qcs->flags |= QC_SF_FIN_RECV;
|
qcs->flags |= QC_SF_FIN_RECV;
|
||||||
|
@ -2194,105 +2194,19 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
|
|||||||
struct quic_stream *strm_frm,
|
struct quic_stream *strm_frm,
|
||||||
struct quic_conn *qc)
|
struct quic_conn *qc)
|
||||||
{
|
{
|
||||||
struct quic_rx_strm_frm *frm;
|
|
||||||
struct eb64_node *frm_node;
|
|
||||||
struct qcs *qcs = NULL;
|
struct qcs *qcs = NULL;
|
||||||
size_t done, buf_was_full;
|
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
|
ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
|
||||||
strm_frm->offset.key, strm_frm->fin,
|
strm_frm->offset.key, strm_frm->fin,
|
||||||
(char *)strm_frm->data, &qcs, &done);
|
(char *)strm_frm->data, &qcs);
|
||||||
|
|
||||||
/* invalid frame */
|
/* frame rejected - packet must not be acknowledeged */
|
||||||
if (ret == 1)
|
if (ret)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
/* already fully received offset */
|
if (qcs)
|
||||||
if (ret == 0 && done == 0)
|
|
||||||
return 1;
|
|
||||||
|
|
||||||
/* frame not handled (partially or completely) must be buffered */
|
|
||||||
if (ret == 2) {
|
|
||||||
frm = new_quic_rx_strm_frm(strm_frm, pkt);
|
|
||||||
if (!frm) {
|
|
||||||
TRACE_PROTO("Could not alloc RX STREAM frame",
|
|
||||||
QUIC_EV_CONN_PSTRM, qc);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* frame partially handled by the MUX */
|
|
||||||
if (done) {
|
|
||||||
BUG_ON(done >= frm->len); /* must never happen */
|
|
||||||
frm->len -= done;
|
|
||||||
frm->data += done;
|
|
||||||
frm->offset_node.key += done;
|
|
||||||
}
|
|
||||||
|
|
||||||
eb64_insert(&qcs->rx.frms, &frm->offset_node);
|
|
||||||
quic_rx_packet_refinc(pkt);
|
|
||||||
|
|
||||||
/* interrupt only if frame was not received at all. */
|
|
||||||
if (!done)
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Decode the data if buffer is already full as it's not possible to
|
|
||||||
* dequeue a frame in this condition.
|
|
||||||
*/
|
|
||||||
if (b_full(&qcs->rx.buf))
|
|
||||||
qcc_decode_qcs(qc->qcc, qcs);
|
qcc_decode_qcs(qc->qcc, qcs);
|
||||||
|
|
||||||
retry:
|
|
||||||
/* Frame received (partially or not) by the mux.
|
|
||||||
* If there is buffered frame for next offset, it may be possible to
|
|
||||||
* receive them now.
|
|
||||||
*/
|
|
||||||
frm_node = eb64_first(&qcs->rx.frms);
|
|
||||||
while (frm_node) {
|
|
||||||
frm = eb64_entry(frm_node,
|
|
||||||
struct quic_rx_strm_frm, offset_node);
|
|
||||||
|
|
||||||
ret = qcc_recv(qc->qcc, qcs->id, frm->len,
|
|
||||||
frm->offset_node.key, frm->fin,
|
|
||||||
(char *)frm->data, &qcs, &done);
|
|
||||||
|
|
||||||
BUG_ON(ret == 1); /* must never happen for buffered frames */
|
|
||||||
|
|
||||||
/* interrupt the parsing if the frame cannot be handled
|
|
||||||
* entirely for the moment only.
|
|
||||||
*/
|
|
||||||
if (ret == 2) {
|
|
||||||
if (done) {
|
|
||||||
BUG_ON(done >= frm->len); /* must never happen */
|
|
||||||
frm->len -= done;
|
|
||||||
frm->data += done;
|
|
||||||
|
|
||||||
eb64_delete(&frm->offset_node);
|
|
||||||
frm->offset_node.key += done;
|
|
||||||
eb64_insert(&qcs->rx.frms, &frm->offset_node);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Remove a newly received frame or an invalid one. */
|
|
||||||
frm_node = eb64_next(frm_node);
|
|
||||||
eb64_delete(&frm->offset_node);
|
|
||||||
quic_rx_packet_refdec(frm->pkt);
|
|
||||||
pool_free(pool_head_quic_rx_strm_frm, frm);
|
|
||||||
}
|
|
||||||
|
|
||||||
buf_was_full = b_full(&qcs->rx.buf);
|
|
||||||
/* Decode the received data. */
|
|
||||||
qcc_decode_qcs(qc->qcc, qcs);
|
|
||||||
|
|
||||||
/* Buffer was full so the reception was stopped. Now the buffer has
|
|
||||||
* space available thanks to qcc_decode_qcs(). We can now retry to
|
|
||||||
* handle more data.
|
|
||||||
*/
|
|
||||||
if (buf_was_full && !b_full(&qcs->rx.buf))
|
|
||||||
goto retry;
|
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user