mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 22:31:28 +02:00
MINOR: mux-quic: implement subscribe on stream
Implement the subscription in the mux on the qcs instance. Subscribe is now used by the h3 layer when receiving an incomplete frame on the H3 control stream. It is also used when attaching the remote uni-directional streams on the h3 layer. In the qc_send, the mux wakes up the qcs for each new transfer executed. This is done via the method qcs_notify_send(). The xprt wakes up the qcs when receiving data on unidirectional streams. This is done via the method qcs_notify_recv().
This commit is contained in:
parent
c2025c1ec6
commit
a3f222dc1e
@ -15,6 +15,10 @@ void uni_qcs_free(struct qcs *qcs);
|
|||||||
|
|
||||||
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
|
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
|
||||||
|
|
||||||
|
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
|
||||||
|
void qcs_notify_recv(struct qcs *qcs);
|
||||||
|
void qcs_notify_send(struct qcs *qcs);
|
||||||
|
|
||||||
/* Bit shift to get the stream sub ID for internal use which is obtained
|
/* Bit shift to get the stream sub ID for internal use which is obtained
|
||||||
* shifting the stream IDs by this value, knowing that the
|
* shifting the stream IDs by this value, knowing that the
|
||||||
* QCS_ID_TYPE_SHIFT less significant bits identify the stream ID
|
* QCS_ID_TYPE_SHIFT less significant bits identify the stream ID
|
||||||
|
13
src/h3.c
13
src/h3.c
@ -322,9 +322,12 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
|
|||||||
b_del(rxbuf, flen);
|
b_del(rxbuf, flen);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO handle the case when the buffer is not empty. This can happens
|
/* Handle the case where remaining data are present in the buffer. This
|
||||||
* if there is an incomplete frame.
|
* can happen if there is an incomplete frame. In this case, subscribe
|
||||||
|
* on the lower layer to restart receive operation.
|
||||||
*/
|
*/
|
||||||
|
if (b_data(rxbuf))
|
||||||
|
qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -658,7 +661,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
|
|||||||
|
|
||||||
h3->rctrl.qcs = qcs;
|
h3->rctrl.qcs = qcs;
|
||||||
h3->rctrl.cb = h3_control_recv;
|
h3->rctrl.cb = h3_control_recv;
|
||||||
// TODO wake-up rctrl tasklet on reception
|
qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rctrl.wait_event);
|
||||||
break;
|
break;
|
||||||
case H3_UNI_STRM_TP_PUSH_STREAM:
|
case H3_UNI_STRM_TP_PUSH_STREAM:
|
||||||
/* NOT SUPPORTED */
|
/* NOT SUPPORTED */
|
||||||
@ -671,7 +674,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
|
|||||||
|
|
||||||
h3->rqpack_enc.qcs = qcs;
|
h3->rqpack_enc.qcs = qcs;
|
||||||
h3->rqpack_enc.cb = qpack_decode_enc;
|
h3->rqpack_enc.cb = qpack_decode_enc;
|
||||||
// TODO wake-up rqpack_enc tasklet on reception
|
qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_enc.wait_event);
|
||||||
break;
|
break;
|
||||||
case H3_UNI_STRM_TP_QPACK_DECODER:
|
case H3_UNI_STRM_TP_QPACK_DECODER:
|
||||||
if (h3->rqpack_dec.qcs) {
|
if (h3->rqpack_dec.qcs) {
|
||||||
@ -681,7 +684,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
|
|||||||
|
|
||||||
h3->rqpack_dec.qcs = qcs;
|
h3->rqpack_dec.qcs = qcs;
|
||||||
h3->rqpack_dec.cb = qpack_decode_dec;
|
h3->rqpack_dec.cb = qpack_decode_dec;
|
||||||
// TODO wake-up rqpack_dec tasklet on reception
|
qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_dec.wait_event);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
/* Error */
|
/* Error */
|
||||||
|
@ -84,6 +84,39 @@ struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr)
|
|||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "%s\n", __func__);
|
||||||
|
|
||||||
|
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
|
||||||
|
BUG_ON(qcs->subs && qcs->subs != es);
|
||||||
|
|
||||||
|
es->events |= event_type;
|
||||||
|
qcs->subs = es;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void qcs_notify_recv(struct qcs *qcs)
|
||||||
|
{
|
||||||
|
if (qcs->subs && qcs->subs->events & SUB_RETRY_RECV) {
|
||||||
|
tasklet_wakeup(qcs->subs->tasklet);
|
||||||
|
qcs->subs->events &= ~SUB_RETRY_RECV;
|
||||||
|
if (!qcs->subs->events)
|
||||||
|
qcs->subs = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void qcs_notify_send(struct qcs *qcs)
|
||||||
|
{
|
||||||
|
if (qcs->subs && qcs->subs->events & SUB_RETRY_SEND) {
|
||||||
|
tasklet_wakeup(qcs->subs->tasklet);
|
||||||
|
qcs->subs->events &= ~SUB_RETRY_SEND;
|
||||||
|
if (!qcs->subs->events)
|
||||||
|
qcs->subs = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
|
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
|
||||||
{
|
{
|
||||||
struct quic_frame *frm;
|
struct quic_frame *frm;
|
||||||
@ -157,6 +190,9 @@ static int qc_send(struct qcc *qcc)
|
|||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
ABORT_NOW();
|
ABORT_NOW();
|
||||||
|
|
||||||
|
if (ret > 0)
|
||||||
|
qcs_notify_send(qcs);
|
||||||
|
|
||||||
/* TODO wake-up xprt if data were transfered */
|
/* TODO wake-up xprt if data were transfered */
|
||||||
|
|
||||||
fprintf(stderr, "%s ret=%d\n", __func__, ret);
|
fprintf(stderr, "%s ret=%d\n", __func__, ret);
|
||||||
@ -323,8 +359,7 @@ static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf,
|
|||||||
static int qc_subscribe(struct conn_stream *cs, int event_type,
|
static int qc_subscribe(struct conn_stream *cs, int event_type,
|
||||||
struct wait_event *es)
|
struct wait_event *es)
|
||||||
{
|
{
|
||||||
/* XXX TODO XXX */
|
return qcs_subscribe(cs->ctx, event_type, es);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
|
/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
|
||||||
|
@ -2149,6 +2149,9 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ret)
|
||||||
|
qcs_notify_recv(strm);
|
||||||
|
|
||||||
strm_frm->offset.key += ret;
|
strm_frm->offset.key += ret;
|
||||||
}
|
}
|
||||||
/* Take this frame into an account for the stream flow control */
|
/* Take this frame into an account for the stream flow control */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user