MEDIUM: mux-quic: refactor streams opening

Review the whole API used to access/instantiate qcs.

A public function qcc_open_stream_local() is available to the
application protocol layer. It allows to easily opening a local stream.
The ID is automatically attributed to the next one available.

For remote streams, qcc_open_stream_remote() has been implemented. It
will automatically take care of allocating streams in a linear way
according to the ID. This function is called via qcc_get_qcs() which can
be used for each qcc_recv*() operations. For the moment, it is only used
for STREAM frames via qcc_recv(), but soon it will be implemented for
other frames types which can also be used to open a new stream.

qcs_new() and qcs_free() has been restricted to the MUX QUIC only as
they are now reserved for internal usage.

This change is a pure refactoring and should not have any noticeable
impact. It clarifies the developer intent and help to ensure that a
stream is not automatically opened when not desired.
This commit is contained in:
Amaury Denoyelle 2022-07-04 15:50:33 +02:00
parent 3abeb57909
commit a509ffb505
4 changed files with 187 additions and 112 deletions

View File

@ -37,7 +37,6 @@ struct qcc {
struct {
uint64_t max_streams; /* maximum number of concurrent streams */
uint64_t largest_id; /* Largest ID of the open streams */
uint64_t nb_streams; /* Number of open streams */
struct {
uint64_t max_data; /* Maximum number of bytes which may be received */
@ -80,6 +79,11 @@ struct qcc {
uint64_t sent_offsets; /* sum of all offset sent */
} tx;
uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
uint64_t largest_uni_r; /* largest remote uni stream ID opened. */
uint64_t next_bidi_l; /* next stream ID to use for local bidi stream */
uint64_t next_uni_l; /* next stream ID to use for local uni stream */
struct eb_root streams_by_id; /* all active streams by their ID */
struct list send_retry_list; /* list of qcs eligible to send retry */

View File

@ -14,9 +14,7 @@
#include <haproxy/stream.h>
#include <haproxy/xprt_quic-t.h>
struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type);
void qcs_free(struct qcs *qcs);
struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi);
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
@ -71,8 +69,6 @@ static inline int quic_stream_is_bidi(uint64_t id)
return !quic_stream_is_uni(id);
}
struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id);
/* Install the <app_ops> applicative layer of a QUIC connection on mux <qcc>.
* Returns 0 on success else non-zero.
*/

View File

@ -1081,7 +1081,7 @@ static int h3_finalize(void *ctx)
struct h3c *h3c = ctx;
struct qcs *qcs;
qcs = qcs_new(h3c->qcc, 0x3, QCS_SRV_UNI);
qcs = qcc_open_stream_local(h3c->qcc, 0);
if (!qcs)
return 0;

View File

@ -113,7 +113,7 @@ static void qcc_emit_cc(struct qcc *qcc, int err)
}
/* Allocate a new QUIC streams with id <id> and type <type>. */
struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
{
struct qcs *qcs;
@ -201,7 +201,7 @@ static void qc_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
* error or connection shutdown. Else use qcs_destroy which handle all the
* QUIC connection mechanism.
*/
void qcs_free(struct qcs *qcs)
static void qcs_free(struct qcs *qcs)
{
qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
b_free(&qcs->tx.buf);
@ -291,92 +291,180 @@ void qcs_notify_send(struct qcs *qcs)
}
}
/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
* several streams, depending on the already open ones.
* Return this node if succeeded, NULL if not.
/* Open a locally initiated stream for the connection <qcc>. Set <bidi> for a
* bidirectional stream, else an unidirectional stream is opened. The next
* available ID on the connection will be used according to the stream type.
*
* Returns the allocated stream instance or NULL on error.
*/
struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi)
{
struct qcs *qcs;
enum qcs_type type;
uint64_t *next;
TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
if (bidi) {
next = &qcc->next_bidi_l;
type = conn_is_back(qcc->conn) ? QCS_CLT_BIDI : QCS_SRV_BIDI;
}
else {
next = &qcc->next_uni_l;
type = conn_is_back(qcc->conn) ? QCS_CLT_UNI : QCS_SRV_UNI;
}
/* TODO ensure that we won't overflow remote peer flow control limit on
* streams. Else, we should emit a STREAMS_BLOCKED frame.
*/
qcs = qcs_new(qcc, *next, type);
if (!qcs)
return NULL;
TRACE_DEVEL("opening local stream", QMUX_EV_QCS_NEW, qcc->conn, qcs);
*next += 4;
TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
return qcs;
}
/* Open a remote initiated stream for the connection <qcc> with ID <id>. The
* caller is responsible to ensure that a stream with the same ID was not
* already opened. This function will also create all intermediaries streams
* with ID smaller than <id> not already opened before.
*
* Returns the allocated stream instance or NULL on error.
*/
static struct qcs *qcc_open_stream_remote(struct qcc *qcc, uint64_t id)
{
struct qcs *qcs = NULL;
enum qcs_type type;
uint64_t *largest;
TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
BUG_ON_HOT(quic_stream_is_local(qcc, id));
if (quic_stream_is_bidi(id)) {
largest = &qcc->largest_bidi_r;
type = conn_is_back(qcc->conn) ? QCS_SRV_BIDI : QCS_CLT_BIDI;
}
else {
largest = &qcc->largest_uni_r;
type = conn_is_back(qcc->conn) ? QCS_SRV_UNI : QCS_CLT_UNI;
}
/* TODO also checks max-streams for uni streams */
if (quic_stream_is_bidi(id)) {
if (id >= qcc->lfctl.ms_bidi * 4) {
/* RFC 9000 4.6. Controlling Concurrency
*
* An endpoint that receives a frame with a
* stream ID exceeding the limit it has sent
* MUST treat this as a connection error of
* type STREAM_LIMIT_ERROR
*/
TRACE_DEVEL("leaving on flow control error", QMUX_EV_QCS_NEW, qcc->conn);
qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
return NULL;
}
}
/* Only stream ID not already opened can be used. */
BUG_ON(id < *largest);
while (id >= *largest) {
const char *str = *largest < id ? "opening intermediary stream" :
"opening remote stream";
qcs = qcs_new(qcc, *largest, type);
if (!qcs) {
TRACE_DEVEL("leaving on stream fallocation failure", QMUX_EV_QCS_NEW, qcc->conn);
return NULL;
}
TRACE_DEVEL(str, QMUX_EV_QCS_NEW, qcc->conn, qcs);
*largest += 4;
}
TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
return qcs;
}
/* Use this function for a stream <id> which is not in <qcc> stream tree. It
* returns true if the associated stream is closed.
*/
static int qcc_stream_id_is_closed(struct qcc *qcc, uint64_t id)
{
uint64_t *largest;
/* This function must only be used for stream not present in the stream tree. */
BUG_ON_HOT(eb64_lookup(&qcc->streams_by_id, id));
if (quic_stream_is_local(qcc, id)) {
largest = quic_stream_is_uni(id) ? &qcc->next_uni_l :
&qcc->next_bidi_l;
}
else {
largest = quic_stream_is_uni(id) ? &qcc->largest_uni_r :
&qcc->largest_bidi_r;
}
return id < *largest;
}
/* Retrieve the stream instance from <id> ID. This can be used when receiving
* STREAM, STREAM_DATA_BLOCKED, RESET_STREAM, MAX_STREAM_DATA or STOP_SENDING
* frames.
*
* Return the stream instance or NULL if not found.
*/
static struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
{
unsigned int strm_type;
int64_t sub_id;
struct eb64_node *node;
struct qcs *qcs = NULL;
strm_type = id & QCS_ID_TYPE_MASK;
sub_id = id >> QCS_ID_TYPE_SHIFT;
node = NULL;
if (quic_stream_is_local(qcc, id)) {
/* Local streams: this stream must be already opened. */
node = eb64_lookup(&qcc->streams_by_id, id);
if (!node) {
/* unknown stream id */
goto out;
}
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
/* Search the stream in the connection tree. */
node = eb64_lookup(&qcc->streams_by_id, id);
if (node) {
qcs = eb64_entry(node, struct qcs, by_id);
TRACE_DEVEL("using stream from connection tree", QMUX_EV_QCC_RECV, qcc->conn, qcs);
return qcs;
}
/* Check if stream is already closed. */
if (qcc_stream_id_is_closed(qcc, id)) {
TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
return NULL;
}
/* Create the stream. This is valid only for remote initiated one. A
* local stream must have already been explicitely created by the
* application protocol layer.
*/
if (quic_stream_is_local(qcc, id)) {
/* RFC 9000 19.8. STREAM Frames
*
* An endpoint MUST terminate the connection with error
* STREAM_STATE_ERROR if it receives a STREAM frame for a locally
* initiated stream that has not yet been created, or for a send-only
* stream.
*/
TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
return NULL;
}
else {
/* Remote streams. */
struct eb_root *strms;
uint64_t largest_id;
enum qcs_type qcs_type;
strms = &qcc->streams_by_id;
qcs_type = qcs_id_type(id);
/* TODO also checks max-streams for uni streams */
if (quic_stream_is_bidi(id)) {
if (sub_id + 1 > qcc->lfctl.ms_bidi) {
/* RFC 9000 4.6. Controlling Concurrency
*
* An endpoint that receives a frame with a
* stream ID exceeding the limit it has sent
* MUST treat this as a connection error of
* type STREAM_LIMIT_ERROR
*/
qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
goto out;
}
}
/* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a
* correct value.
*/
largest_id = qcc->strms[qcs_type].largest_id;
if (sub_id > (int64_t)largest_id) {
/* RFC: "A stream ID that is used out of order results in all streams
* of that type with lower-numbered stream IDs also being opened".
* So, let's "open" these streams.
*/
int64_t i;
struct qcs *tmp_qcs;
tmp_qcs = NULL;
for (i = largest_id + 1; i <= sub_id; i++) {
uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
tmp_qcs = qcs_new(qcc, id, type);
if (!tmp_qcs) {
/* allocation failure */
goto out;
}
qcc->strms[qcs_type].largest_id = i;
}
if (tmp_qcs)
qcs = tmp_qcs;
}
else {
node = eb64_lookup(strms, id);
if (node)
qcs = eb64_entry(node, struct qcs, by_id);
}
/* Remote stream not found - try to open it. */
qcs = qcc_open_stream_remote(qcc, id);
}
return qcs;
out:
return NULL;
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
return qcs;
}
/* Simple function to duplicate a buffer */
@ -515,30 +603,8 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
}
qcs = qcc_get_qcs(qcc, id);
if (!qcs) {
if ((id >> QCS_ID_TYPE_SHIFT) <= qcc->strms[qcs_id_type(id)].largest_id) {
TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
return 0;
}
else {
/* RFC 9000 19.8. STREAM Frames
*
* An endpoint MUST terminate the connection with error
* STREAM_STATE_ERROR if it receives a STREAM frame for a locally
* initiated stream that has not yet been created, or for a send-only
* stream.
*/
if (quic_stream_is_local(qcc, id)) {
TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
return 1;
}
else {
TRACE_DEVEL("leaving on stream not found", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
return 1;
}
}
}
if (!qcs)
return 0;
if (offset + len <= qcs->rx.offset) {
TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
@ -1370,26 +1436,22 @@ static int qc_init(struct connection *conn, struct proxy *prx,
/* Client initiated streams must respect the server flow control. */
qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;
qcc->strms[QCS_CLT_BIDI].nb_streams = 0;
qcc->strms[QCS_CLT_BIDI].largest_id = -1;
qcc->strms[QCS_CLT_BIDI].rx.max_data = 0;
qcc->strms[QCS_CLT_BIDI].tx.max_data = lparams->initial_max_stream_data_bidi_remote;
qcc->strms[QCS_CLT_UNI].max_streams = lparams->initial_max_streams_uni;
qcc->strms[QCS_CLT_UNI].nb_streams = 0;
qcc->strms[QCS_CLT_UNI].largest_id = -1;
qcc->strms[QCS_CLT_UNI].rx.max_data = 0;
qcc->strms[QCS_CLT_UNI].tx.max_data = lparams->initial_max_stream_data_uni;
/* Server initiated streams must respect the server flow control. */
qcc->strms[QCS_SRV_BIDI].max_streams = 0;
qcc->strms[QCS_SRV_BIDI].nb_streams = 0;
qcc->strms[QCS_SRV_BIDI].largest_id = -1;
qcc->strms[QCS_SRV_BIDI].rx.max_data = lparams->initial_max_stream_data_bidi_local;
qcc->strms[QCS_SRV_BIDI].tx.max_data = 0;
qcc->strms[QCS_SRV_UNI].max_streams = 0;
qcc->strms[QCS_SRV_UNI].nb_streams = 0;
qcc->strms[QCS_SRV_UNI].largest_id = -1;
qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni;
qcc->strms[QCS_SRV_UNI].tx.max_data = 0;
@ -1407,6 +1469,19 @@ static int qc_init(struct connection *conn, struct proxy *prx,
qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
if (conn_is_back(conn)) {
qcc->next_bidi_l = 0x00;
qcc->largest_bidi_r = 0x01;
qcc->next_uni_l = 0x02;
qcc->largest_uni_r = 0x03;
}
else {
qcc->largest_bidi_r = 0x00;
qcc->next_bidi_l = 0x01;
qcc->largest_uni_r = 0x02;
qcc->next_uni_l = 0x03;
}
qcc->wait_event.tasklet = tasklet_new();
if (!qcc->wait_event.tasklet)
goto fail_no_tasklet;