diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h index eb5e42a07..9e5e44fd7 100644 --- a/include/haproxy/applet.h +++ b/include/haproxy/applet.h @@ -128,6 +128,22 @@ static inline struct stream *appctx_strm(const struct appctx *appctx) return __sc_strm(appctx->sedesc->sc); } +/* The applet announces it has more data to deliver to the stream's input + * buffer. + */ +static inline void applet_have_more_data(struct appctx *appctx) +{ + se_fl_clr(appctx->sedesc, SE_FL_RX_WAIT_EP); +} + +/* The applet announces it doesn't have more data for the stream's input + * buffer. + */ +static inline void applet_have_no_more_data(struct appctx *appctx) +{ + se_fl_set(appctx->sedesc, SE_FL_RX_WAIT_EP); +} + /* writes chunk into the input channel of the stream attached to this * appctx's endpoint, and marks the RXBLK_ROOM on a channel full error. See * ci_putchk() for the list of return codes. diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h index 0418039e2..b98f5db45 100644 --- a/include/haproxy/conn_stream.h +++ b/include/haproxy/conn_stream.h @@ -305,16 +305,20 @@ static inline int cs_rx_endp_ready(const struct stconn *cs) return !sc_ep_test(cs, SE_FL_RX_WAIT_EP); } -/* The stream connector announces it is ready to try to deliver more data to the input buffer */ -static inline void cs_rx_endp_more(struct stconn *cs) +/* The stream endpoint announces it has more data to deliver to the stream's + * input buffer. + */ +static inline void se_have_more_data(struct sedesc *se) { - sc_ep_clr(cs, SE_FL_RX_WAIT_EP); + se_fl_clr(se, SE_FL_RX_WAIT_EP); } -/* The stream connector announces it doesn't have more data for the input buffer */ -static inline void cs_rx_endp_done(struct stconn *cs) +/* The stream endpoint announces it doesn't have more data for the stream's + * input buffer. + */ +static inline void se_have_no_more_data(struct sedesc *se) { - sc_ep_set(cs, SE_FL_RX_WAIT_EP); + se_fl_set(se, SE_FL_RX_WAIT_EP); } /* The application layer informs a stream connector that it's willing to diff --git a/src/applet.c b/src/applet.c index c48d36f61..7330512b0 100644 --- a/src/applet.c +++ b/src/applet.c @@ -219,7 +219,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) * that one applet which ignores any event will not spin. */ cs_cant_get(cs); - cs_rx_endp_done(cs); + applet_have_no_more_data(app); /* Now we'll try to allocate the input buffer. We wake up the applet in * all cases. So this is the applet's responsibility to check if this @@ -228,7 +228,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) * do if it needs the buffer, it will be called again upon readiness. */ if (!cs_alloc_ibuf(cs, &app->buffer_wait)) - cs_rx_endp_more(cs); + applet_have_more_data(app); count = co_data(sc_oc(cs)); app->applet->fct(app); diff --git a/src/conn_stream.c b/src/conn_stream.c index b898dd9f7..9904fba31 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -1588,9 +1588,9 @@ static int sc_conn_recv(struct stconn *cs) else if (!cs_rx_blocked(cs) && !(ic->flags & CF_SHUTR)) { /* Subscribe to receive events if we're blocking on I/O */ conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event); - cs_rx_endp_done(cs); + se_have_no_more_data(cs->sedesc); } else { - cs_rx_endp_more(cs); + se_have_more_data(cs->sedesc); ret = 1; } return ret; @@ -1926,7 +1926,7 @@ static int cs_applet_process(struct stconn *cs) * begin blocked by the channel. */ if (cs_rx_blocked(cs) || sc_ep_test(cs, SE_FL_APPLET_NEED_CONN)) - cs_rx_endp_more(cs); + applet_have_more_data(__sc_appctx(cs)); /* update the stream connector, channels, and possibly wake the stream up */ cs_notify(cs); diff --git a/src/dns.c b/src/dns.c index 29e7f3e3f..3bb2653a0 100644 --- a/src/dns.c +++ b/src/dns.c @@ -474,7 +474,7 @@ static void dns_session_io_handler(struct appctx *appctx) if (cs_opposite(cs)->state < SC_ST_EST) { cs_cant_get(cs); se_need_remote_conn(appctx->sedesc); - cs_rx_endp_more(cs); + applet_have_more_data(appctx); return; } @@ -649,7 +649,7 @@ static void dns_session_io_handler(struct appctx *appctx) BUG_ON(LIST_INLIST(&appctx->wait_entry)); LIST_APPEND(&ring->waiters, &appctx->wait_entry); HA_RWLOCK_WRUNLOCK(DNS_LOCK, &ring->lock); - cs_rx_endp_done(cs); + applet_have_no_more_data(appctx); } read: diff --git a/src/flt_spoe.c b/src/flt_spoe.c index ac48c0669..90c3e01fa 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1194,7 +1194,7 @@ static int spoe_wakeup_appctx(struct appctx *appctx) { cs_want_get(appctx_cs(appctx)); - cs_rx_endp_more(appctx_cs(appctx)); + applet_have_more_data(appctx); appctx_wakeup(appctx); return 1; } @@ -1399,7 +1399,7 @@ spoe_handle_connect_appctx(struct appctx *appctx) if (!cs_state_in(cs->state, SC_SB_RDY|SC_SB_EST)) { /* not connected yet */ - cs_rx_endp_more(cs); + applet_have_more_data(appctx); task_wakeup(__sc_strm(cs)->task, TASK_WOKEN_MSG); goto stop; } diff --git a/src/hlua.c b/src/hlua.c index e9316106b..64a6ae83e 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -1958,7 +1958,7 @@ static void hlua_socket_handler(struct appctx *appctx) if (cs_opposite(cs)->state < SC_ST_EST) { cs_cant_get(cs); se_need_remote_conn(appctx->sedesc); - cs_rx_endp_more(cs); + applet_have_more_data(appctx); return; } @@ -1983,7 +1983,7 @@ static void hlua_socket_handler(struct appctx *appctx) * to write, so we clear the blocking flag. */ if (notification_registered(&ctx->wake_on_write)) - cs_rx_endp_more(cs); + applet_have_more_data(appctx); } static int hlua_socket_init(struct appctx *appctx) @@ -2859,7 +2859,7 @@ __LJMP static int hlua_socket_connect(struct lua_State *L) * connection completes. */ cs_cant_get(s->scf); - cs_rx_endp_more(s->scf); + applet_have_more_data(appctx); appctx_wakeup(appctx); hlua->gc_count++; @@ -9306,7 +9306,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx) /* Wakeup the applet ASAP. */ cs_cant_get(cs); - cs_rx_endp_more(cs); + applet_have_more_data(ctx); return 0; } diff --git a/src/map.c b/src/map.c index b13971092..3c04eb709 100644 --- a/src/map.c +++ b/src/map.c @@ -1023,7 +1023,7 @@ static int cli_io_handler_clear_map(struct appctx *appctx) if (!finished) { /* let's come back later */ - cs_rx_endp_more(appctx_cs(appctx)); + applet_have_more_data(appctx); return 0; } return 1; diff --git a/src/ring.c b/src/ring.c index 9ab0330db..873d0d6bf 100644 --- a/src/ring.c +++ b/src/ring.c @@ -378,7 +378,7 @@ int cli_io_handler_show_ring(struct appctx *appctx) HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); LIST_APPEND(&ring->waiters, &appctx->wait_entry); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); - cs_rx_endp_done(cs); + applet_have_no_more_data(appctx); ret = 0; } /* always drain all the request */ diff --git a/src/sink.c b/src/sink.c index 6bd30fcac..f8e9404bc 100644 --- a/src/sink.c +++ b/src/sink.c @@ -335,7 +335,7 @@ static void sink_forward_io_handler(struct appctx *appctx) if (cs_opposite(cs)->state < SC_ST_EST) { cs_cant_get(cs); se_need_remote_conn(appctx->sedesc); - cs_rx_endp_more(cs); + applet_have_more_data(appctx); return; } @@ -417,7 +417,7 @@ static void sink_forward_io_handler(struct appctx *appctx) HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); LIST_APPEND(&ring->waiters, &appctx->wait_entry); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); - cs_rx_endp_done(cs); + applet_have_no_more_data(appctx); } HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); @@ -475,7 +475,7 @@ static void sink_forward_oc_io_handler(struct appctx *appctx) if (cs_opposite(cs)->state < SC_ST_EST) { cs_cant_get(cs); se_need_remote_conn(appctx->sedesc); - cs_rx_endp_more(cs); + applet_have_more_data(appctx); return; } @@ -561,7 +561,7 @@ static void sink_forward_oc_io_handler(struct appctx *appctx) HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); LIST_APPEND(&ring->waiters, &appctx->wait_entry); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); - cs_rx_endp_done(cs); + applet_have_no_more_data(appctx); } HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); diff --git a/src/ssl_ckch.c b/src/ssl_ckch.c index afcc6ad7b..1accf60f1 100644 --- a/src/ssl_ckch.c +++ b/src/ssl_ckch.c @@ -2144,7 +2144,7 @@ yield: /* store the state */ applet_putchk(appctx, trash); free_trash_chunk(trash); - cs_rx_endp_more(cs); /* let's come back later */ + applet_have_more_data(appctx); /* let's come back later */ return 0; /* should come back */ error: @@ -2930,7 +2930,7 @@ yield: /* store the state */ applet_putchk(appctx, trash); free_trash_chunk(trash); - cs_rx_endp_more(cs); /* let's come back later */ + applet_have_more_data(appctx); /* let's come back later */ return 0; /* should come back */ error: diff --git a/src/ssl_crtlist.c b/src/ssl_crtlist.c index c06f3acde..e4297906a 100644 --- a/src/ssl_crtlist.c +++ b/src/ssl_crtlist.c @@ -1153,7 +1153,7 @@ yield: /* store the state */ applet_putchk(appctx, trash); free_trash_chunk(trash); - cs_rx_endp_more(cs); /* let's come back later */ + applet_have_more_data(appctx); /* let's come back later */ return 0; /* should come back */ error: diff --git a/src/stream.c b/src/stream.c index c05f039c7..64e2dd0e8 100644 --- a/src/stream.c +++ b/src/stream.c @@ -917,7 +917,7 @@ static void back_establish(struct stream *s) rep->analysers |= strm_fe(s)->fe_rsp_ana | s->be->be_rsp_ana; - cs_rx_endp_more(s->scb); + se_have_more_data(s->scb->sedesc); rep->flags |= CF_READ_ATTACHED; /* producer is now attached */ if (conn) { /* real connections have timeouts @@ -1476,7 +1476,7 @@ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_prot conn = sc_conn(cs); if (conn) { - cs_rx_endp_more(s->scf); + se_have_more_data(s->scf->sedesc); /* Make sure we're unsubscribed, the the new * mux will probably want to subscribe to * the underlying XPRT