MEDIUM: stream: Limit number of synchronous send per stream wakeup

It is not a bug fix, because there is no way to hit the issue for now. But
there is nothing preventing a loop of synchronous sends in process_stream().
Indead, when a synchronous send is successfully performed, we restart the
SCs evaluation and at the end another synchronous send is attempted. So with
an endpoint consuming data bit by bit or with a filter fowarding few bytes
at each call, it is possible to loop for a while in process_stream().

Because it is not expected, we now limit the number of synchronous send per
wakeup to two calls. In a nominal case, it should never be more. This commit
is mandatory to be able to handle large buffers on channels

There is no reason to backport this commit except if the large buffers
support on channels are backported.
This commit is contained in:
Christopher Faulet 2026-02-03 07:49:21 +01:00
parent 5965a6e1d2
commit 53b7150357
3 changed files with 22 additions and 12 deletions

View File

@ -38,7 +38,7 @@ void sc_update_tx(struct stconn *sc);
struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state);
int sc_conn_sync_recv(struct stconn *sc);
void sc_conn_sync_send(struct stconn *sc);
int sc_conn_sync_send(struct stconn *sc);
int sc_applet_sync_recv(struct stconn *sc);
void sc_applet_sync_send(struct stconn *sc);
@ -408,10 +408,15 @@ static inline int sc_sync_recv(struct stconn *sc)
/* Perform a synchronous send using the right version, depending the endpoing is
* a connection or an applet.
*/
static inline void sc_sync_send(struct stconn *sc)
static inline int sc_sync_send(struct stconn *sc, unsigned cnt)
{
if (sc_ep_test(sc, SE_FL_T_MUX))
sc_conn_sync_send(sc);
if (!sc_ep_test(sc, SE_FL_T_MUX))
return 0;
if (cnt >= 2 && co_data(sc_oc(sc))) {
task_wakeup(__sc_strm(sc)->task, TASK_WOKEN_MSG);
return 0;
}
return sc_conn_sync_send(sc);
}
/* Combines both sc_update_rx() and sc_update_tx() at once */

View File

@ -1801,27 +1801,30 @@ int sc_conn_send(struct stconn *sc)
* flag are cleared prior to the attempt, and will possibly be updated in case
* of success.
*/
void sc_conn_sync_send(struct stconn *sc)
int sc_conn_sync_send(struct stconn *sc)
{
struct channel *oc = sc_oc(sc);
int did_send = 0;
oc->flags &= ~CF_WRITE_EVENT;
if (sc->flags & SC_FL_SHUT_DONE)
return;
goto end;
if (!co_data(oc))
return;
goto end;
if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
return;
goto end;
if (!sc_mux_ops(sc))
return;
goto end;
sc_conn_send(sc);
did_send = sc_conn_send(sc);
if (oc->flags & CF_WRITE_EVENT)
oc->flags |= CF_WAKE_ONCE;
end:
return did_send;
}
/* Called by I/O handlers after completion.. It propagates

View File

@ -1828,6 +1828,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
struct channel *req, *res;
struct stconn *scf, *scb;
unsigned int rate;
unsigned int scf_send_cnt, scb_send_cnt;
activity[tid].stream_calls++;
stream_cond_update_cpu_latency(s);
@ -1855,6 +1856,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
/* Keep a copy of SC flags */
scf_flags = scf->flags;
scb_flags = scb->flags;
scf_send_cnt = scb_send_cnt = 0;
/* update pending events */
s->pending_events |= stream_map_task_state(state);
@ -2488,7 +2490,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
}
/* Let's see if we can send the pending request now */
sc_sync_send(scb);
sc_sync_send(scb, scb_send_cnt++);
/*
* Now forward all shutdown requests between both sides of the request buffer
@ -2598,7 +2600,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
scf_flags = (scf_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED));
/* Let's see if we can send the pending response now */
sc_sync_send(scf);
sc_sync_send(scf, scf_send_cnt++);
/*
* Now forward all shutdown requests between both sides of the buffer