From a3f222dc1e77e02079ef47eea7cc3623ffebdbe8 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Mon, 6 Dec 2021 11:24:00 +0100 Subject: [PATCH] MINOR: mux-quic: implement subscribe on stream Implement the subscription in the mux on the qcs instance. Subscribe is now used by the h3 layer when receiving an incomplete frame on the H3 control stream. It is also used when attaching the remote uni-directional streams on the h3 layer. In the qc_send, the mux wakes up the qcs for each new transfer executed. This is done via the method qcs_notify_send(). The xprt wakes up the qcs when receiving data on unidirectional streams. This is done via the method qcs_notify_recv(). --- include/haproxy/mux_quic.h | 4 ++++ src/h3.c | 13 ++++++++----- src/mux_quic.c | 39 ++++++++++++++++++++++++++++++++++++-- src/xprt_quic.c | 3 +++ 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index aa8572c14..9e7ef175f 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -15,6 +15,10 @@ void uni_qcs_free(struct qcs *qcs); struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr); +int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); +void qcs_notify_recv(struct qcs *qcs); +void qcs_notify_send(struct qcs *qcs); + /* Bit shift to get the stream sub ID for internal use which is obtained * shifting the stream IDs by this value, knowing that the * QCS_ID_TYPE_SHIFT less significant bits identify the stream ID diff --git a/src/h3.c b/src/h3.c index ecaa02d49..359b27676 100644 --- a/src/h3.c +++ b/src/h3.c @@ -322,9 +322,12 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx) b_del(rxbuf, flen); } - /* TODO handle the case when the buffer is not empty. This can happens - * if there is an incomplete frame. + /* Handle the case where remaining data are present in the buffer. This + * can happen if there is an incomplete frame. In this case, subscribe + * on the lower layer to restart receive operation. */ + if (b_data(rxbuf)) + qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event); return 1; } @@ -658,7 +661,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx) h3->rctrl.qcs = qcs; h3->rctrl.cb = h3_control_recv; - // TODO wake-up rctrl tasklet on reception + qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rctrl.wait_event); break; case H3_UNI_STRM_TP_PUSH_STREAM: /* NOT SUPPORTED */ @@ -671,7 +674,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx) h3->rqpack_enc.qcs = qcs; h3->rqpack_enc.cb = qpack_decode_enc; - // TODO wake-up rqpack_enc tasklet on reception + qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_enc.wait_event); break; case H3_UNI_STRM_TP_QPACK_DECODER: if (h3->rqpack_dec.qcs) { @@ -681,7 +684,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx) h3->rqpack_dec.qcs = qcs; h3->rqpack_dec.cb = qpack_decode_dec; - // TODO wake-up rqpack_dec tasklet on reception + qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_dec.wait_event); break; default: /* Error */ diff --git a/src/mux_quic.c b/src/mux_quic.c index df5868637..3b194f687 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -84,6 +84,39 @@ struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr) return buf; } +int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es) +{ + fprintf(stderr, "%s\n", __func__); + + BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV)); + BUG_ON(qcs->subs && qcs->subs != es); + + es->events |= event_type; + qcs->subs = es; + + return 0; +} + +void qcs_notify_recv(struct qcs *qcs) +{ + if (qcs->subs && qcs->subs->events & SUB_RETRY_RECV) { + tasklet_wakeup(qcs->subs->tasklet); + qcs->subs->events &= ~SUB_RETRY_RECV; + if (!qcs->subs->events) + qcs->subs = NULL; + } +} + +void qcs_notify_send(struct qcs *qcs) +{ + if (qcs->subs && qcs->subs->events & SUB_RETRY_SEND) { + tasklet_wakeup(qcs->subs->tasklet); + qcs->subs->events &= ~SUB_RETRY_SEND; + if (!qcs->subs->events) + qcs->subs = NULL; + } +} + static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset) { struct quic_frame *frm; @@ -157,6 +190,9 @@ static int qc_send(struct qcc *qcc) if (ret < 0) ABORT_NOW(); + if (ret > 0) + qcs_notify_send(qcs); + /* TODO wake-up xprt if data were transfered */ fprintf(stderr, "%s ret=%d\n", __func__, ret); @@ -323,8 +359,7 @@ static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf, static int qc_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - /* XXX TODO XXX */ - return 0; + return qcs_subscribe(cs->ctx, event_type, es); } /* Called from the upper layer, to unsubscribe from events . diff --git a/src/xprt_quic.c b/src/xprt_quic.c index 96d409fa7..d70be2cc5 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -2149,6 +2149,9 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt, return 0; } + if (ret) + qcs_notify_recv(strm); + strm_frm->offset.key += ret; } /* Take this frame into an account for the stream flow control */