MINOR: stconn: start to rename cs_rx_endp_{more,done}() to se_have_{no_,}more_data()

The analysis of cs_rx_endp_more() showed that the purpose is for a stream
endpoint to inform the connector that it's ready to deliver more data to
that one, and conversely cs_rx_endp_done() that it's done delivering data
so it should not be bothered again for this.

This was modified two ways:
  - the operation is no longer performed on the connector but on the
    endpoint so that there is no more doubt when reading applet code
    about what this rx refers to; it's the endpoint that has more or
    no more data.

  - an applet implementation is also provided and mostly used from
    applet code since it saves the caller from having to access the
    endpoint descriptor.

It's visible that the flag ought to be inverted because some places
have to set it by default for no reason.
This commit is contained in:
Willy Tarreau 2022-05-25 15:42:03 +02:00
parent 0ed73c376c
commit 4164eb94f3
13 changed files with 50 additions and 30 deletions

View File

@ -128,6 +128,22 @@ static inline struct stream *appctx_strm(const struct appctx *appctx)
return __sc_strm(appctx->sedesc->sc);
}
/* The applet announces it has more data to deliver to the stream's input
* buffer.
*/
static inline void applet_have_more_data(struct appctx *appctx)
{
se_fl_clr(appctx->sedesc, SE_FL_RX_WAIT_EP);
}
/* The applet announces it doesn't have more data for the stream's input
* buffer.
*/
static inline void applet_have_no_more_data(struct appctx *appctx)
{
se_fl_set(appctx->sedesc, SE_FL_RX_WAIT_EP);
}
/* writes chunk <chunk> into the input channel of the stream attached to this
* appctx's endpoint, and marks the RXBLK_ROOM on a channel full error. See
* ci_putchk() for the list of return codes.

View File

@ -305,16 +305,20 @@ static inline int cs_rx_endp_ready(const struct stconn *cs)
return !sc_ep_test(cs, SE_FL_RX_WAIT_EP);
}
/* The stream connector announces it is ready to try to deliver more data to the input buffer */
static inline void cs_rx_endp_more(struct stconn *cs)
/* The stream endpoint announces it has more data to deliver to the stream's
* input buffer.
*/
static inline void se_have_more_data(struct sedesc *se)
{
sc_ep_clr(cs, SE_FL_RX_WAIT_EP);
se_fl_clr(se, SE_FL_RX_WAIT_EP);
}
/* The stream connector announces it doesn't have more data for the input buffer */
static inline void cs_rx_endp_done(struct stconn *cs)
/* The stream endpoint announces it doesn't have more data for the stream's
* input buffer.
*/
static inline void se_have_no_more_data(struct sedesc *se)
{
sc_ep_set(cs, SE_FL_RX_WAIT_EP);
se_fl_set(se, SE_FL_RX_WAIT_EP);
}
/* The application layer informs a stream connector that it's willing to

View File

@ -219,7 +219,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
* that one applet which ignores any event will not spin.
*/
cs_cant_get(cs);
cs_rx_endp_done(cs);
applet_have_no_more_data(app);
/* Now we'll try to allocate the input buffer. We wake up the applet in
* all cases. So this is the applet's responsibility to check if this
@ -228,7 +228,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
* do if it needs the buffer, it will be called again upon readiness.
*/
if (!cs_alloc_ibuf(cs, &app->buffer_wait))
cs_rx_endp_more(cs);
applet_have_more_data(app);
count = co_data(sc_oc(cs));
app->applet->fct(app);

View File

@ -1588,9 +1588,9 @@ static int sc_conn_recv(struct stconn *cs)
else if (!cs_rx_blocked(cs) && !(ic->flags & CF_SHUTR)) {
/* Subscribe to receive events if we're blocking on I/O */
conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
cs_rx_endp_done(cs);
se_have_no_more_data(cs->sedesc);
} else {
cs_rx_endp_more(cs);
se_have_more_data(cs->sedesc);
ret = 1;
}
return ret;
@ -1926,7 +1926,7 @@ static int cs_applet_process(struct stconn *cs)
* begin blocked by the channel.
*/
if (cs_rx_blocked(cs) || sc_ep_test(cs, SE_FL_APPLET_NEED_CONN))
cs_rx_endp_more(cs);
applet_have_more_data(__sc_appctx(cs));
/* update the stream connector, channels, and possibly wake the stream up */
cs_notify(cs);

View File

@ -474,7 +474,7 @@ static void dns_session_io_handler(struct appctx *appctx)
if (cs_opposite(cs)->state < SC_ST_EST) {
cs_cant_get(cs);
se_need_remote_conn(appctx->sedesc);
cs_rx_endp_more(cs);
applet_have_more_data(appctx);
return;
}
@ -649,7 +649,7 @@ static void dns_session_io_handler(struct appctx *appctx)
BUG_ON(LIST_INLIST(&appctx->wait_entry));
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(DNS_LOCK, &ring->lock);
cs_rx_endp_done(cs);
applet_have_no_more_data(appctx);
}
read:

View File

@ -1194,7 +1194,7 @@ static int
spoe_wakeup_appctx(struct appctx *appctx)
{
cs_want_get(appctx_cs(appctx));
cs_rx_endp_more(appctx_cs(appctx));
applet_have_more_data(appctx);
appctx_wakeup(appctx);
return 1;
}
@ -1399,7 +1399,7 @@ spoe_handle_connect_appctx(struct appctx *appctx)
if (!cs_state_in(cs->state, SC_SB_RDY|SC_SB_EST)) {
/* not connected yet */
cs_rx_endp_more(cs);
applet_have_more_data(appctx);
task_wakeup(__sc_strm(cs)->task, TASK_WOKEN_MSG);
goto stop;
}

View File

@ -1958,7 +1958,7 @@ static void hlua_socket_handler(struct appctx *appctx)
if (cs_opposite(cs)->state < SC_ST_EST) {
cs_cant_get(cs);
se_need_remote_conn(appctx->sedesc);
cs_rx_endp_more(cs);
applet_have_more_data(appctx);
return;
}
@ -1983,7 +1983,7 @@ static void hlua_socket_handler(struct appctx *appctx)
* to write, so we clear the blocking flag.
*/
if (notification_registered(&ctx->wake_on_write))
cs_rx_endp_more(cs);
applet_have_more_data(appctx);
}
static int hlua_socket_init(struct appctx *appctx)
@ -2859,7 +2859,7 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
* connection completes.
*/
cs_cant_get(s->scf);
cs_rx_endp_more(s->scf);
applet_have_more_data(appctx);
appctx_wakeup(appctx);
hlua->gc_count++;
@ -9306,7 +9306,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx)
/* Wakeup the applet ASAP. */
cs_cant_get(cs);
cs_rx_endp_more(cs);
applet_have_more_data(ctx);
return 0;
}

View File

@ -1023,7 +1023,7 @@ static int cli_io_handler_clear_map(struct appctx *appctx)
if (!finished) {
/* let's come back later */
cs_rx_endp_more(appctx_cs(appctx));
applet_have_more_data(appctx);
return 0;
}
return 1;

View File

@ -378,7 +378,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
cs_rx_endp_done(cs);
applet_have_no_more_data(appctx);
ret = 0;
}
/* always drain all the request */

View File

@ -335,7 +335,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
if (cs_opposite(cs)->state < SC_ST_EST) {
cs_cant_get(cs);
se_need_remote_conn(appctx->sedesc);
cs_rx_endp_more(cs);
applet_have_more_data(appctx);
return;
}
@ -417,7 +417,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
cs_rx_endp_done(cs);
applet_have_no_more_data(appctx);
}
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
@ -475,7 +475,7 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
if (cs_opposite(cs)->state < SC_ST_EST) {
cs_cant_get(cs);
se_need_remote_conn(appctx->sedesc);
cs_rx_endp_more(cs);
applet_have_more_data(appctx);
return;
}
@ -561,7 +561,7 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
cs_rx_endp_done(cs);
applet_have_no_more_data(appctx);
}
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);

View File

@ -2144,7 +2144,7 @@ yield:
/* store the state */
applet_putchk(appctx, trash);
free_trash_chunk(trash);
cs_rx_endp_more(cs); /* let's come back later */
applet_have_more_data(appctx); /* let's come back later */
return 0; /* should come back */
error:
@ -2930,7 +2930,7 @@ yield:
/* store the state */
applet_putchk(appctx, trash);
free_trash_chunk(trash);
cs_rx_endp_more(cs); /* let's come back later */
applet_have_more_data(appctx); /* let's come back later */
return 0; /* should come back */
error:

View File

@ -1153,7 +1153,7 @@ yield:
/* store the state */
applet_putchk(appctx, trash);
free_trash_chunk(trash);
cs_rx_endp_more(cs); /* let's come back later */
applet_have_more_data(appctx); /* let's come back later */
return 0; /* should come back */
error:

View File

@ -917,7 +917,7 @@ static void back_establish(struct stream *s)
rep->analysers |= strm_fe(s)->fe_rsp_ana | s->be->be_rsp_ana;
cs_rx_endp_more(s->scb);
se_have_more_data(s->scb->sedesc);
rep->flags |= CF_READ_ATTACHED; /* producer is now attached */
if (conn) {
/* real connections have timeouts
@ -1476,7 +1476,7 @@ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_prot
conn = sc_conn(cs);
if (conn) {
cs_rx_endp_more(s->scf);
se_have_more_data(s->scf->sedesc);
/* Make sure we're unsubscribed, the the new
* mux will probably want to subscribe to
* the underlying XPRT