diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 44fb4d627..d95e9dc0d 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -10,6 +10,7 @@
 #include <...>
 #include <...>
+#include <...>
 #include <...>
 #include <...>
 #include <...>
@@ -70,6 +71,8 @@ struct qcc {
 
 	struct eb_root streams_by_id; /* all active streams by their ID */
 
+	struct list send_retry_list; /* list of qcs eligible to send retry */
+
 	struct wait_event wait_event; /* To be used if we're waiting for I/Os */
 	struct wait_event *subs;
 
@@ -111,6 +114,8 @@ struct qcs {
 	uint64_t id;
 	struct qc_stream_desc *stream;
 
+	struct list el; /* element of qcc.send_retry_list */
+
 	struct wait_event wait_event;
 	struct wait_event *subs;
 };
diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h
index 6d9359d0f..0550f4f0c 100644
--- a/include/haproxy/quic_stream.h
+++ b/include/haproxy/quic_stream.h
@@ -14,7 +14,6 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len
 void qc_stream_desc_free(struct qc_stream_desc *stream);
 
 struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
-int qc_stream_buf_avail(struct quic_conn *qc);
 struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
                                    uint64_t offset);
 void qc_stream_buf_release(struct qc_stream_desc *stream);
diff --git a/src/mux_quic.c b/src/mux_quic.c
index b969d3a4b..6a65d6923 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -7,6 +7,7 @@
 #include <...>
 #include <...>
 #include <...>
+#include <...>
 #include <...>
 #include <...>
 #include <...>
@@ -722,14 +723,9 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
 	if (qcs->tx.offset == qcs->tx.sent_offset &&
 	    b_full(&qcs->stream->buf->buf)) {
 		qc_stream_buf_release(qcs->stream);
-
-		/* reschedule send if buffers available */
-		if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
-			tasklet_wakeup(qcc->wait_event.tasklet);
-		}
-		else {
-			qcc->flags |= QC_CF_CONN_FULL;
-		}
+		/* prepare qcs for immediate send retry if data to send */
+		if (b_data(&qcs->tx.buf))
+			LIST_APPEND(&qcc->send_retry_list, &qcs->el);
 	}
 }
 
@@ -760,9 +756,11 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms)
 	if (LIST_ISEMPTY(frms)) {
 		TRACE_DEVEL("leaving with no frames to send", QMUX_EV_QCC_SEND,
 		            qcc->conn);
-		return 0;
+		return 1;
 	}
 
+	LIST_INIT(&qcc->send_retry_list);
+
  retry_send:
 	first_frm = LIST_ELEM(frms->n, struct quic_frame *, list);
 	if ((first_frm->type & QUIC_FT_STREAM_8) == QUIC_FT_STREAM_8) {
@@ -837,6 +835,65 @@ static int qc_send_max_streams(struct qcc *qcc)
 	return 0;
 }
 
+/* Used internally by qc_send function. Proceed to send for <qcs>. This will
+ * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
+ * is then generated and inserted in <frms> list. <qcc_max_data> is the current
+ * flow-control max-data at the connection level which must not be surpassed.
+ *
+ * Returns the total bytes transferred between qcs and quic_stream buffers. Can
+ * be null if out buffer cannot be allocated.
+ */
+static int _qc_send_qcs(struct qcs *qcs, struct list *frms,
+                        uint64_t qcc_max_data)
+{
+	struct qcc *qcc = qcs->qcc;
+	struct buffer *buf = &qcs->tx.buf;
+	struct buffer *out = qc_stream_buf_get(qcs->stream);
+	int xfer = 0;
+
+	/* Allocate buffer if necessary. */
+	if (!out) {
+		if (qcc->flags & QC_CF_CONN_FULL)
+			return 0;
+
+		out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
+		if (!out) {
+			qcc->flags |= QC_CF_CONN_FULL;
+			return 0;
+		}
+	}
+
+	/* Transfer data from <buf> to <out>. */
+	if (b_data(buf)) {
+		xfer = qcs_xfer_data(qcs, out, buf, qcc_max_data);
+		BUG_ON(xfer < 0); /* TODO handle this properly */
+
+		if (xfer > 0) {
+			qcs_notify_send(qcs);
+			qcs->flags &= ~QC_SF_BLK_MROOM;
+		}
+
+		qcs->tx.offset += xfer;
+	}
+
+	/* out buffer cannot be emptied if qcs offsets differ. */
+	BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
+
+	/* Build a new STREAM frame with <out> buffer. */
+	if (qcs->tx.sent_offset != qcs->tx.offset) {
+		int ret;
+		char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+
+		/* FIN is set if all incoming data were transfered. */
+		fin = !!(fin && !b_data(buf));
+
+		ret = qcs_build_stream_frm(qcs, out, fin, frms);
+		BUG_ON(ret < 0); /* TODO handle this properly */
+	}
+
+	return xfer;
+}
+
 /* Proceed to sending. Loop through all available streams for the <qcc>
  * instance and try to send as much as possible.
  *
@@ -846,7 +903,8 @@ static int qc_send(struct qcc *qcc)
 {
 	struct list frms = LIST_HEAD_INIT(frms);
 	struct eb64_node *node;
-	int total = 0;
+	struct qcs *qcs, *qcs_tmp;
+	int total = 0, tmp_total = 0;
 
 	TRACE_ENTER(QMUX_EV_QCC_SEND);
 
@@ -867,9 +925,8 @@ static int qc_send(struct qcc *qcc)
 	 */
 	node = eb64_first(&qcc->streams_by_id);
 	while (node) {
-		struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
-		struct buffer *buf = &qcs->tx.buf;
-		struct buffer *out = qc_stream_buf_get(qcs->stream);
+		int ret;
+		qcs = eb64_entry(node, struct qcs, by_id);
 
 		/* TODO
 		 * for the moment, unidirectional streams have their own
@@ -886,63 +943,38 @@ static int qc_send(struct qcc *qcc)
 		if (quic_stream_is_uni(qcs->id)) {
 			node = eb64_next(node);
 			continue;
 		}
 
-		if (!b_data(buf) && !out) {
+		if (!b_data(&qcs->tx.buf) && !qc_stream_buf_get(qcs->stream)) {
 			node = eb64_next(node);
 			continue;
 		}
 
-		if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
-			node = eb64_next(node);
-			continue;
-		}
-
-		if (!out) {
-			out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
-			if (!out) {
-				qcc->flags |= QC_CF_CONN_FULL;
-				node = eb64_next(node);
-				continue;
-			}
-		}
-
-		/* Prepare <out> buffer with data from <buf>. */
-		if (b_data(buf)) {
-			int ret = qcs_xfer_data(qcs, out, buf,
-			                        qcc->tx.sent_offsets + total);
-			BUG_ON(ret < 0); /* TODO handle this properly */
-
-			if (ret > 0) {
-				qcs_notify_send(qcs);
-				if (qcs->flags & QC_SF_BLK_MROOM)
-					qcs->flags &= ~QC_SF_BLK_MROOM;
-			}
-
-			qcs->tx.offset += ret;
-			total += ret;
-		}
-
-		/* Subscribe if not all data can be transfered. */
-		if (b_data(buf)) {
-			qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
-			                           SUB_RETRY_SEND, &qcc->wait_event);
-		}
-
-		/* Build a new STREAM frame with <out> buffer. */
-		if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
-			int ret;
-			char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
-
-			/* FIN is set if all incoming data were transfered. */
-			fin = !!(fin && !b_data(buf));
-			ret = qcs_build_stream_frm(qcs, out, fin, &frms);
-			BUG_ON(ret < 0); /* TODO handle this properly */
-		}
-
+		ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + total);
+		total += ret;
 		node = eb64_next(node);
 	}
 
-	qc_send_frames(qcc, &frms);
+	if (qc_send_frames(qcc, &frms)) {
+		/* data rejected by transport layer, do not retry. */
+		goto out;
+	}
+ retry:
+	tmp_total = 0;
+	list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_retry_list, el) {
+		int ret;
+		BUG_ON(!b_data(&qcs->tx.buf));
+		BUG_ON(qc_stream_buf_get(qcs->stream));
+
+		ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + tmp_total);
+		tmp_total += ret;
+		LIST_DELETE(&qcs->el);
+	}
+
+	total += tmp_total;
+	if (!qc_send_frames(qcc, &frms) && !LIST_ISEMPTY(&qcc->send_retry_list))
+		goto retry;
+
+ out:
 	TRACE_LEAVE(QMUX_EV_QCC_SEND);
 	return total;
@@ -1105,6 +1137,8 @@ static int qc_init(struct connection *conn, struct proxy *prx,
 	if (!qcc->wait_event.tasklet)
 		goto fail_no_tasklet;
 
+	LIST_INIT(&qcc->send_retry_list);
+
 	qcc->subs = NULL;
 	qcc->wait_event.tasklet->process = qc_io_cb;
 	qcc->wait_event.tasklet->context = qcc;
diff --git a/src/quic_stream.c b/src/quic_stream.c
index 2dd9b1c4c..839efd086 100644
--- a/src/quic_stream.c
+++ b/src/quic_stream.c
@@ -206,7 +206,7 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
 /* Check if a new stream buffer can be allocated for the connection <qc>.
  * Returns a boolean.
  */
-int qc_stream_buf_avail(struct quic_conn *qc)
+static int qc_stream_buf_avail(struct quic_conn *qc)
 {
 	/* TODO use a global tune settings for max */
 	return qc->stream_buf_count < 30;
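
For readers who want to see the new control flow in isolation: below is a minimal, self-contained C sketch of the two-phase send loop this patch introduces in qc_send() (one pass over all streams, then repeated passes over the streams marked for retry until the transport refuses a batch or nothing is left to retry). All names in it (mock_qcs, send_one, flush_frames, transport_budget) are stand-ins invented for the example, not HAProxy APIs; in the real code the retry set is the send_retry_list filled by qcc_streams_sent_done() via LIST_APPEND, not a per-stream flag.

#include <stdio.h>

#define NB_STREAMS 3

/* Stand-in for a qcs: bytes still queued and a retry flag standing in for
 * membership of send_retry_list. */
struct mock_qcs {
	int id;
	int pending;
	int on_retry;
};

/* Bytes the mock transport will accept in total (stand-in for packet
 * building and flow-control limits). */
static int transport_budget = 25;

/* Rough equivalent of _qc_send_qcs(): move at most 10 bytes for one stream
 * and flag it for retry when data remains, as the real code does when the
 * qc_stream buffer fills up and is released. Returns the bytes moved. */
static int send_one(struct mock_qcs *qcs)
{
	int xfer = qcs->pending < 10 ? qcs->pending : 10;

	qcs->pending -= xfer;
	qcs->on_retry = (qcs->pending > 0);
	return xfer;
}

/* Rough equivalent of qc_send_frames(): 0 on success, non-zero when the
 * transport rejects the batch, in which case the caller must not retry. */
static int flush_frames(int bytes)
{
	if (bytes > transport_budget)
		return 1;
	transport_budget -= bytes;
	return 0;
}

int main(void)
{
	struct mock_qcs streams[NB_STREAMS] = {
		{ .id = 0, .pending = 12 },
		{ .id = 1, .pending = 4  },
		{ .id = 2, .pending = 30 },
	};
	int total = 0, batch = 0, i;

	/* First pass: every active stream gets one shot. */
	for (i = 0; i < NB_STREAMS; i++)
		batch += send_one(&streams[i]);
	if (flush_frames(batch))
		goto out; /* rejected by the transport, do not retry */
	total += batch;

	/* Retry passes: only streams flagged during the previous pass,
	 * until nothing is flagged or the transport rejects the batch. */
	while (1) {
		int flagged = 0;

		batch = 0;
		for (i = 0; i < NB_STREAMS; i++) {
			if (!streams[i].on_retry)
				continue;
			batch += send_one(&streams[i]);
			flagged |= streams[i].on_retry;
		}
		if (!batch || flush_frames(batch))
			break;
		total += batch;
		if (!flagged)
			break;
	}

 out:
	printf("transport accepted %d bytes in total\n", total);
	return 0;
}

Running the sketch prints the total the mock transport accepted; the point is only to illustrate why qc_send_frames() now reports rejection to its caller (return 1 instead of 0 on an empty or refused batch) so that qc_send() knows when to stop looping on the retry list.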