diff --git a/src/mux_quic.c b/src/mux_quic.c index 56674a78a..7956f16b8 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -397,18 +397,6 @@ static void qcc_refresh_timeout(struct qcc *qcc) TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn); } -void qcc_wakeup(struct qcc *qcc) -{ - HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); - tasklet_wakeup(qcc->wait_event.tasklet); -} - -static void qcc_wakeup_pacing(struct qcc *qcc) -{ - HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1); - tasklet_wakeup(qcc->wait_event.tasklet); -} - /* Mark a stream as open if it was idle. This can be used on every * successful emission/reception operation to update the stream state. */ @@ -755,7 +743,7 @@ void qcc_set_error(struct qcc *qcc, int err, int app) * is too tedious too not forget a wakeup outside of this function for * the moment. */ - qcc_wakeup(qcc); + HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); } /* Increment glitch counter for connection by steps. If configured @@ -1109,7 +1097,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes) frm->max_stream_data.max_stream_data = qcs->rx.msd; LIST_APPEND(&qcc->lfctl.frms, &frm->list); - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); } conn_fctl: @@ -1127,7 +1115,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes) frm->max_data.max_data = qcc->lfctl.md; LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list); - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); } TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs); @@ -1397,7 +1385,7 @@ void qcc_reset_stream(struct qcs *qcs, int err) } qcc_send_stream(qcs, 1, 0); - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); } /* Register stream for emission of STREAM, STOP_SENDING or RESET_STREAM. @@ -1450,7 +1438,7 @@ void qcc_abort_stream_read(struct qcs *qcs) qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED); qcc_send_stream(qcs, 1, 0); - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); end: TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn, qcs); @@ -1483,7 +1471,7 @@ int qcc_install_app_ops(struct qcc *qcc, const struct qcc_app_ops *app_ops) TRACE_ERROR("app ops finalize error", QMUX_EV_QCC_NEW, qcc->conn); goto err; } - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); } TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn); @@ -1663,7 +1651,7 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max) TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn); if (unblock_real) - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); if (unblock_soft) qcc_notify_fctl(qcc); @@ -1712,7 +1700,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max) TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); if (unblock_real) { /* TODO optim: only wakeup IO-CB if stream has data to sent. */ - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); } if (unblock_soft) { @@ -1965,7 +1953,7 @@ static int qcc_release_remote_stream(struct qcc *qcc, uint64_t id) frm->max_streams_bidi.max_streams = qcc->lfctl.ms_bidi + qcc->lfctl.cl_bidi_r; LIST_APPEND(&qcc->lfctl.frms, &frm->list); - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); qcc->lfctl.ms_bidi += qcc->lfctl.cl_bidi_r; qcc->lfctl.cl_bidi_r = 0; @@ -2523,8 +2511,9 @@ static int qcc_io_send(struct qcc *qcc) if (qcc_is_pacing_active(qcc->conn)) { if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) { - qcc_wakeup_pacing(qcc); - return 1; + tasklet_wakeup(qcc->wait_event.tasklet); + total = 0; + goto out; } } @@ -2564,13 +2553,9 @@ static int qcc_io_send(struct qcc *qcc) if (ret == 1) { /* qcc_send_frames cannot return 1 if pacing not used. */ BUG_ON(!qcc_is_pacing_active(qcc->conn)); - qcc_wakeup_pacing(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); ++qcc->tx.paced_sent_ctr; } - else if (LIST_ISEMPTY(frms)) { - /* Everything sent */ - HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); - } out: /* Re-insert on-error QCS at the end of the send-list. */ @@ -2581,7 +2566,7 @@ static int qcc_io_send(struct qcc *qcc) } if (!qfctl_rblocked(&qcc->tx.fc)) - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); } if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) { @@ -2878,59 +2863,12 @@ static void qcc_release(struct qcc *qcc) TRACE_LEAVE(QMUX_EV_QCC_END); } -static void qcc_purge_sending(struct qcc *qcc) -{ - struct quic_pacer *pacer = &qcc->tx.pacer; - struct list *frms = &qcc->tx.frms; - enum quic_tx_err ret = QUIC_TX_ERR_PACING; - int sent = 0; - - TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); - - /* This function is reserved for pacing usage. */ - BUG_ON(!qcc_is_pacing_active(qcc->conn)); - - /* Only restart emission if pacing delay is reached. */ - if (quic_pacing_expired(pacer)) { - ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer); - sent = 1; - } - - if (ret == QUIC_TX_ERR_PACING) { - BUG_ON(LIST_ISEMPTY(frms)); - qcc_wakeup_pacing(qcc); - if (sent) - ++qcc->tx.paced_sent_ctr; - } - else if (ret == QUIC_TX_ERR_FATAL) { - TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); - HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); - qcc_subscribe_send(qcc); - } - else { - if (!LIST_ISEMPTY(frms)) { - qcc_subscribe_send(qcc); - } - else { - /* Everything sent */ - HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); - } - } - - TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); -} - struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) { struct qcc *qcc = ctx; TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); - if (status & TASK_F_USR1) { - qcc_purge_sending(qcc); - goto end; - } - if (!(qcc->wait_event.events & SUB_RETRY_SEND)) qcc_io_send(qcc); @@ -2943,7 +2881,6 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) qcc_refresh_timeout(qcc); - end: TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); return NULL; @@ -3139,7 +3076,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list); /* init read cycle */ - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); /* MUX is initialized before QUIC handshake completion if early data * received. Flag connection to delay stream processing if @@ -3301,7 +3238,7 @@ static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf, qcs->flags &= ~QC_SF_DEM_FULL; if (!(qcc->flags & QC_CF_ERRL)) { LIST_APPEND(&qcc->recv_list, &qcs->el_recv); - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); } } @@ -3366,14 +3303,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, if (data || fin) qcc_send_stream(qcs, 0, data); - /* Wake up MUX to emit newly transferred data. If blocked on - * send, ensure next emission will refresh data by removing - * pacing status info. - */ + /* Wake up MUX to emit newly transferred data. */ if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) - qcc_wakeup(qcs->qcc); - else - HA_ATOMIC_AND(&qcs->qcc->wait_event.tasklet->state, ~TASK_F_USR1); + tasklet_wakeup(qcs->qcc->wait_event.tasklet); } end: @@ -3496,11 +3428,8 @@ static size_t qmux_strm_done_ff(struct stconn *sc) qcs->sd->iobuf.offset = 0; qcs->sd->iobuf.data = 0; - /* Similar to snd_buf callback. */ if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) - qcc_wakeup(qcc); - else - HA_ATOMIC_AND(&qcs->qcc->wait_event.tasklet->state, ~TASK_F_USR1); + tasklet_wakeup(qcc->wait_event.tasklet); end: TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); @@ -3598,7 +3527,7 @@ static void qmux_strm_shut(struct stconn *sc, unsigned int mode, struct se_abort qcc_reset_stream(qcs, 0); } - qcc_wakeup(qcc); + tasklet_wakeup(qcc->wait_event.tasklet); } out: diff --git a/src/quic_conn.c b/src/quic_conn.c index c28725af7..8f1b48491 100644 --- a/src/quic_conn.c +++ b/src/quic_conn.c @@ -1816,7 +1816,7 @@ void qc_notify_err(struct quic_conn *qc) * is made between MUX and quic-conn layer, wake up could be * conducted only with qc.subs. */ - qcc_wakeup(qc->qcc); + tasklet_wakeup(qc->qcc->wait_event.tasklet); } TRACE_LEAVE(QUIC_EV_CONN_CLOSE, qc);