diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index a2004a49c..ecee48407 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -111,10 +111,33 @@ struct qcc { /* Maximum size of stream Rx buffer. */ #define QC_S_RX_BUF_SZ (global.tune.bufsize - NCB_RESERVED_SZ) +/* QUIC stream states + * + * On initialization a stream is put on idle state. It is opened as soon as + * data has been successfully sent or received on it. + * + * A bidirectional stream has two channels which can be closed separately. The + * local channel is closed when the STREAM frame with FIN or a RESET_STREAM has + * been emitted. The remote channel is closed as soon as all data from the peer + * has been received. The stream goes instantely to the close state once both + * channels are closed. + * + * A unidirectional stream has only one channel of communication. Thus, it does + * not use half closed states and transition directly from open to close state. + */ +enum qcs_state { + QC_SS_IDLE = 0, /* initial state */ + QC_SS_OPEN, /* opened */ + QC_SS_HLOC, /* half-closed local */ + QC_SS_HREM, /* half-closed remote */ + QC_SS_CLO, /* closed */ +} __attribute__((packed)); + struct qcs { struct qcc *qcc; struct sedesc *sd; uint32_t flags; /* QC_SF_* */ + enum qcs_state st; /* QC_SS_* state */ void *ctx; /* app-ops context */ struct { diff --git a/src/mux_quic.c b/src/mux_quic.c index f03a58170..53d0aab66 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -127,6 +127,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->qcc = qcc; qcs->sd = NULL; qcs->flags = QC_SF_NONE; + qcs->st = QC_SS_IDLE; qcs->ctx = NULL; /* Allocate transport layer stream descriptor. Only needed for TX. */ @@ -226,6 +227,72 @@ static forceinline struct stconn *qcs_sc(const struct qcs *qcs) return qcs->sd ? qcs->sd->sc : NULL; } +/* Mark a stream as open if it was idle. This can be used on every + * successful emission/reception operation to update the stream state. + */ +static void qcs_idle_open(struct qcs *qcs) +{ + /* This operation must not be used if the stream is already closed. */ + BUG_ON_HOT(qcs->st == QC_SS_CLO); + + if (qcs->st == QC_SS_IDLE) { + qcs->st = QC_SS_OPEN; + TRACE_DEVEL("opening stream", QMUX_EV_QCS_NEW, qcs->qcc->conn, qcs); + } +} + +/* Close the local channel of instance. */ +static void qcs_close_local(struct qcs *qcs) +{ + /* The stream must have already been opened. */ + BUG_ON_HOT(qcs->st == QC_SS_IDLE); + + /* This operation cannot be used multiple times. */ + BUG_ON_HOT(qcs->st == QC_SS_HLOC || qcs->st == QC_SS_CLO); + + if (quic_stream_is_bidi(qcs->id)) { + qcs->st = (qcs->st == QC_SS_HREM) ? QC_SS_CLO : QC_SS_HLOC; + } + else { + /* Only local uni streams are valid for this operation. */ + BUG_ON_HOT(quic_stream_is_remote(qcs->qcc, qcs->id)); + qcs->st = QC_SS_CLO; + } + + TRACE_DEVEL("closing stream locally", QMUX_EV_QCS_END, qcs->qcc->conn, qcs); +} + +/* Close the remote channel of instance. */ +static void qcs_close_remote(struct qcs *qcs) +{ + /* The stream must have already been opened. */ + BUG_ON_HOT(qcs->st == QC_SS_IDLE); + + /* This operation cannot be used multiple times. */ + BUG_ON_HOT(qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO); + + if (quic_stream_is_bidi(qcs->id)) { + qcs->st = (qcs->st == QC_SS_HLOC) ? QC_SS_CLO : QC_SS_HREM; + } + else { + /* Only remote uni streams are valid for this operation. */ + BUG_ON_HOT(quic_stream_is_local(qcs->qcc, qcs->id)); + qcs->st = QC_SS_CLO; + } + + TRACE_DEVEL("closing stream remotely", QMUX_EV_QCS_END, qcs->qcc->conn, qcs); +} + +static int qcs_is_close_local(struct qcs *qcs) +{ + return qcs->st == QC_SS_HLOC || qcs->st == QC_SS_CLO; +} + +static __maybe_unused int qcs_is_close_remote(struct qcs *qcs) +{ + return qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO; +} + struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr) { struct buffer *buf = b_alloc(bptr); @@ -665,6 +732,8 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, return 0; } + qcs_idle_open(qcs); + if (offset + len > qcs->rx.offset_max) { uint64_t diff = offset + len - qcs->rx.offset_max; qcs->rx.offset_max = offset + len; @@ -724,6 +793,9 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, if (fin) qcs->flags |= QC_SF_SIZE_KNOWN; + if (qcs->flags & QC_SF_SIZE_KNOWN && !ncb_is_fragmented(&qcs->rx.ncbuf)) + qcs_close_remote(qcs); + if (ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) qcc_decode_qcs(qcc, qcs); @@ -1093,6 +1165,8 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) if (offset + data < qcs->tx.sent_offset) return; + qcs_idle_open(qcs); + diff = offset + data - qcs->tx.sent_offset; if (diff) { /* increase offset sum on connection */ @@ -1118,6 +1192,8 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) } if (qcs->tx.offset == qcs->tx.sent_offset && qcs_stream_fin(qcs)) { + /* Close stream locally. */ + qcs_close_local(qcs); /* Reset flag to not emit multiple FIN STREAM frames. */ qcs->flags &= ~QC_SF_FIN_STREAM; } @@ -1302,6 +1378,11 @@ static int qc_send(struct qcc *qcc) continue; } + if (qcs_is_close_local(qcs)) { + node = eb64_next(node); + continue; + } + if (qcs->flags & QC_SF_BLK_SFCTL) { node = eb64_next(node); continue; @@ -1417,6 +1498,14 @@ static int qc_purge_streams(struct qcc *qcc) struct qcs *qcs = eb64_entry(node, struct qcs, by_id); node = eb64_next(node); + /* Release not attached closed streams. */ + if (qcs->st == QC_SS_CLO && !qcs_sc(qcs)) { + TRACE_DEVEL("purging closed stream", QMUX_EV_QCC_WAKE, qcs->qcc->conn, qcs); + qcs_destroy(qcs); + release = 1; + continue; + } + /* Release detached streams with empty buffer. */ if (qcs->flags & QC_SF_DETACH) { if (!b_data(&qcs->tx.buf) && @@ -1635,6 +1724,14 @@ static void qc_detach(struct sedesc *sd) TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs); + /* TODO this BUG_ON_HOT() is not correct as the stconn layer may detach + * from the stream even if it is not closed remotely at the QUIC layer. + * This happens for example when a stream must be closed due to a + * rejected request. To better handle these cases, it will be required + * to implement shutr/shutw MUX operations. Once this is done, this + * BUG_ON_HOT() statement can be adjusted. + */ + //BUG_ON_HOT(!qcs_is_close_remote(qcs)); --qcc->nb_sc; if ((b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) && @@ -1739,8 +1836,15 @@ static size_t qc_snd_buf(struct stconn *sc, struct buffer *buf, TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + if (qcs_is_close_local(qcs)) { + ret = count; + count = 0; + goto end; + } + ret = qcs->qcc->app_ops->snd_buf(sc, buf, count, flags); + end: TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); return ret; @@ -1842,6 +1946,18 @@ static int qc_wake(struct connection *conn) } +static char *qcs_st_to_str(enum qcs_state st) +{ + switch (st) { + case QC_SS_IDLE: return "IDL"; + case QC_SS_OPEN: return "OPN"; + case QC_SS_HLOC: return "HCL"; + case QC_SS_HREM: return "HCR"; + case QC_SS_CLO: return "CLO"; + default: return "???"; + } +} + static void qmux_trace_frm(const struct quic_frame *frm) { switch (frm->type) { @@ -1877,7 +1993,9 @@ static void qmux_trace(enum trace_level level, uint64_t mask, chunk_appendf(&trace_buf, " : qcc=%p(F)", qcc); if (qcs) - chunk_appendf(&trace_buf, " qcs=%p(%llu)", qcs, (ull)qcs->id); + chunk_appendf(&trace_buf, " qcs=%p .id=%llu .st=%s", + qcs, (ull)qcs->id, + qcs_st_to_str(qcs->st)); if (mask & QMUX_EV_QCC_NQCS) { const uint64_t *id = a3;