MEDIUM: mux-quic: implement API to ignore txbuf limit for some streams

Define a new qc_stream_desc flag QC_SD_FL_OOB_BUF. This is to mark
streams which are not subject to the connection limit on allocated MUX
stream buffer.

The purpose is to simplify handling of QUIC MUX streams which do not
transfer data and as such are not driven by haproxy layer, for example
HTTP/3 control stream. These streams interacts synchronously with QUIC
MUX and cannot retry emission in case of temporary failure.

This commit will be useful once connection buffer allocation limit is
reimplemented to directly rely on the congestion window size. This will
probably cause the buffer limit to be reached more frequently, maybe
even on QUIC MUX initialization. As such, it will be possible to mark
control streams and prevent them to be subject to the buffer limit.

QUIC MUX expose a new function qcs_send_metadata(). It can be used by an
application protocol to specify which streams are used for control
exchanges. For the moment, no such stream use this mechanism.
This commit is contained in:
Amaury Denoyelle 2024-08-19 10:22:02 +02:00
parent f4d1bd0b76
commit 4c4bf26f44
4 changed files with 43 additions and 17 deletions

View File

@ -15,6 +15,7 @@
void qcc_set_error(struct qcc *qcc, int err, int app); void qcc_set_error(struct qcc *qcc, int err, int app);
int qcc_report_glitch(struct qcc *qcc, int inc); int qcc_report_glitch(struct qcc *qcc, int inc);
struct qcs *qcc_init_stream_local(struct qcc *qcc, int bidi); struct qcs *qcc_init_stream_local(struct qcc *qcc, int bidi);
void qcs_send_metadata(struct qcs *qcs);
struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin); struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin);
int qcs_is_close_local(struct qcs *qcs); int qcs_is_close_local(struct qcs *qcs);
int qcs_is_close_remote(struct qcs *qcs); int qcs_is_close_remote(struct qcs *qcs);

View File

@ -21,6 +21,7 @@ struct qc_stream_buf {
#define QC_SD_FL_RELEASE 0x00000001 /* set when MUX has finished to use this stream */ #define QC_SD_FL_RELEASE 0x00000001 /* set when MUX has finished to use this stream */
#define QC_SD_FL_WAIT_FOR_FIN 0x00000002 /* set if sent FIN is waiting for acknowledgement */ #define QC_SD_FL_WAIT_FOR_FIN 0x00000002 /* set if sent FIN is waiting for acknowledgement */
#define QC_SD_FL_OOB_BUF 0x00000004 /* buffers not accounted against conn limit */
/* QUIC STREAM descriptor. /* QUIC STREAM descriptor.
* *

View File

@ -715,6 +715,19 @@ static struct qcs *qcc_init_stream_remote(struct qcc *qcc, uint64_t id)
return NULL; return NULL;
} }
/* Mark <qcs> as reserved for metadata transfer. As such, future txbuf
* allocation won't be accounted against connection limit.
*/
void qcs_send_metadata(struct qcs *qcs)
{
/* Reserved for stream with Tx capability. */
BUG_ON(!qcs->stream);
/* Cannot use if some data already transferred for this stream. */
BUG_ON(!LIST_ISEMPTY(&qcs->stream->buf_list));
qcs->stream->flags |= QC_SD_FL_OOB_BUF;
}
struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin) struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin)
{ {
struct qcc *qcc = qcs->qcc; struct qcc *qcc = qcs->qcc;
@ -1004,6 +1017,10 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs)
* cause when the buffer cannot be allocated. It is set to 0 if the connection * cause when the buffer cannot be allocated. It is set to 0 if the connection
* buffer limit is reached. For fatal errors, its value is non-zero. * buffer limit is reached. For fatal errors, its value is non-zero.
* *
* Streams reserved for application protocol metadata transfer are not subject
* to the buffer limit per connection. Hence, for them only a memory error
* can prevent a buffer allocation.
*
* Returns buffer pointer. May be NULL on allocation failure, in which case * Returns buffer pointer. May be NULL on allocation failure, in which case
* <err> will refer to the cause. * <err> will refer to the cause.
*/ */
@ -1011,6 +1028,7 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
{ {
struct qcc *qcc = qcs->qcc; struct qcc *qcc = qcs->qcc;
struct buffer *out = qc_stream_buf_get(qcs->stream); struct buffer *out = qc_stream_buf_get(qcs->stream);
const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF;
/* Stream must not try to reallocate a buffer if currently waiting for one. */ /* Stream must not try to reallocate a buffer if currently waiting for one. */
BUG_ON(LIST_INLIST(&qcs->el_buf)); BUG_ON(LIST_INLIST(&qcs->el_buf));
@ -1018,18 +1036,20 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
*err = 0; *err = 0;
if (!out) { if (!out) {
if (qcc->flags & QC_CF_CONN_FULL) { if (likely(!unlimited)) {
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); if ((qcc->flags & QC_CF_CONN_FULL)) {
tot_time_start(&qcs->timer.buf); LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
goto out; tot_time_start(&qcs->timer.buf);
} goto out;
}
if (!qcc->tx.avail_bufs) { if (!qcc->tx.avail_bufs) {
TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs); TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
tot_time_start(&qcs->timer.buf); tot_time_start(&qcs->timer.buf);
qcc->flags |= QC_CF_CONN_FULL; qcc->flags |= QC_CF_CONN_FULL;
goto out; goto out;
}
} }
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real); out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real);
@ -1039,7 +1059,8 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
goto out; goto out;
} }
--qcc->tx.avail_bufs; if (likely(!unlimited))
--qcc->tx.avail_bufs;
} }
out: out:

View File

@ -42,8 +42,10 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
/* notify MUX about available buffers. */ /* notify MUX about available buffers. */
if (qc->mux_state == QC_MUX_READY) { if (qc->mux_state == QC_MUX_READY) {
/* notify MUX about available buffers. */ if (!(stream->flags & QC_SD_FL_OOB_BUF)) {
qcc_notify_buf(qc->qcc, 1); /* notify MUX about available buffers. */
qcc_notify_buf(qc->qcc, 1);
}
} }
} }
@ -208,7 +210,6 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
b_free(&buf->buf); b_free(&buf->buf);
LIST_DELETE(&buf->list); LIST_DELETE(&buf->list);
pool_free(pool_head_quic_stream_buf, buf); pool_free(pool_head_quic_stream_buf, buf);
++free_count; ++free_count;
} }
} }
@ -217,8 +218,10 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
offer_buffers(NULL, free_count); offer_buffers(NULL, free_count);
if (qc->mux_state == QC_MUX_READY) { if (qc->mux_state == QC_MUX_READY) {
/* notify MUX about available buffers. */ if (!(stream->flags & QC_SD_FL_OOB_BUF)) {
qcc_notify_buf(qc->qcc, free_count); /* notify MUX about available buffers. */
qcc_notify_buf(qc->qcc, free_count);
}
} }
} }