diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 903e32623..fca28f7fa 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -136,7 +136,6 @@ struct qcs { struct qcc_app_ops { int (*init)(struct qcc *qcc); int (*attach)(struct qcs *qcs); - int (*attach_ruqs)(struct qcs *qcs, void *ctx); int (*decode_qcs)(struct qcs *qcs, int fin, void *ctx); size_t (*snd_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); void (*detach)(struct qcs *qcs); diff --git a/src/h3.c b/src/h3.c index e3a3f77c3..d5e96d541 100644 --- a/src/h3.c +++ b/src/h3.c @@ -165,9 +165,8 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, * * Returns 0 on success else non-zero. */ -static int h3_parse_uni_stream_no_h3(struct qcs *qcs, void *ctx) +static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf) { - struct ncbuf *rxbuf = &qcs->rx.ncbuf; struct h3s *h3s = qcs->ctx; BUG_ON_HOT(!quic_stream_is_uni(qcs->id) || @@ -418,6 +417,47 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, return htx_sent; } +/* Parse a SETTINGS frame which must not be truncated with as length from + * buffer. This function does not update this buffer. + * + * Returns 0 on success else non-zero. + */ +static int h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, size_t flen) +{ + uint64_t id, value; + const unsigned char *buf, *end; + + buf = (const unsigned char *)ncb_head(rxbuf); + end = buf + flen; + + while (buf < end) { + if (!quic_dec_int(&id, &buf, end) || !quic_dec_int(&value, &buf, end)) + return 1; + + h3_debug_printf(stderr, "%s id: %llu value: %llu\n", + __func__, (unsigned long long)id, (unsigned long long)value); + switch (id) { + case H3_SETTINGS_QPACK_MAX_TABLE_CAPACITY: + h3c->qpack_max_table_capacity = value; + break; + case H3_SETTINGS_MAX_FIELD_SECTION_SIZE: + h3c->max_field_section_size = value; + break; + case H3_SETTINGS_QPACK_BLOCKED_STREAMS: + h3c->qpack_blocked_streams = value; + break; + case H3_SETTINGS_RESERVED_2 ... H3_SETTINGS_RESERVED_5: + h3c->err = H3_SETTINGS_ERROR; + return 1; + default: + /* MUST be ignored */ + break; + } + } + + return 0; +} + /* Decode remotely initiated bidi-stream. must be set to indicate * that we received the last data of the stream. * @@ -434,6 +474,18 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx) if (!ncb_data(rxbuf, 0)) return 0; + if (quic_stream_is_uni(qcs->id) && !(h3s->flags & H3_SF_UNI_INIT)) { + if (h3_init_uni_stream(h3c, qcs, rxbuf)) + return 1; + } + + if (quic_stream_is_uni(qcs->id) && (h3s->flags & H3_SF_UNI_NO_H3)) { + /* For non-h3 STREAM, parse it and return immediately. */ + if (h3_parse_uni_stream_no_h3(qcs, rxbuf)) + return 1; + return 0; + } + while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) { uint64_t ftype, flen; struct buffer b; @@ -492,10 +544,18 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx) /* TODO handle error reporting. Stream closure required. */ if (ret < 0) { ABORT_NOW(); } break; + case H3_FT_CANCEL_PUSH: case H3_FT_PUSH_PROMISE: + case H3_FT_MAX_PUSH_ID: + case H3_FT_GOAWAY: /* Not supported */ ret = flen; break; + case H3_FT_SETTINGS: + if (h3_parse_settings_frm(qcs->qcc->ctx, rxbuf, flen)) + return 1; + ret = flen; + break; default: /* draft-ietf-quic-http34 9. Extensions to HTTP/3 * @@ -521,105 +581,6 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx) return 0; } -/* Parse a SETTINGS frame which must not be truncated with as length from - * buffer. This function does not update this buffer. - * Returns 0 if something wrong happened, 1 if not. - */ -static int h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, size_t flen) -{ - uint64_t id, value; - const unsigned char *buf, *end; - - buf = (const unsigned char *)ncb_head(rxbuf); - end = buf + flen; - - while (buf < end) { - if (!quic_dec_int(&id, &buf, end) || !quic_dec_int(&value, &buf, end)) - return 0; - - h3_debug_printf(stderr, "%s id: %llu value: %llu\n", - __func__, (unsigned long long)id, (unsigned long long)value); - switch (id) { - case H3_SETTINGS_QPACK_MAX_TABLE_CAPACITY: - h3c->qpack_max_table_capacity = value; - break; - case H3_SETTINGS_MAX_FIELD_SECTION_SIZE: - h3c->max_field_section_size = value; - break; - case H3_SETTINGS_QPACK_BLOCKED_STREAMS: - h3c->qpack_blocked_streams = value; - break; - case H3_SETTINGS_RESERVED_2 ... H3_SETTINGS_RESERVED_5: - h3c->err = H3_SETTINGS_ERROR; - return 0; - default: - /* MUST be ignored */ - break; - } - } - - return 1; -} - -/* Decode remotely initiated uni-stream. We stop parsing a frame as soon as - * there is not enough received data. - * Returns 0 if something wrong happened, 1 if not. - */ -static int h3_control_recv(struct qcs *qcs, void *ctx) -{ - struct ncbuf *rxbuf = &qcs->rx.ncbuf; - struct h3c *h3c = ctx; - - h3_debug_printf(stderr, "%s STREAM ID: %lu\n", __func__, qcs->id); - if (!ncb_data(rxbuf, 0)) - return 1; - - while (ncb_data(rxbuf, 0)) { - size_t hlen; - uint64_t ftype, flen; - struct buffer b; - - /* Work on a copy of */ - b = h3_b_dup(rxbuf); - hlen = h3_decode_frm_header(&ftype, &flen, &b); - if (!hlen) - break; - - h3_debug_printf(stderr, "%s: ftype: %llu, flen: %llu\n", __func__, - (unsigned long long)ftype, (unsigned long long)flen); - if (flen > b_data(&b)) - break; - - qcs_consume(qcs, hlen); - /* From here, a frame must not be truncated */ - switch (ftype) { - case H3_FT_CANCEL_PUSH: - /* XXX TODO XXX */ - ABORT_NOW(); - break; - case H3_FT_SETTINGS: - if (!h3_parse_settings_frm(h3c, rxbuf, flen)) - return 0; - break; - case H3_FT_GOAWAY: - /* XXX TODO XXX */ - ABORT_NOW(); - break; - case H3_FT_MAX_PUSH_ID: - /* XXX TODO XXX */ - ABORT_NOW(); - break; - default: - /* Error */ - h3c->err = H3_FRAME_UNEXPECTED; - return 0; - } - qcs_consume(qcs, flen); - } - - return 1; -} - /* Returns buffer for data sending. * May be NULL if the allocation failed. */ @@ -957,50 +918,6 @@ static int h3_attach(struct qcs *qcs) return 0; } -/* Finalize the initialization of remotely initiated uni-stream . - * Return 1 if succeeded, 0 if not. In this latter case, set the ->err h3 error - * to inform the QUIC mux layer of the encountered error. - */ -static int h3_attach_ruqs(struct qcs *qcs, void *ctx) -{ - struct h3c *h3c = ctx; - struct h3s *h3s = qcs->ctx; - struct ncbuf *rxbuf = &qcs->rx.ncbuf; - - if (h3_init_uni_stream(h3c, qcs, rxbuf)) - return 0; - - /* Note that for all the uni-streams below, this is an error to receive two times the - * same type of uni-stream (even for Push stream which is not supported at this time. - */ - switch (h3s->type) { - case H3S_T_CTRL: - h3c->rctrl.qcs = qcs; - h3c->rctrl.cb = h3_control_recv; - qcs_subscribe(qcs, SUB_RETRY_RECV, &h3c->rctrl.wait_event); - break; - case H3S_T_PUSH: - /* NOT SUPPORTED */ - break; - case H3S_T_QPACK_ENC: - h3c->rqpack_enc.qcs = qcs; - h3c->rqpack_enc.cb = h3_parse_uni_stream_no_h3; - qcs_subscribe(qcs, SUB_RETRY_RECV, &h3c->rqpack_enc.wait_event); - break; - case H3S_T_QPACK_DEC: - h3c->rqpack_dec.qcs = qcs; - h3c->rqpack_dec.cb = h3_parse_uni_stream_no_h3; - qcs_subscribe(qcs, SUB_RETRY_RECV, &h3c->rqpack_dec.wait_event); - break; - default: - /* Error */ - h3c->err = H3_STREAM_CREATION_ERROR; - return 0; - } - - return 1; -} - static void h3_detach(struct qcs *qcs) { struct h3s *h3s = qcs->ctx; @@ -1109,7 +1026,7 @@ static int h3_init(struct qcc *qcc) if (!h3_uqs_init(&h3c->rqpack_enc, h3c, NULL, h3_uqs_task) || !h3_uqs_init(&h3c->rqpack_dec, h3c, NULL, h3_uqs_task) || - !h3_uqs_init(&h3c->rctrl, h3c, h3_control_recv, h3_uqs_task)) + !h3_uqs_init(&h3c->rctrl, h3c, NULL, h3_uqs_task)) goto fail_no_h3_ruqs; if (!h3_uqs_init(&h3c->lctrl, h3c, NULL, h3_uqs_task) || @@ -1156,7 +1073,6 @@ static int h3_is_active(const struct qcc *qcc, void *ctx) const struct qcc_app_ops h3_ops = { .init = h3_init, .attach = h3_attach, - .attach_ruqs = h3_attach_ruqs, .decode_qcs = h3_decode_qcs, .snd_buf = h3_snd_buf, .detach = h3_detach, diff --git a/src/mux_quic.c b/src/mux_quic.c index 206307f55..c47e83a2c 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -1175,17 +1175,17 @@ static int qc_recv(struct qcc *qcc) node = eb64_first(&qcc->streams_by_id); while (node) { - qcs = eb64_entry(node, struct qcs, by_id); + uint64_t id; - /* TODO unidirectional streams have their own mechanism for Rx. - * This should be unified. - */ - if (quic_stream_is_uni(qcs->id)) { + qcs = eb64_entry(node, struct qcs, by_id); + id = qcs->id; + + if (!ncb_data(&qcs->rx.ncbuf, 0) || (qcs->flags & QC_SF_DEM_FULL)) { node = eb64_next(node); continue; } - if (!ncb_data(&qcs->rx.ncbuf, 0) || (qcs->flags & QC_SF_DEM_FULL)) { + if (quic_stream_is_uni(id) && quic_stream_is_local(qcc, id)) { node = eb64_next(node); continue; } diff --git a/src/xprt_quic.c b/src/xprt_quic.c index b081c88fe..41d7ef6c4 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -2169,18 +2169,25 @@ static inline int qc_provide_cdata(struct quic_enc_level *el, return 0; } -/* Handle bidirectional STREAM frame. Depending on its ID, several - * streams may be open. The data are copied to the stream RX buffer if possible. - * If not, the STREAM frame is stored to be treated again later. - * We rely on the flow control so that not to store too much STREAM frames. - * Return 1 if succeeded, 0 if not. +/* Parse a STREAM frame + * + * Return 1 on success. On error, 0 is returned. In this case, the packet + * containing the frame must not be acknowledged. */ -static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, - struct quic_stream *strm_frm, - struct quic_conn *qc) +static inline int qc_handle_strm_frm(struct quic_rx_packet *pkt, + struct quic_stream *strm_frm, + struct quic_conn *qc) { int ret; + /* RFC9000 13.1. Packet Processing + * + * A packet MUST NOT be acknowledged until packet protection has been + * successfully removed and all frames contained in the packet have + * been processed. For STREAM frames, this means the data has been + * enqueued in preparation to be received by the application protocol, + * but it does not require that data be delivered and consumed. + */ ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len, strm_frm->offset.key, strm_frm->fin, (char *)strm_frm->data); @@ -2192,84 +2199,6 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, return 1; } -/* Handle unidirectional STREAM frame. Depending on its ID, several - * streams may be open. The data are copied to the stream RX buffer if possible. - * If not, the STREAM frame is stored to be treated again later. - * We rely on the flow control so that not to store too much STREAM frames. - * Return 1 if succeeded, 0 if not. - */ -static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt, - struct quic_stream *strm_frm, - struct quic_conn *qc) -{ - struct qcs *strm; - enum ncb_ret ret; - - strm = qcc_get_qcs(qc->qcc, strm_frm->id); - if (!strm) { - TRACE_PROTO("Stream not found", QUIC_EV_CONN_PSTRM, qc); - return 0; - } - - if (strm_frm->offset.key < strm->rx.offset) { - size_t diff; - - if (strm_frm->offset.key + strm_frm->len <= strm->rx.offset) { - TRACE_PROTO("Already received STREAM data", - QUIC_EV_CONN_PSTRM, qc); - goto out; - } - - TRACE_PROTO("Partially already received STREAM data", QUIC_EV_CONN_PSTRM, qc); - diff = strm->rx.offset - strm_frm->offset.key; - strm_frm->offset.key = strm->rx.offset; - strm_frm->len -= diff; - strm_frm->data += diff; - } - - qc_get_ncbuf(strm, &strm->rx.ncbuf); - if (ncb_is_null(&strm->rx.ncbuf)) - return 0; - - ret = ncb_add(&strm->rx.ncbuf, strm_frm->offset.key - strm->rx.offset, - (char *)strm_frm->data, strm_frm->len, NCB_ADD_COMPARE); - if (ret != NCB_RET_OK) - return 0; - - /* Inform the application of the arrival of this new stream */ - if (!strm->rx.offset && !qc->qcc->app_ops->attach_ruqs(strm, qc->qcc->ctx)) { - TRACE_PROTO("Could not set an uni-stream", QUIC_EV_CONN_PSTRM, qc); - return 0; - } - - qcs_notify_recv(strm); - - out: - return 1; -} - -/* Returns 1 on success or 0 on error. On error, the packet containing the - * frame must not be acknowledged. - */ -static inline int qc_handle_strm_frm(struct quic_rx_packet *pkt, - struct quic_stream *strm_frm, - struct quic_conn *qc) -{ - /* RFC9000 13.1. Packet Processing - * - * A packet MUST NOT be acknowledged until packet protection has been - * successfully removed and all frames contained in the packet have - * been processed. For STREAM frames, this means the data has been - * enqueued in preparation to be received by the application protocol, - * but it does not require that data be delivered and consumed. - */ - - if (strm_frm->id & QCS_ID_DIR_BIT) - return qc_handle_uni_strm_frm(pkt, strm_frm, qc); - else - return qc_handle_bidi_strm_frm(pkt, strm_frm, qc); -} - /* Duplicate all frames from list into list * for QUIC connection. * This is a best effort function which never fails even if no memory could be