diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index aa8572c14..9e7ef175f 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -15,6 +15,10 @@ void uni_qcs_free(struct qcs *qcs); struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr); +int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); +void qcs_notify_recv(struct qcs *qcs); +void qcs_notify_send(struct qcs *qcs); + /* Bit shift to get the stream sub ID for internal use which is obtained * shifting the stream IDs by this value, knowing that the * QCS_ID_TYPE_SHIFT less significant bits identify the stream ID diff --git a/src/h3.c b/src/h3.c index ecaa02d49..359b27676 100644 --- a/src/h3.c +++ b/src/h3.c @@ -322,9 +322,12 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx) b_del(rxbuf, flen); } - /* TODO handle the case when the buffer is not empty. This can happens - * if there is an incomplete frame. + /* Handle the case where remaining data are present in the buffer. This + * can happen if there is an incomplete frame. In this case, subscribe + * on the lower layer to restart receive operation. */ + if (b_data(rxbuf)) + qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event); return 1; } @@ -658,7 +661,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx) h3->rctrl.qcs = qcs; h3->rctrl.cb = h3_control_recv; - // TODO wake-up rctrl tasklet on reception + qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rctrl.wait_event); break; case H3_UNI_STRM_TP_PUSH_STREAM: /* NOT SUPPORTED */ @@ -671,7 +674,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx) h3->rqpack_enc.qcs = qcs; h3->rqpack_enc.cb = qpack_decode_enc; - // TODO wake-up rqpack_enc tasklet on reception + qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_enc.wait_event); break; case H3_UNI_STRM_TP_QPACK_DECODER: if (h3->rqpack_dec.qcs) { @@ -681,7 +684,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx) h3->rqpack_dec.qcs = qcs; h3->rqpack_dec.cb = qpack_decode_dec; - // TODO wake-up rqpack_dec tasklet on reception + qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_dec.wait_event); break; default: /* Error */ diff --git a/src/mux_quic.c b/src/mux_quic.c index df5868637..3b194f687 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -84,6 +84,39 @@ struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr) return buf; } +int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es) +{ + fprintf(stderr, "%s\n", __func__); + + BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV)); + BUG_ON(qcs->subs && qcs->subs != es); + + es->events |= event_type; + qcs->subs = es; + + return 0; +} + +void qcs_notify_recv(struct qcs *qcs) +{ + if (qcs->subs && qcs->subs->events & SUB_RETRY_RECV) { + tasklet_wakeup(qcs->subs->tasklet); + qcs->subs->events &= ~SUB_RETRY_RECV; + if (!qcs->subs->events) + qcs->subs = NULL; + } +} + +void qcs_notify_send(struct qcs *qcs) +{ + if (qcs->subs && qcs->subs->events & SUB_RETRY_SEND) { + tasklet_wakeup(qcs->subs->tasklet); + qcs->subs->events &= ~SUB_RETRY_SEND; + if (!qcs->subs->events) + qcs->subs = NULL; + } +} + static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset) { struct quic_frame *frm; @@ -157,6 +190,9 @@ static int qc_send(struct qcc *qcc) if (ret < 0) ABORT_NOW(); + if (ret > 0) + qcs_notify_send(qcs); + /* TODO wake-up xprt if data were transfered */ fprintf(stderr, "%s ret=%d\n", __func__, ret); @@ -323,8 +359,7 @@ static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf, static int qc_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - /* XXX TODO XXX */ - return 0; + return qcs_subscribe(cs->ctx, event_type, es); } /* Called from the upper layer, to unsubscribe from events . diff --git a/src/xprt_quic.c b/src/xprt_quic.c index 96d409fa7..d70be2cc5 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -2149,6 +2149,9 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt, return 0; } + if (ret) + qcs_notify_recv(strm); + strm_frm->offset.key += ret; } /* Take this frame into an account for the stream flow control */