MINOR: mux_quic/h3: report termination events at stream layer

This patch adds termination level on QUIC MUX at the stream layer.
Similarly to the previous patch, error codes should be similar to the
ones already used in H2.

Most of the values are reported via qcc_reset_stream() which have now an
extra argument for this. This is used by application protocol H3 and
hq-interop.

qmux_sctl() callback is extended to support MUX_SCTL_TEVTS to report the
current stream termination event at the upper layer.
This commit is contained in:
Amaury Denoyelle 2026-04-27 10:31:11 +02:00
parent f8a1d24e31
commit 5160b84c7a
4 changed files with 28 additions and 9 deletions

View File

@ -41,7 +41,7 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs);
int qcc_realign_stream_txbuf(const struct qcs *qcs, struct buffer *out);
int qcc_release_stream_txbuf(struct qcs *qcs);
int qcc_stream_can_send(const struct qcs *qcs);
void qcc_reset_stream(struct qcs *qcs, int err);
void qcc_reset_stream(struct qcs *qcs, int err, int term_evt);
void qcc_send_stream(struct qcs *qcs, int urg, int count);
void qcc_abort_stream_read(struct qcs *qcs);
void qcc_update_shut_id(struct qcc *qcc, uint64_t val);

View File

@ -1789,7 +1789,7 @@ static ssize_t h3_rcv_buf(struct qcs *qcs, struct buffer *b, int fin)
/* FIN received, ensure body length is conform to any content-length header. */
if ((h3s->flags & H3_SF_HAVE_CLEN) && h3_check_body_size(qcs, 1)) {
qcc_abort_stream_read(qcs);
qcc_reset_stream(qcs, h3s->err);
qcc_reset_stream(qcs, h3s->err, se_tevt_type_proto_err);
goto done;
}
@ -1977,8 +1977,11 @@ static ssize_t h3_rcv_buf(struct qcs *qcs, struct buffer *b, int fin)
/* Interrupt decoding on stream/connection error detected. */
if (h3s->err) {
/* TODO Only unimplemented CONNECT reports H3_ERR_REQUEST_REJECTED here. */
const int tevt =
(h3s->err == H3_ERR_REQUEST_REJECTED) ? 0 : se_tevt_type_proto_err;
qcc_abort_stream_read(qcs);
qcc_reset_stream(qcs, h3s->err);
qcc_reset_stream(qcs, h3s->err, tevt);
total = b_data(b);
goto done;
}
@ -3147,7 +3150,7 @@ static int h3_attach(struct qcs *qcs, void *conn_ctx)
TRACE_STATE("close stream outside of GOAWAY range", H3_EV_H3S_NEW, qcs->qcc->conn, qcs);
qcc_abort_stream_read(qcs);
qcc_reset_stream(qcs, H3_ERR_REQUEST_REJECTED);
qcc_reset_stream(qcs, H3_ERR_REQUEST_REJECTED, 0);
}
/* TODO support push uni-stream rejection. */

View File

@ -1704,8 +1704,10 @@ static void _qcc_send_stream(struct qcs *qcs, int urg)
}
}
/* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */
void qcc_reset_stream(struct qcs *qcs, int err)
/* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. If
* <tevt> is non null, it is used as a stream level termination event code.
*/
void qcc_reset_stream(struct qcs *qcs, int err, int tevt)
{
struct qcc *qcc = qcs->qcc;
const uint64_t diff = qcs_prep_bytes(qcs);
@ -1739,6 +1741,8 @@ void qcc_reset_stream(struct qcs *qcs, int err)
/* Report send error to stream-endpoint layer. */
if (qcs_sc(qcs)) {
se_fl_set_error(qcs->sd);
if (tevt)
se_report_term_evt(qcs->sd, tevt);
qcs_alert(qcs);
}
@ -2398,8 +2402,12 @@ int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err)
/* Manually set EOS if FIN already reached as futures RESET_STREAM will be ignored in this case. */
if (qcs_sc(qcs) && se_fl_test(qcs->sd, SE_FL_EOI)) {
se_fl_set(qcs->sd, SE_FL_EOS);
se_report_term_evt(qcs->sd, (qcc->flags & QC_CF_ERR_CONN ? se_tevt_type_rcv_err : se_tevt_type_eos));
qcs_alert(qcs);
}
else {
se_report_term_evt(qcs->sd, se_tevt_type_rst_rcvd);
}
/* If not defined yet, set abort info for the sedesc */
if (!qcs->sd->abort_info.info) {
@ -2422,7 +2430,7 @@ int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err)
* the RESET_STREAM frame it sends, but it can use any application error
* code.
*/
qcc_reset_stream(qcs, err);
qcc_reset_stream(qcs, err, 0);
if (qcc_may_expire(qcc) && !qcc->nb_hreq)
qcc_refresh_timeout(qcc);
@ -4266,6 +4274,10 @@ static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf,
if (!se_fl_test(qcs->sd, SE_FL_EOI)) {
TRACE_STATE("report error on stream aborted", QMUX_EV_STRM_RECV, qcc->conn, qcs);
se_fl_set(qcs->sd, SE_FL_ERROR);
se_report_term_evt(qcs->sd, (qcc->flags & QC_CF_ERR_CONN ? se_tevt_type_truncated_rcv_err : se_tevt_type_truncated_eos));
}
else {
se_report_term_evt(qcs->sd, (qcc->flags & QC_CF_ERR_CONN ? se_tevt_type_rcv_err : se_tevt_type_eos));
}
}
@ -4595,7 +4607,7 @@ static void qmux_strm_shut(struct stconn *sc, unsigned int mode, struct se_abort
}
else {
/* RESET_STREAM necessary. */
qcc_reset_stream(qcs, 0);
qcc_reset_stream(qcs, 0, 0);
}
tasklet_wakeup(qcc->wait_event.tasklet);
@ -4673,6 +4685,9 @@ static int qmux_sctl(struct stconn *sc, enum mux_sctl_type mux_sctl, void *outpu
dbg_ctx->ret.buf = *buf;
return ret;
case MUX_SCTL_TEVTS:
return qcs->sd->term_evts_log;
default:
return -1;
}

View File

@ -166,7 +166,8 @@ void qmux_dump_qcs_info(struct buffer *msg, const struct qcs *qcs)
if (qcs->sd) {
chunk_appendf(msg, " .sd=%p", qcs->sd);
chunk_appendf(msg, "(.flg=0x%08x)", se_fl_get(qcs->sd));
chunk_appendf(msg, "(.flg=0x%08x .evts=%s)",
se_fl_get(qcs->sd), tevt_evts2str(qcs->sd->term_evts_log));
}
chunk_appendf(msg, " .rx=%llu/%llu rxb=%u(%u)",