diff --git a/include/haproxy/quic_stream-t.h b/include/haproxy/quic_stream-t.h index ec3f431ba..952d55019 100644 --- a/include/haproxy/quic_stream-t.h +++ b/include/haproxy/quic_stream-t.h @@ -20,6 +20,7 @@ struct qc_stream_buf { }; #define QC_SD_FL_RELEASE 0x00000001 /* set when MUX has finished to use this stream */ +#define QC_SD_FL_WAIT_FOR_FIN 0x00000002 /* set if sent FIN is waiting for acknowledgement */ /* QUIC STREAM descriptor. * diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index 44897287b..7788d9039 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -11,7 +11,7 @@ struct quic_conn; struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type, void *ctx, struct quic_conn *qc); void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size); -int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len); +int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len, int fin); void qc_stream_desc_free(struct qc_stream_desc *stream, int closing); struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream); diff --git a/src/mux_quic.c b/src/mux_quic.c index bcf2b7034..b703c8bfb 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -1884,8 +1884,12 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) { /* Close stream locally. */ qcs_close_local(qcs); - /* Reset flag to not emit multiple FIN STREAM frames. */ - qcs->flags &= ~QC_SF_FIN_STREAM; + + if (qcs->flags & QC_SF_FIN_STREAM) { + qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN; + /* Reset flag to not emit multiple FIN STREAM frames. */ + qcs->flags &= ~QC_SF_FIN_STREAM; + } } } diff --git a/src/quic_retransmit.c b/src/quic_retransmit.c index 0ac57cbd5..780b15994 100644 --- a/src/quic_retransmit.c +++ b/src/quic_retransmit.c @@ -21,13 +21,19 @@ int qc_stream_frm_is_acked(struct quic_conn *qc, struct quic_frame *f) { const struct qf_stream *frm = &f->stream; const struct qc_stream_desc *s = frm->stream; + const int frm_fin = f->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT; if (!eb64_lookup(&qc->streams_by_id, frm->id)) { TRACE_DEVEL("STREAM frame already acked : stream released", QUIC_EV_CONN_PRSAFRM, qc, f); return 1; } - if (frm->offset.key + frm->len <= s->ack_offset) { + /* Frame cannot advertise FIN for a smaller data range. */ + BUG_ON(frm_fin && frm->offset.key + frm->len < s->ack_offset); + + if (frm->offset.key + frm->len < s->ack_offset || + (frm->offset.key + frm->len == s->ack_offset && + (!frm_fin || !(s->flags & QC_SD_FL_WAIT_FOR_FIN)))) { TRACE_DEVEL("STREAM frame already acked : fully acked range", QUIC_EV_CONN_PRSAFRM, qc, f); return 1; } diff --git a/src/quic_rx.c b/src/quic_rx.c index 38faafb23..6e21958cc 100644 --- a/src/quic_rx.c +++ b/src/quic_rx.c @@ -218,15 +218,18 @@ static int quic_stream_try_to_consume(struct quic_conn *qc, struct qf_stream *strm_frm; struct quic_frame *frm; size_t offset, len; + int fin; strm_frm = eb64_entry(frm_node, struct qf_stream, offset); + frm = container_of(strm_frm, struct quic_frame, stream); offset = strm_frm->offset.key; len = strm_frm->len; + fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT; if (offset > stream->ack_offset) break; - if (qc_stream_desc_ack(&stream, offset, len)) { + if (qc_stream_desc_ack(&stream, offset, len, fin)) { /* cf. next comment : frame may be freed at this stage. */ TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM, qc, stream ? strm_frm : NULL, stream); @@ -246,7 +249,6 @@ static int quic_stream_try_to_consume(struct quic_conn *qc, frm_node = eb64_next(frm_node); eb64_delete(&strm_frm->offset); - frm = container_of(strm_frm, struct quic_frame, stream); qc_release_frm(qc, frm); } @@ -272,6 +274,7 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f struct qc_stream_desc *stream = NULL; const size_t offset = strm_frm->offset.key; const size_t len = strm_frm->len; + const int fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT; /* do not use strm_frm->stream as the qc_stream_desc instance * might be freed at this stage. Use the id to do a proper @@ -291,7 +294,7 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream); if (offset <= stream->ack_offset) { - if (qc_stream_desc_ack(&stream, offset, len)) { + if (qc_stream_desc_ack(&stream, offset, len, fin)) { TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream); } diff --git a/src/quic_stream.c b/src/quic_stream.c index da071dfbf..9741f8e71 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -17,6 +17,12 @@ DECLARE_STATIC_POOL(pool_head_quic_stream_buf, "qc_stream_buf", sizeof(struct qc_stream_buf)); +/* Returns true if nothing to ack yet for stream including FIN bit. */ +static inline int qc_stream_desc_done(const struct qc_stream_desc *s) +{ + return !(s->flags & QC_SD_FL_WAIT_FOR_FIN) && LIST_ISEMPTY(&s->buf_list); +} + static void qc_stream_buf_free(struct qc_stream_desc *stream, struct qc_stream_buf **stream_buf) { @@ -116,7 +122,7 @@ void qc_stream_desc_release(struct qc_stream_desc *stream, /* Remove unsent data from current buffer. */ if (final_size < tail_offset) { b_sub(buf, tail_offset - final_size); - /* Remove buffer is all ACK already received. */ + /* Remove buffer if all ACK already received. */ if (!b_data(buf)) qc_stream_buf_free(stream, &stream_buf); } @@ -125,53 +131,64 @@ void qc_stream_desc_release(struct qc_stream_desc *stream, stream->buf = NULL; } - if (LIST_ISEMPTY(&stream->buf_list)) { + if (qc_stream_desc_done(stream)) { /* if no buffer left we can free the stream. */ qc_stream_desc_free(stream, 0); } } -/* Acknowledge data at of length for . It is handled - * only if it covers a range corresponding to stream.ack_offset. After data - * removal, if the stream does not contains data any more and is already - * released, the instance stream is freed. is set to NULL to indicate - * this. +/* Acknowledge data at of length for with set for + * the final data. After data removal, if the stream does not contains data + * any more and is already released, the instance stream is freed. is + * set to NULL to indicate this. * * Returns the count of byte removed from stream. Do not forget to check if * is NULL after invocation. */ -int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len) +int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len, + int fin) { struct qc_stream_desc *s = *stream; - struct qc_stream_buf *stream_buf; - struct buffer *buf; + struct qc_stream_buf *stream_buf = NULL; + struct buffer *buf = NULL; size_t diff; - if (offset + len <= s->ack_offset || offset > s->ack_offset) + /* Cannot advertise FIN for an inferior data range. */ + BUG_ON(fin && offset + len < s->ack_offset); + + if (offset + len < s->ack_offset || offset > s->ack_offset) return 0; - /* There must be at least a buffer or we must not report an ACK. */ - BUG_ON(LIST_ISEMPTY(&s->buf_list)); - - /* get oldest buffer from buf_list */ - stream_buf = LIST_NEXT(&s->buf_list, struct qc_stream_buf *, list); - buf = &stream_buf->buf; - diff = offset + len - s->ack_offset; - s->ack_offset += diff; - b_del(buf, diff); + if (diff) { + /* Buf list cannot be empty if there is still unacked data. */ + BUG_ON(LIST_ISEMPTY(&s->buf_list)); - /* Free oldest buffer if all data acknowledged. */ - if (!b_data(buf)) { - qc_stream_buf_free(s, &stream_buf); + /* get oldest buffer from buf_list */ + stream_buf = LIST_NEXT(&s->buf_list, struct qc_stream_buf *, list); + buf = &stream_buf->buf; - /* Free stream instance if already released and no buffers left. */ - if ((s->flags & QC_SD_FL_RELEASE) && LIST_ISEMPTY(&s->buf_list)) { - qc_stream_desc_free(s, 0); - *stream = NULL; + s->ack_offset += diff; + b_del(buf, diff); + + /* Free oldest buffer if all data acknowledged. */ + if (!b_data(buf)) { + qc_stream_buf_free(s, &stream_buf); + buf = NULL; } } + if (fin) { + /* Mark FIN as acknowledged. */ + s->flags &= ~QC_SD_FL_WAIT_FOR_FIN; + } + + /* Free stream instance if already released and everything acknowledged. */ + if ((s->flags & QC_SD_FL_RELEASE) && qc_stream_desc_done(s)) { + qc_stream_desc_free(s, 0); + *stream = NULL; + } + return diff; }