diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index ef45d664c..bca0444cf 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -44,6 +44,9 @@ struct qcc { /* Flow-control related fields which are enforced on our side. */ struct { + uint64_t max_bidi_streams; /* max sub-ID of bidi stream allowed for the peer */ + uint64_t initial_max_bidi_streams; /* max initial sub-ID of bidi stream allowed for the peer */ + uint64_t closed_bidi_streams; /* total count of closed bidi stream since last MAX_STREAMS emission */ } lfctl; /* Flow-control related fields from the endpoint which we must respect. */ diff --git a/src/mux_quic.c b/src/mux_quic.c index cf1d72e9c..e3ad4eb9c 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -130,9 +130,13 @@ struct eb64_node *qcc_get_qcs(struct qcc *qcc, uint64_t id) strms = &qcc->streams_by_id; qcs_type = qcs_id_type(id); - if (sub_id + 1 > qcc->strms[qcs_type].max_streams) { - /* streams limit reached */ - goto out; + + /* TODO also checks max-streams for uni streams */ + if (quic_stream_is_bidi(id)) { + if (sub_id + 1 > qcc->lfctl.max_bidi_streams) { + /* streams limit reached */ + goto out; + } } /* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a @@ -258,11 +262,26 @@ int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) return 0; } +static int qc_is_max_streams_needed(struct qcc *qcc) +{ + return qcc->lfctl.closed_bidi_streams > qcc->lfctl.initial_max_bidi_streams / 2; +} + /* detaches the QUIC stream from its QCC and releases it to the QCS pool. */ static void qcs_destroy(struct qcs *qcs) { + const uint64_t id = qcs->by_id.key; + fprintf(stderr, "%s: release stream %llu\n", __func__, qcs->by_id.key); + if (quic_stream_is_remote(qcs->qcc, id)) { + if (quic_stream_is_bidi(id)) { + ++qcs->qcc->lfctl.closed_bidi_streams; + if (qc_is_max_streams_needed(qcs->qcc)) + tasklet_wakeup(qcs->qcc->wait_event.tasklet); + } + } + eb64_delete(&qcs->by_id); b_free(&qcs->rx.buf); @@ -491,12 +510,45 @@ static int qc_release_detached_streams(struct qcc *qcc) return release; } +/* Send a MAX_STREAM_BIDI frame to update the limit of bidirectional streams + * allowed to be opened by the peer. The caller should have first checked if + * this is required with qc_is_max_streams_needed. + * + * Returns 0 on success else non-zero. + */ +static int qc_send_max_streams(struct qcc *qcc) +{ + struct list frms = LIST_HEAD_INIT(frms); + struct quic_frame *frm; + + frm = pool_zalloc(pool_head_quic_frame); + BUG_ON(!frm); /* TODO handle this properly */ + + frm->type = QUIC_FT_MAX_STREAMS_BIDI; + frm->max_streams_bidi.max_streams = qcc->lfctl.max_bidi_streams + + qcc->lfctl.closed_bidi_streams; + fprintf(stderr, "SET MAX_STREAMS %lu\n", frm->max_streams_bidi.max_streams); + LIST_APPEND(&frms, &frm->list); + + if (qc_send_frames(qcc, &frms)) + return 1; + + /* save the new limit if the frame has been send. */ + qcc->lfctl.max_bidi_streams += qcc->lfctl.closed_bidi_streams; + qcc->lfctl.closed_bidi_streams = 0; + + return 0; +} + static struct task *qc_io_cb(struct task *t, void *ctx, unsigned int status) { struct qcc *qcc = ctx; fprintf(stderr, "%s\n", __func__); + if (qc_is_max_streams_needed(qcc)) + qc_send_max_streams(qcc); + qc_send(qcc); if (qc_release_detached_streams(qcc)) { @@ -595,6 +647,9 @@ static int qc_init(struct connection *conn, struct proxy *prx, qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni; qcc->strms[QCS_SRV_UNI].tx.max_data = 0; + qcc->lfctl.max_bidi_streams = qcc->lfctl.initial_max_bidi_streams = lparams->initial_max_streams_bidi; + qcc->lfctl.closed_bidi_streams = 0; + qcc->wait_event.tasklet = tasklet_new(); if (!qcc->wait_event.tasklet) goto fail_no_tasklet;