mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-10 00:57:02 +02:00
MEDIUM: quic: handle out-of-order ACK at streamdesc layer
qc_stream_desc_ack() is the entrypoint for streamdesc layer to handle a new acknowledgement of previously emitted STREAM data. Previously, it was only able to deal with in-order ACK offset. The caller was responsible to buffer out-of-order ACKs. Change this by dealing with the latter case directly in qc_stream_desc_ack(). This notably simplify ACK handling in quic_rx module.
This commit is contained in:
parent
62558a9285
commit
cc4384aeb7
@ -7,12 +7,13 @@
|
|||||||
#include <haproxy/quic_stream-t.h>
|
#include <haproxy/quic_stream-t.h>
|
||||||
|
|
||||||
struct quic_conn;
|
struct quic_conn;
|
||||||
|
struct quic_frame;
|
||||||
|
|
||||||
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type, void *ctx,
|
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type, void *ctx,
|
||||||
struct quic_conn *qc);
|
struct quic_conn *qc);
|
||||||
void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size,
|
void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size,
|
||||||
void *new_ctx);
|
void *new_ctx);
|
||||||
int qc_stream_desc_ack(struct qc_stream_desc *stream, size_t offset, size_t len, int fin);
|
int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm);
|
||||||
void qc_stream_desc_free(struct qc_stream_desc *stream, int closing);
|
void qc_stream_desc_free(struct qc_stream_desc *stream, int closing);
|
||||||
|
|
||||||
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
|
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
|
||||||
|
@ -225,14 +225,11 @@ static int quic_stream_try_to_consume(struct quic_conn *qc,
|
|||||||
while (frm_node) {
|
while (frm_node) {
|
||||||
struct qf_stream *strm_frm;
|
struct qf_stream *strm_frm;
|
||||||
struct quic_frame *frm;
|
struct quic_frame *frm;
|
||||||
size_t offset, len;
|
size_t offset;
|
||||||
int fin;
|
|
||||||
|
|
||||||
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
|
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
|
||||||
frm = container_of(strm_frm, struct quic_frame, stream);
|
frm = container_of(strm_frm, struct quic_frame, stream);
|
||||||
offset = strm_frm->offset.key;
|
offset = strm_frm->offset.key;
|
||||||
len = strm_frm->len;
|
|
||||||
fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
|
|
||||||
|
|
||||||
if (offset > stream->ack_offset)
|
if (offset > stream->ack_offset)
|
||||||
break;
|
break;
|
||||||
@ -241,13 +238,13 @@ static int quic_stream_try_to_consume(struct quic_conn *qc,
|
|||||||
* stream_buf instance is removed via qc_stream_desc_ack().
|
* stream_buf instance is removed via qc_stream_desc_ack().
|
||||||
*/
|
*/
|
||||||
eb64_delete(frm_node);
|
eb64_delete(frm_node);
|
||||||
qc_release_frm(qc, frm);
|
|
||||||
|
|
||||||
if (qc_stream_desc_ack(stream, offset, len, fin)) {
|
if (qc_stream_desc_ack(stream, frm)) {
|
||||||
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
|
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
|
||||||
qc, strm_frm, stream);
|
qc, strm_frm, stream);
|
||||||
ret = 1;
|
ret = 1;
|
||||||
}
|
}
|
||||||
|
qc_release_frm(qc, frm);
|
||||||
|
|
||||||
/* Always retrieve first buffer as the previously used
|
/* Always retrieve first buffer as the previously used
|
||||||
* instance could have been removed during qc_stream_desc_ack().
|
* instance could have been removed during qc_stream_desc_ack().
|
||||||
@ -284,10 +281,6 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
|
|||||||
struct qf_stream *strm_frm = &frm->stream;
|
struct qf_stream *strm_frm = &frm->stream;
|
||||||
struct eb64_node *node = NULL;
|
struct eb64_node *node = NULL;
|
||||||
struct qc_stream_desc *stream = NULL;
|
struct qc_stream_desc *stream = NULL;
|
||||||
const size_t offset = strm_frm->offset.key;
|
|
||||||
const size_t len = strm_frm->len;
|
|
||||||
const int fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
|
|
||||||
int ack;
|
|
||||||
|
|
||||||
/* do not use strm_frm->stream as the qc_stream_desc instance
|
/* do not use strm_frm->stream as the qc_stream_desc instance
|
||||||
* might be freed at this stage. Use the id to do a proper
|
* might be freed at this stage. Use the id to do a proper
|
||||||
@ -306,31 +299,23 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
|
|||||||
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||||
|
|
||||||
TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
|
TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
|
||||||
if (offset <= stream->ack_offset) {
|
if (!qc_stream_desc_ack(stream, frm)) {
|
||||||
ack = qc_stream_desc_ack(stream, offset, len, fin);
|
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
|
||||||
if (ack || fin) {
|
qc, strm_frm, stream);
|
||||||
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
|
|
||||||
qc, strm_frm, stream);
|
|
||||||
|
|
||||||
quic_stream_try_to_consume(qc, stream);
|
quic_stream_try_to_consume(qc, stream);
|
||||||
|
|
||||||
if (qc_stream_desc_done(stream)) {
|
if (qc_stream_desc_done(stream)) {
|
||||||
/* no need to continue if stream freed. */
|
/* no need to continue if stream freed. */
|
||||||
TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
|
TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
|
||||||
qc_check_close_on_released_mux(qc);
|
qc_check_close_on_released_mux(qc);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
qc_release_frm(qc, frm);
|
qc_release_frm(qc, frm);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
struct qc_stream_buf *stream_buf;
|
TRACE_DEVEL("handled out-of-order stream ACK", QUIC_EV_CONN_ACKSTRM,
|
||||||
struct eb64_node *buf_node;
|
qc, strm_frm, stream);
|
||||||
|
|
||||||
buf_node = eb64_lookup_le(&stream->buf_tree, offset);
|
|
||||||
BUG_ON(!buf_node);
|
|
||||||
stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
|
|
||||||
eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -9,7 +9,7 @@
|
|||||||
#include <haproxy/mux_quic.h>
|
#include <haproxy/mux_quic.h>
|
||||||
#include <haproxy/pool.h>
|
#include <haproxy/pool.h>
|
||||||
#include <haproxy/quic_conn.h>
|
#include <haproxy/quic_conn.h>
|
||||||
#include <haproxy/quic_frame.h>
|
#include <haproxy/quic_frame-t.h>
|
||||||
#include <haproxy/task.h>
|
#include <haproxy/task.h>
|
||||||
|
|
||||||
DECLARE_STATIC_POOL(pool_head_quic_stream_desc, "qc_stream_desc",
|
DECLARE_STATIC_POOL(pool_head_quic_stream_desc, "qc_stream_desc",
|
||||||
@ -134,26 +134,39 @@ void qc_stream_desc_release(struct qc_stream_desc *stream,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Acknowledge data at <offset> of length <len> for <stream> with <fin> set for
|
/* Acknowledge <frm> STREAM frame whose content is managed by <stream>
|
||||||
* the final data.
|
* descriptor.
|
||||||
*
|
*
|
||||||
* Returns the count of byte removed from stream.
|
* Returns 0 if the frame has been handled and can be removed.
|
||||||
|
* Returns a positive value if acknowledgement is out-of-order and
|
||||||
|
* corresponding STREAM frame has been buffered.
|
||||||
*/
|
*/
|
||||||
int qc_stream_desc_ack(struct qc_stream_desc *stream, size_t offset, size_t len,
|
int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm)
|
||||||
int fin)
|
|
||||||
{
|
{
|
||||||
|
struct qf_stream *strm_frm = &frm->stream;
|
||||||
|
const uint64_t offset = strm_frm->offset.key;
|
||||||
|
const uint64_t len = strm_frm->len;
|
||||||
|
const int fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
|
||||||
|
|
||||||
struct qc_stream_buf *stream_buf = NULL;
|
struct qc_stream_buf *stream_buf = NULL;
|
||||||
|
struct eb64_node *buf_node;
|
||||||
struct buffer *buf = NULL;
|
struct buffer *buf = NULL;
|
||||||
size_t diff;
|
size_t diff;
|
||||||
|
|
||||||
/* Cannot advertise FIN for an inferior data range. */
|
/* Cannot advertise FIN for an inferior data range. */
|
||||||
BUG_ON(fin && offset + len < stream->ack_offset);
|
BUG_ON(fin && offset + len < stream->ack_offset);
|
||||||
|
|
||||||
/* No support now for out-of-order ACK reporting. */
|
if (offset + len < stream->ack_offset) {
|
||||||
BUG_ON(offset > stream->ack_offset);
|
|
||||||
|
|
||||||
if (offset + len < stream->ack_offset)
|
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
else if (offset > stream->ack_offset) {
|
||||||
|
buf_node = eb64_lookup_le(&stream->buf_tree, offset);
|
||||||
|
BUG_ON(!buf_node); /* Cannot acknowledged a STREAM frame for a non existing buffer. */
|
||||||
|
|
||||||
|
stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
|
||||||
|
eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
diff = offset + len - stream->ack_offset;
|
diff = offset + len - stream->ack_offset;
|
||||||
if (diff) {
|
if (diff) {
|
||||||
@ -193,7 +206,7 @@ int qc_stream_desc_ack(struct qc_stream_desc *stream, size_t offset, size_t len,
|
|||||||
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
|
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
return diff;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Free the stream descriptor <stream> content. This function should be used
|
/* Free the stream descriptor <stream> content. This function should be used
|
||||||
|
Loading…
Reference in New Issue
Block a user