mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 23:56:57 +02:00
MINOR: stream-int/conn-stream: Move stream_int_notify() in the conn-stream scope
stream_int_notify() is renamed cs_notify() and is updated to manipulate a conn-stream instead of a stream-interface.
This commit is contained in:
parent
d715d36200
commit
9029a72899
@ -45,7 +45,7 @@ DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream
|
|||||||
static void cs_conn_read0(struct conn_stream *cs);
|
static void cs_conn_read0(struct conn_stream *cs);
|
||||||
|
|
||||||
/* post-IO notification callback */
|
/* post-IO notification callback */
|
||||||
static void stream_int_notify(struct stream_interface *si);
|
static void cs_notify(struct conn_stream *cs);
|
||||||
|
|
||||||
struct data_cb si_conn_cb = {
|
struct data_cb si_conn_cb = {
|
||||||
.wake = si_cs_process,
|
.wake = si_cs_process,
|
||||||
@ -92,20 +92,20 @@ void si_free(struct stream_interface *si)
|
|||||||
* It should not be called from within the stream itself, cs_update()
|
* It should not be called from within the stream itself, cs_update()
|
||||||
* is designed for this.
|
* is designed for this.
|
||||||
*/
|
*/
|
||||||
static void stream_int_notify(struct stream_interface *si)
|
static void cs_notify(struct conn_stream *cs)
|
||||||
{
|
{
|
||||||
struct channel *ic = si_ic(si);
|
struct channel *ic = cs_ic(cs);
|
||||||
struct channel *oc = si_oc(si);
|
struct channel *oc = cs_oc(cs);
|
||||||
struct stream_interface *sio = si_opposite(si);
|
struct conn_stream *cso = cs_opposite(cs);
|
||||||
struct task *task = si_task(si);
|
struct task *task = cs_strm_task(cs);
|
||||||
|
|
||||||
/* process consumer side */
|
/* process consumer side */
|
||||||
if (channel_is_empty(oc)) {
|
if (channel_is_empty(oc)) {
|
||||||
struct connection *conn = cs_conn(si->cs);
|
struct connection *conn = cs_conn(cs);
|
||||||
|
|
||||||
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
|
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
|
||||||
(si->cs->state == CS_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
|
(cs->state == CS_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
|
||||||
cs_shutw(si->cs);
|
cs_shutw(cs);
|
||||||
oc->wex = TICK_ETERNITY;
|
oc->wex = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,9 +113,9 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
* we're about to close and can't expect more data if SHUTW_NOW is there.
|
* we're about to close and can't expect more data if SHUTW_NOW is there.
|
||||||
*/
|
*/
|
||||||
if (!(oc->flags & (CF_SHUTW|CF_SHUTW_NOW)))
|
if (!(oc->flags & (CF_SHUTW|CF_SHUTW_NOW)))
|
||||||
si->flags |= SI_FL_WAIT_DATA;
|
cs->si->flags |= SI_FL_WAIT_DATA;
|
||||||
else if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW)
|
else if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW)
|
||||||
si->flags &= ~SI_FL_WAIT_DATA;
|
cs->si->flags &= ~SI_FL_WAIT_DATA;
|
||||||
|
|
||||||
/* update OC timeouts and wake the other side up if it's waiting for room */
|
/* update OC timeouts and wake the other side up if it's waiting for room */
|
||||||
if (oc->flags & CF_WRITE_ACTIVITY) {
|
if (oc->flags & CF_WRITE_ACTIVITY) {
|
||||||
@ -124,15 +124,15 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
if (tick_isset(oc->wex))
|
if (tick_isset(oc->wex))
|
||||||
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
||||||
|
|
||||||
if (!(si->cs->flags & CS_FL_INDEP_STR))
|
if (!(cs->flags & CS_FL_INDEP_STR))
|
||||||
if (tick_isset(ic->rex))
|
if (tick_isset(ic->rex))
|
||||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oc->flags & CF_DONT_READ)
|
if (oc->flags & CF_DONT_READ)
|
||||||
si_rx_chan_blk(sio);
|
si_rx_chan_blk(cso->si);
|
||||||
else
|
else
|
||||||
si_rx_chan_rdy(sio);
|
si_rx_chan_rdy(cso->si);
|
||||||
|
|
||||||
/* Notify the other side when we've injected data into the IC that
|
/* Notify the other side when we've injected data into the IC that
|
||||||
* needs to be forwarded. We can do fast-forwarding as soon as there
|
* needs to be forwarded. We can do fast-forwarding as soon as there
|
||||||
@ -146,7 +146,7 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
* an HTTP parser might need more data to complete the parsing.
|
* an HTTP parser might need more data to complete the parsing.
|
||||||
*/
|
*/
|
||||||
if (!channel_is_empty(ic) &&
|
if (!channel_is_empty(ic) &&
|
||||||
(sio->flags & SI_FL_WAIT_DATA) &&
|
(cso->si->flags & SI_FL_WAIT_DATA) &&
|
||||||
(!(ic->flags & CF_EXPECT_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
|
(!(ic->flags & CF_EXPECT_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
|
||||||
int new_len, last_len;
|
int new_len, last_len;
|
||||||
|
|
||||||
@ -154,7 +154,7 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
if (ic->pipe)
|
if (ic->pipe)
|
||||||
last_len += ic->pipe->data;
|
last_len += ic->pipe->data;
|
||||||
|
|
||||||
cs_chk_snd(sio->cs);
|
cs_chk_snd(cso);
|
||||||
|
|
||||||
new_len = co_data(ic);
|
new_len = co_data(ic);
|
||||||
if (ic->pipe)
|
if (ic->pipe)
|
||||||
@ -164,16 +164,16 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
* buffer or in the pipe.
|
* buffer or in the pipe.
|
||||||
*/
|
*/
|
||||||
if (new_len < last_len)
|
if (new_len < last_len)
|
||||||
si_rx_room_rdy(si);
|
si_rx_room_rdy(cs->si);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(ic->flags & CF_DONT_READ))
|
if (!(ic->flags & CF_DONT_READ))
|
||||||
si_rx_chan_rdy(si);
|
si_rx_chan_rdy(cs->si);
|
||||||
|
|
||||||
cs_chk_rcv(si->cs);
|
cs_chk_rcv(cs);
|
||||||
cs_chk_rcv(sio->cs);
|
cs_chk_rcv(cso);
|
||||||
|
|
||||||
if (si_rx_blocked(si)) {
|
if (si_rx_blocked(cs->si)) {
|
||||||
ic->rex = TICK_ETERNITY;
|
ic->rex = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
|
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
|
||||||
@ -185,10 +185,10 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
/* wake the task up only when needed */
|
/* wake the task up only when needed */
|
||||||
if (/* changes on the production side */
|
if (/* changes on the production side */
|
||||||
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
|
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
|
||||||
!cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST) ||
|
!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST) ||
|
||||||
(si->cs->endp->flags & CS_EP_ERROR) ||
|
(cs->endp->flags & CS_EP_ERROR) ||
|
||||||
((ic->flags & CF_READ_PARTIAL) &&
|
((ic->flags & CF_READ_PARTIAL) &&
|
||||||
((ic->flags & CF_EOI) || !ic->to_forward || sio->cs->state != CS_ST_EST)) ||
|
((ic->flags & CF_EOI) || !ic->to_forward || cso->state != CS_ST_EST)) ||
|
||||||
|
|
||||||
/* changes on the consumption side */
|
/* changes on the consumption side */
|
||||||
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
|
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
|
||||||
@ -196,7 +196,7 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
((oc->flags & CF_SHUTW) ||
|
((oc->flags & CF_SHUTW) ||
|
||||||
(((oc->flags & CF_WAKE_WRITE) ||
|
(((oc->flags & CF_WAKE_WRITE) ||
|
||||||
!(oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW|CF_SHUTW))) &&
|
!(oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW|CF_SHUTW))) &&
|
||||||
(sio->cs->state != CS_ST_EST ||
|
(cso->state != CS_ST_EST ||
|
||||||
(channel_is_empty(oc) && !oc->to_forward)))))) {
|
(channel_is_empty(oc) && !oc->to_forward)))))) {
|
||||||
task_wakeup(task, TASK_WOKEN_IO);
|
task_wakeup(task, TASK_WOKEN_IO);
|
||||||
}
|
}
|
||||||
@ -208,7 +208,7 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
|
|
||||||
task->expire = tick_first(task->expire, ic->analyse_exp);
|
task->expire = tick_first(task->expire, ic->analyse_exp);
|
||||||
task->expire = tick_first(task->expire, oc->analyse_exp);
|
task->expire = tick_first(task->expire, oc->analyse_exp);
|
||||||
task->expire = tick_first(task->expire, __cs_strm(si->cs)->conn_exp);
|
task->expire = tick_first(task->expire, __cs_strm(cs)->conn_exp);
|
||||||
|
|
||||||
task_queue(task);
|
task_queue(task);
|
||||||
}
|
}
|
||||||
@ -301,7 +301,7 @@ int si_cs_process(struct conn_stream *cs)
|
|||||||
* pending data, then possibly wake the stream up based on the new
|
* pending data, then possibly wake the stream up based on the new
|
||||||
* stream-int status.
|
* stream-int status.
|
||||||
*/
|
*/
|
||||||
stream_int_notify(si);
|
cs_notify(cs);
|
||||||
stream_release_buffers(si_strm(si));
|
stream_release_buffers(si_strm(si));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -931,10 +931,10 @@ int cs_applet_process(struct conn_stream *cs)
|
|||||||
si_rx_endp_more(cs->si);
|
si_rx_endp_more(cs->si);
|
||||||
|
|
||||||
/* update the stream-int, channels, and possibly wake the stream up */
|
/* update the stream-int, channels, and possibly wake the stream up */
|
||||||
stream_int_notify(cs->si);
|
cs_notify(cs);
|
||||||
stream_release_buffers(__cs_strm(cs));
|
stream_release_buffers(__cs_strm(cs));
|
||||||
|
|
||||||
/* stream_int_notify may have passed through chk_snd and released some
|
/* cs_notify may have passed through chk_snd and released some
|
||||||
* RXBLK flags. Process_stream will consider those flags to wake up the
|
* RXBLK flags. Process_stream will consider those flags to wake up the
|
||||||
* appctx but in the case the task is not in runqueue we may have to
|
* appctx but in the case the task is not in runqueue we may have to
|
||||||
* wakeup the appctx immediately.
|
* wakeup the appctx immediately.
|
||||||
|
Loading…
Reference in New Issue
Block a user