MEDIUM: stream-int/conn-stream: Move si_ops in the conn-stream scope

The si_ops structure is renamed to cs_app_ops and the callback functions are
changed to manipulate a conn-stream instead of a stream-interface..
This commit is contained in:
Christopher Faulet 2022-04-01 08:58:29 +02:00
parent da098e6c17
commit 0c6a64cd5f
5 changed files with 175 additions and 172 deletions

View File

@ -147,6 +147,14 @@ struct cs_endpoint {
unsigned int flags; unsigned int flags;
}; };
/* operations available on a conn-stream */
struct cs_app_ops {
void (*chk_rcv)(struct conn_stream *); /* chk_rcv function, may not be null */
void (*chk_snd)(struct conn_stream *); /* chk_snd function, may not be null */
void (*shutr)(struct conn_stream *); /* shut read function, may not be null */
void (*shutw)(struct conn_stream *); /* shut write function, may not be null */
};
/* /*
* This structure describes the elements of a connection relevant to a stream * This structure describes the elements of a connection relevant to a stream
*/ */
@ -162,6 +170,7 @@ struct conn_stream {
enum obj_type *app; /* points to the applicative point (stream or check) */ enum obj_type *app; /* points to the applicative point (stream or check) */
struct stream_interface *si; struct stream_interface *si;
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */ const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
struct cs_app_ops *ops; /* general operations used at the app layer */
struct sockaddr_storage *src; /* source address (pool), when known, otherwise NULL */ struct sockaddr_storage *src; /* source address (pool), when known, otherwise NULL */
struct sockaddr_storage *dst; /* destination address (pool), when known, otherwise NULL */ struct sockaddr_storage *dst; /* destination address (pool), when known, otherwise NULL */
}; };

View File

@ -62,15 +62,6 @@ struct stream_interface {
/* 16-bit hole here */ /* 16-bit hole here */
unsigned int flags; /* SI_FL_* */ unsigned int flags; /* SI_FL_* */
struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */ struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */
struct si_ops *ops; /* general operations at the stream interface layer */
};
/* operations available on a stream-interface */
struct si_ops {
void (*chk_rcv)(struct stream_interface *); /* chk_rcv function, may not be null */
void (*chk_snd)(struct stream_interface *); /* chk_snd function, may not be null */
void (*shutr)(struct stream_interface *); /* shut read function, may not be null */
void (*shutw)(struct stream_interface *); /* shut write function, may not be null */
}; };
#endif /* _HAPROXY_STREAM_INTERFACE_T_H */ #endif /* _HAPROXY_STREAM_INTERFACE_T_H */

View File

@ -30,9 +30,9 @@
#include <haproxy/cs_utils.h> #include <haproxy/cs_utils.h>
#include <haproxy/obj_type.h> #include <haproxy/obj_type.h>
extern struct si_ops si_embedded_ops; extern struct cs_app_ops cs_app_embedded_ops;
extern struct si_ops si_conn_ops; extern struct cs_app_ops cs_app_conn_ops;
extern struct si_ops si_applet_ops; extern struct cs_app_ops cs_app_applet_ops;
extern struct data_cb si_conn_cb; extern struct data_cb si_conn_cb;
extern struct data_cb check_conn_cb; extern struct data_cb check_conn_cb;
@ -107,7 +107,6 @@ static inline int si_init(struct stream_interface *si)
{ {
si->flags &= SI_FL_ISBACK; si->flags &= SI_FL_ISBACK;
si->cs = NULL; si->cs = NULL;
si->ops = &si_embedded_ops;
return 0; return 0;
} }
@ -276,13 +275,13 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait
/* Sends a shutr to the endpoint using the data layer */ /* Sends a shutr to the endpoint using the data layer */
static inline void cs_shutr(struct conn_stream *cs) static inline void cs_shutr(struct conn_stream *cs)
{ {
cs->si->ops->shutr(cs->si); cs->ops->shutr(cs);
} }
/* Sends a shutw to the endpoint using the data layer */ /* Sends a shutw to the endpoint using the data layer */
static inline void cs_shutw(struct conn_stream *cs) static inline void cs_shutw(struct conn_stream *cs)
{ {
cs->si->ops->shutw(cs->si); cs->ops->shutw(cs);
} }
/* This is to be used after making some room available in a channel. It will /* This is to be used after making some room available in a channel. It will
@ -303,13 +302,13 @@ static inline void cs_chk_rcv(struct conn_stream *cs)
return; return;
cs->si->flags |= SI_FL_RX_WAIT_EP; cs->si->flags |= SI_FL_RX_WAIT_EP;
cs->si->ops->chk_rcv(cs->si); cs->ops->chk_rcv(cs);
} }
/* Calls chk_snd on the endpoint using the data layer */ /* Calls chk_snd on the endpoint using the data layer */
static inline void cs_chk_snd(struct conn_stream *cs) static inline void cs_chk_snd(struct conn_stream *cs)
{ {
cs->si->ops->chk_snd(cs->si); cs->ops->chk_snd(cs);
} }
/* Combines both si_update_rx() and si_update_tx() at once */ /* Combines both si_update_rx() and si_update_tx() at once */

View File

@ -127,8 +127,9 @@ struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags)
cs_free(cs); cs_free(cs);
return NULL; return NULL;
} }
cs->app = &strm->obj_type; cs->app = &strm->obj_type;
cs->si->ops = &si_embedded_ops; cs->ops = &cs_app_embedded_ops;
cs->data_cb = NULL; cs->data_cb = NULL;
return cs; return cs;
} }
@ -186,7 +187,7 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
cs->wait_event.events = 0; cs->wait_event.events = 0;
} }
cs->si->ops = &si_conn_ops; cs->ops = &cs_app_conn_ops;
cs->data_cb = &si_conn_cb; cs->data_cb = &si_conn_cb;
} }
else if (cs_check(cs)) else if (cs_check(cs))
@ -205,7 +206,7 @@ void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
cs->endp->flags &= ~CS_EP_DETACHED; cs->endp->flags &= ~CS_EP_DETACHED;
appctx->owner = cs; appctx->owner = cs;
if (cs_strm(cs)) { if (cs_strm(cs)) {
cs->si->ops = &si_applet_ops; cs->ops = &cs_app_applet_ops;
cs->data_cb = NULL; cs->data_cb = NULL;
} }
} }
@ -231,15 +232,15 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
cs->wait_event.tasklet->context = cs->si; cs->wait_event.tasklet->context = cs->si;
cs->wait_event.events = 0; cs->wait_event.events = 0;
cs->si->ops = &si_conn_ops; cs->ops = &cs_app_conn_ops;
cs->data_cb = &si_conn_cb; cs->data_cb = &si_conn_cb;
} }
else if (cs->endp->flags & CS_EP_T_APPLET) { else if (cs->endp->flags & CS_EP_T_APPLET) {
cs->si->ops = &si_applet_ops; cs->ops = &cs_app_applet_ops;
cs->data_cb = NULL; cs->data_cb = NULL;
} }
else { else {
cs->si->ops = &si_embedded_ops; cs->ops = &cs_app_embedded_ops;
cs->data_cb = NULL; cs->data_cb = NULL;
} }
return 0; return 0;
@ -300,7 +301,7 @@ void cs_detach_endp(struct conn_stream *cs)
*/ */
cs->flags &= CS_FL_ISBACK; cs->flags &= CS_FL_ISBACK;
if (cs->si) if (cs->si)
cs->si->ops = &si_embedded_ops; cs->ops = &cs_app_embedded_ops;
cs->data_cb = NULL; cs->data_cb = NULL;
if (cs->app == NULL) if (cs->app == NULL)

View File

@ -23,6 +23,8 @@
#include <haproxy/applet.h> #include <haproxy/applet.h>
#include <haproxy/channel.h> #include <haproxy/channel.h>
#include <haproxy/connection.h> #include <haproxy/connection.h>
#include <haproxy/conn_stream.h>
#include <haproxy/cs_utils.h>
#include <haproxy/dynbuf.h> #include <haproxy/dynbuf.h>
#include <haproxy/http_ana.h> #include <haproxy/http_ana.h>
#include <haproxy/http_htx.h> #include <haproxy/http_htx.h>
@ -40,23 +42,23 @@
DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface)); DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface));
/* functions used by default on a detached stream-interface */ /* functions used by default on a detached conn-stream */
static void stream_int_shutr(struct stream_interface *si); static void cs_app_shutr(struct conn_stream *cs);
static void stream_int_shutw(struct stream_interface *si); static void cs_app_shutw(struct conn_stream *cs);
static void stream_int_chk_rcv(struct stream_interface *si); static void cs_app_chk_rcv(struct conn_stream *cs);
static void stream_int_chk_snd(struct stream_interface *si); static void cs_app_chk_snd(struct conn_stream *cs);
/* functions used on a conn_stream-based stream-interface */ /* functions used on a mux-based conn-stream */
static void stream_int_shutr_conn(struct stream_interface *si); static void cs_app_shutr_conn(struct conn_stream *cs);
static void stream_int_shutw_conn(struct stream_interface *si); static void cs_app_shutw_conn(struct conn_stream *cs);
static void stream_int_chk_rcv_conn(struct stream_interface *si); static void cs_app_chk_rcv_conn(struct conn_stream *cs);
static void stream_int_chk_snd_conn(struct stream_interface *si); static void cs_app_chk_snd_conn(struct conn_stream *cs);
/* functions used on an applet-based stream-interface */ /* functions used on an applet-based conn-stream */
static void stream_int_shutr_applet(struct stream_interface *si); static void cs_app_shutr_applet(struct conn_stream *cs);
static void stream_int_shutw_applet(struct stream_interface *si); static void cs_app_shutw_applet(struct conn_stream *cs);
static void stream_int_chk_rcv_applet(struct stream_interface *si); static void cs_app_chk_rcv_applet(struct conn_stream *cs);
static void stream_int_chk_snd_applet(struct stream_interface *si); static void cs_app_chk_snd_applet(struct conn_stream *cs);
/* last read notification */ /* last read notification */
static void stream_int_read0(struct stream_interface *si); static void stream_int_read0(struct stream_interface *si);
@ -64,31 +66,31 @@ static void stream_int_read0(struct stream_interface *si);
/* post-IO notification callback */ /* post-IO notification callback */
static void stream_int_notify(struct stream_interface *si); static void stream_int_notify(struct stream_interface *si);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = { /* conn-stream operations for connections */
.chk_rcv = stream_int_chk_rcv, struct cs_app_ops cs_app_conn_ops = {
.chk_snd = stream_int_chk_snd, .chk_rcv = cs_app_chk_rcv_conn,
.shutr = stream_int_shutr, .chk_snd = cs_app_chk_snd_conn,
.shutw = stream_int_shutw, .shutr = cs_app_shutr_conn,
.shutw = cs_app_shutw_conn,
}; };
/* stream-interface operations for connections */ /* conn-stream operations for embedded tasks */
struct si_ops si_conn_ops = { struct cs_app_ops cs_app_embedded_ops = {
.chk_rcv = stream_int_chk_rcv_conn, .chk_rcv = cs_app_chk_rcv,
.chk_snd = stream_int_chk_snd_conn, .chk_snd = cs_app_chk_snd,
.shutr = stream_int_shutr_conn, .shutr = cs_app_shutr,
.shutw = stream_int_shutw_conn, .shutw = cs_app_shutw,
}; };
/* stream-interface operations for connections */ /* conn-stream operations for connections */
struct si_ops si_applet_ops = { struct cs_app_ops cs_app_applet_ops = {
.chk_rcv = stream_int_chk_rcv_applet, .chk_rcv = cs_app_chk_rcv_applet,
.chk_snd = stream_int_chk_snd_applet, .chk_snd = cs_app_chk_snd_applet,
.shutr = stream_int_shutr_applet, .shutr = cs_app_shutr_applet,
.shutw = stream_int_shutw_applet, .shutw = cs_app_shutw_applet,
}; };
/* Functions used to communicate with a conn_stream. The first two may be used /* Functions used to communicate with a conn_stream. The first two may be used
* directly, the last one is mostly a wake callback. * directly, the last one is mostly a wake callback.
*/ */
@ -127,65 +129,66 @@ void si_free(struct stream_interface *si)
pool_free(pool_head_streaminterface, si); pool_free(pool_head_streaminterface, si);
} }
/* /*
* This function performs a shutdown-read on a detached stream interface in a * This function performs a shutdown-read on a detached conn-stream in a
* connected or init state (it does nothing for other states). It either shuts * connected or init state (it does nothing for other states). It either shuts
* the read side or marks itself as closed. The buffer flags are updated to * the read side or marks itself as closed. The buffer flags are updated to
* reflect the new state. If the stream interface has CS_FL_NOHALF, we also * reflect the new state. If the stream interface has CS_FL_NOHALF, we also
* forward the close to the write side. The owner task is woken up if it exists. * forward the close to the write side. The owner task is woken up if it exists.
*/ */
static void stream_int_shutr(struct stream_interface *si) static void cs_app_shutr(struct conn_stream *cs)
{ {
struct channel *ic = si_ic(si); struct channel *ic = cs_ic(cs);
si_rx_shut_blk(si); si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR) if (ic->flags & CF_SHUTR)
return; return;
ic->flags |= CF_SHUTR; ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY; ic->rex = TICK_ETERNITY;
if (!cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return; return;
if (si_oc(si)->flags & CF_SHUTW) { if (cs_oc(cs)->flags & CF_SHUTW) {
si->cs->state = CS_ST_DIS; cs->state = CS_ST_DIS;
__cs_strm(si->cs)->conn_exp = TICK_ETERNITY; __cs_strm(cs)->conn_exp = TICK_ETERNITY;
} }
else if (si->cs->flags & CS_FL_NOHALF) { else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */ /* we want to immediately forward this close to the write side */
return stream_int_shutw(si); return cs_app_shutw(cs);
} }
/* note that if the task exists, it must unregister itself once it runs */ /* note that if the task exists, it must unregister itself once it runs */
if (!(si->cs->flags & CS_FL_DONT_WAKE)) if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO); task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
} }
/* /*
* This function performs a shutdown-write on a detached stream interface in a * This function performs a shutdown-write on a detached conn-stream in a
* connected or init state (it does nothing for other states). It either shuts * connected or init state (it does nothing for other states). It either shuts
* the write side or marks itself as closed. The buffer flags are updated to * the write side or marks itself as closed. The buffer flags are updated to
* reflect the new state. It does also close everything if the SI was marked as * reflect the new state. It does also close everything if the SI was marked as
* being in error state. The owner task is woken up if it exists. * being in error state. The owner task is woken up if it exists.
*/ */
static void stream_int_shutw(struct stream_interface *si) static void cs_app_shutw(struct conn_stream *cs)
{ {
struct channel *ic = si_ic(si); struct channel *ic = cs_ic(cs);
struct channel *oc = si_oc(si); struct channel *oc = cs_oc(cs);
oc->flags &= ~CF_SHUTW_NOW; oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW) if (oc->flags & CF_SHUTW)
return; return;
oc->flags |= CF_SHUTW; oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY; oc->wex = TICK_ETERNITY;
si_done_get(si); si_done_get(cs->si);
if (tick_isset(si->cs->hcto)) { if (tick_isset(cs->hcto)) {
ic->rto = si->cs->hcto; ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto); ic->rex = tick_add(now_ms, ic->rto);
} }
switch (si->cs->state) { switch (cs->state) {
case CS_ST_RDY: case CS_ST_RDY:
case CS_ST_EST: case CS_ST_EST:
/* we have to shut before closing, otherwise some short messages /* we have to shut before closing, otherwise some short messages
@ -194,7 +197,7 @@ static void stream_int_shutw(struct stream_interface *si)
* However, if CS_FL_NOLINGER is explicitly set, we know there is * However, if CS_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately. * no risk so we close both sides immediately.
*/ */
if (!(si->cs->endp->flags & CS_EP_ERROR) && !(si->cs->flags & CS_FL_NOLINGER) && if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
!(ic->flags & (CF_SHUTR|CF_DONT_READ))) !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return; return;
@ -204,66 +207,66 @@ static void stream_int_shutw(struct stream_interface *si)
case CS_ST_QUE: case CS_ST_QUE:
case CS_ST_TAR: case CS_ST_TAR:
/* Note that none of these states may happen with applets */ /* Note that none of these states may happen with applets */
si->cs->state = CS_ST_DIS; cs->state = CS_ST_DIS;
/* fall through */ /* fall through */
default: default:
si->cs->flags &= ~CS_FL_NOLINGER; cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(si); si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR; ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY; ic->rex = TICK_ETERNITY;
__cs_strm(si->cs)->conn_exp = TICK_ETERNITY; __cs_strm(cs)->conn_exp = TICK_ETERNITY;
} }
/* note that if the task exists, it must unregister itself once it runs */ /* note that if the task exists, it must unregister itself once it runs */
if (!(si->cs->flags & CS_FL_DONT_WAKE)) if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO); task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
} }
/* default chk_rcv function for scheduled tasks */ /* default chk_rcv function for scheduled tasks */
static void stream_int_chk_rcv(struct stream_interface *si) static void cs_app_chk_rcv(struct conn_stream *cs)
{ {
struct channel *ic = si_ic(si); struct channel *ic = cs_ic(cs);
DPRINTF(stderr, "%s: si=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__, __FUNCTION__,
si, si->cs->state, ic->flags, si_oc(si)->flags); cs, cs->state, ic->flags, cs_oc(cs)->flags);
if (ic->pipe) { if (ic->pipe) {
/* stop reading */ /* stop reading */
si_rx_room_blk(si); si_rx_room_blk(cs->si);
} }
else { else {
/* (re)start reading */ /* (re)start reading */
if (!(si->cs->flags & CS_FL_DONT_WAKE)) if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO); task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
} }
} }
/* default chk_snd function for scheduled tasks */ /* default chk_snd function for scheduled tasks */
static void stream_int_chk_snd(struct stream_interface *si) static void cs_app_chk_snd(struct conn_stream *cs)
{ {
struct channel *oc = si_oc(si); struct channel *oc = cs_oc(cs);
DPRINTF(stderr, "%s: si=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__, __FUNCTION__,
si, si->cs->state, si_ic(si)->flags, oc->flags); cs, cs->state, cs_ic(cs)->flags, oc->flags);
if (unlikely(si->cs->state != CS_ST_EST || (oc->flags & CF_SHUTW))) if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
return; return;
if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */ if (!(cs->si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
channel_is_empty(oc)) /* called with nothing to send ! */ channel_is_empty(oc)) /* called with nothing to send ! */
return; return;
/* Otherwise there are remaining data to be sent in the buffer, /* Otherwise there are remaining data to be sent in the buffer,
* so we tell the handler. * so we tell the handler.
*/ */
si->flags &= ~SI_FL_WAIT_DATA; cs->si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex)) if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto); oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!(si->cs->flags & CS_FL_DONT_WAKE)) if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO); task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
} }
/* Register an applet to handle a stream_interface as a new appctx. The SI will /* Register an applet to handle a stream_interface as a new appctx. The SI will
@ -980,7 +983,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
} }
/* /*
* This function performs a shutdown-read on a stream interface attached to * This function performs a shutdown-read on a conn-stream attached to
* a connection in a connected or init state (it does nothing for other * a connection in a connected or init state (it does nothing for other
* states). It either shuts the read side or marks itself as closed. The buffer * states). It either shuts the read side or marks itself as closed. The buffer
* flags are updated to reflect the new state. If the stream interface has * flags are updated to reflect the new state. If the stream interface has
@ -989,14 +992,13 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
* descriptors are then shutdown or closed accordingly. The function * descriptors are then shutdown or closed accordingly. The function
* automatically disables polling if needed. * automatically disables polling if needed.
*/ */
static void stream_int_shutr_conn(struct stream_interface *si) static void cs_app_shutr_conn(struct conn_stream *cs)
{ {
struct conn_stream *cs = si->cs; struct channel *ic = cs_ic(cs);
struct channel *ic = si_ic(si);
BUG_ON(!cs_conn(cs)); BUG_ON(!cs_conn(cs));
si_rx_shut_blk(si); si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR) if (ic->flags & CF_SHUTR)
return; return;
ic->flags |= CF_SHUTR; ic->flags |= CF_SHUTR;
@ -1005,30 +1007,29 @@ static void stream_int_shutr_conn(struct stream_interface *si)
if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return; return;
if (si_oc(si)->flags & CF_SHUTW) { if (cs_oc(cs)->flags & CF_SHUTW) {
cs_conn_close(cs); cs_conn_close(cs);
cs->state = CS_ST_DIS; cs->state = CS_ST_DIS;
__cs_strm(cs)->conn_exp = TICK_ETERNITY; __cs_strm(cs)->conn_exp = TICK_ETERNITY;
} }
else if (si->cs->flags & CS_FL_NOHALF) { else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */ /* we want to immediately forward this close to the write side */
return stream_int_shutw_conn(si); return cs_app_shutw_conn(cs);
} }
} }
/* /*
* This function performs a shutdown-write on a stream interface attached to * This function performs a shutdown-write on a conn-stream attached to
* a connection in a connected or init state (it does nothing for other * a connection in a connected or init state (it does nothing for other
* states). It either shuts the write side or marks itself as closed. The * states). It either shuts the write side or marks itself as closed. The
* buffer flags are updated to reflect the new state. It does also close * buffer flags are updated to reflect the new state. It does also close
* everything if the SI was marked as being in error state. If there is a * everything if the SI was marked as being in error state. If there is a
* data-layer shutdown, it is called. * data-layer shutdown, it is called.
*/ */
static void stream_int_shutw_conn(struct stream_interface *si) static void cs_app_shutw_conn(struct conn_stream *cs)
{ {
struct conn_stream *cs = si->cs; struct channel *ic = cs_ic(cs);
struct channel *ic = si_ic(si); struct channel *oc = cs_oc(cs);
struct channel *oc = si_oc(si);
BUG_ON(!cs_conn(cs)); BUG_ON(!cs_conn(cs));
@ -1037,10 +1038,10 @@ static void stream_int_shutw_conn(struct stream_interface *si)
return; return;
oc->flags |= CF_SHUTW; oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY; oc->wex = TICK_ETERNITY;
si_done_get(si); si_done_get(cs->si);
if (tick_isset(si->cs->hcto)) { if (tick_isset(cs->hcto)) {
ic->rto = si->cs->hcto; ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto); ic->rex = tick_add(now_ms, ic->rto);
} }
@ -1092,36 +1093,37 @@ static void stream_int_shutw_conn(struct stream_interface *si)
/* fall through */ /* fall through */
default: default:
cs->flags &= ~CS_FL_NOLINGER; cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(si); si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR; ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY; ic->rex = TICK_ETERNITY;
__cs_strm(cs)->conn_exp = TICK_ETERNITY; __cs_strm(cs)->conn_exp = TICK_ETERNITY;
} }
} }
/* This function is used for inter-stream-interface calls. It is called by the /* This function is used for inter-conn-stream calls. It is called by the
* consumer to inform the producer side that it may be interested in checking * consumer to inform the producer side that it may be interested in checking
* for free space in the buffer. Note that it intentionally does not update * for free space in the buffer. Note that it intentionally does not update
* timeouts, so that we can still check them later at wake-up. This function is * timeouts, so that we can still check them later at wake-up. This function is
* dedicated to connection-based stream interfaces. * dedicated to connection-based stream interfaces.
*/ */
static void stream_int_chk_rcv_conn(struct stream_interface *si) static void cs_app_chk_rcv_conn(struct conn_stream *cs)
{ {
BUG_ON(!cs_conn(cs));
/* (re)start reading */ /* (re)start reading */
if (cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) if (cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
tasklet_wakeup(si->cs->wait_event.tasklet); tasklet_wakeup(cs->wait_event.tasklet);
} }
/* This function is used for inter-stream-interface calls. It is called by the /* This function is used for inter-conn-stream calls. It is called by the
* producer to inform the consumer side that it may be interested in checking * producer to inform the consumer side that it may be interested in checking
* for data in the buffer. Note that it intentionally does not update timeouts, * for data in the buffer. Note that it intentionally does not update timeouts,
* so that we can still check them later at wake-up. * so that we can still check them later at wake-up.
*/ */
static void stream_int_chk_snd_conn(struct stream_interface *si) static void cs_app_chk_snd_conn(struct conn_stream *cs)
{ {
struct channel *oc = si_oc(si); struct channel *oc = cs_oc(cs);
struct conn_stream *cs = si->cs;
BUG_ON(!cs_conn(cs)); BUG_ON(!cs_conn(cs));
@ -1133,13 +1135,13 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
return; return;
if (!oc->pipe && /* spliced data wants to be forwarded ASAP */ if (!oc->pipe && /* spliced data wants to be forwarded ASAP */
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */ !(cs->si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return; return;
if (!(si->cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
si_cs_send(cs); si_cs_send(cs);
if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) { if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
/* Write error on the file descriptor */ /* Write error on the file descriptor */
if (cs->state >= CS_ST_CON) if (cs->state >= CS_ST_CON)
cs->endp->flags |= CS_EP_ERROR; cs->endp->flags |= CS_EP_ERROR;
@ -1626,21 +1628,22 @@ void si_applet_wake_cb(struct stream_interface *si)
appctx_wakeup(__cs_appctx(si->cs)); appctx_wakeup(__cs_appctx(si->cs));
} }
/* /*
* This function performs a shutdown-read on a stream interface attached to an * This function performs a shutdown-read on a conn-stream attached to an
* applet in a connected or init state (it does nothing for other states). It * applet in a connected or init state (it does nothing for other states). It
* either shuts the read side or marks itself as closed. The buffer flags are * either shuts the read side or marks itself as closed. The buffer flags are
* updated to reflect the new state. If the stream interface has CS_FL_NOHALF, * updated to reflect the new state. If the stream interface has CS_FL_NOHALF,
* we also forward the close to the write side. The owner task is woken up if * we also forward the close to the write side. The owner task is woken up if
* it exists. * it exists.
*/ */
static void stream_int_shutr_applet(struct stream_interface *si) static void cs_app_shutr_applet(struct conn_stream *cs)
{ {
struct channel *ic = si_ic(si); struct channel *ic = cs_ic(cs);
BUG_ON(!cs_appctx(si->cs)); BUG_ON(!cs_appctx(cs));
si_rx_shut_blk(si); si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR) if (ic->flags & CF_SHUTR)
return; return;
ic->flags |= CF_SHUTR; ic->flags |= CF_SHUTR;
@ -1648,50 +1651,50 @@ static void stream_int_shutr_applet(struct stream_interface *si)
/* Note: on shutr, we don't call the applet */ /* Note: on shutr, we don't call the applet */
if (!cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return; return;
if (si_oc(si)->flags & CF_SHUTW) { if (cs_oc(cs)->flags & CF_SHUTW) {
si_applet_release(si); si_applet_release(cs->si);
si->cs->state = CS_ST_DIS; cs->state = CS_ST_DIS;
__cs_strm(si->cs)->conn_exp = TICK_ETERNITY; __cs_strm(cs)->conn_exp = TICK_ETERNITY;
} }
else if (si->cs->flags & CS_FL_NOHALF) { else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */ /* we want to immediately forward this close to the write side */
return stream_int_shutw_applet(si); return cs_app_shutw_applet(cs);
} }
} }
/* /*
* This function performs a shutdown-write on a stream interface attached to an * This function performs a shutdown-write on a conn-stream attached to an
* applet in a connected or init state (it does nothing for other states). It * applet in a connected or init state (it does nothing for other states). It
* either shuts the write side or marks itself as closed. The buffer flags are * either shuts the write side or marks itself as closed. The buffer flags are
* updated to reflect the new state. It does also close everything if the SI * updated to reflect the new state. It does also close everything if the SI
* was marked as being in error state. The owner task is woken up if it exists. * was marked as being in error state. The owner task is woken up if it exists.
*/ */
static void stream_int_shutw_applet(struct stream_interface *si) static void cs_app_shutw_applet(struct conn_stream *cs)
{ {
struct channel *ic = si_ic(si); struct channel *ic = cs_ic(cs);
struct channel *oc = si_oc(si); struct channel *oc = cs_oc(cs);
BUG_ON(!cs_appctx(si->cs)); BUG_ON(!cs_appctx(cs));
oc->flags &= ~CF_SHUTW_NOW; oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW) if (oc->flags & CF_SHUTW)
return; return;
oc->flags |= CF_SHUTW; oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY; oc->wex = TICK_ETERNITY;
si_done_get(si); si_done_get(cs->si);
if (tick_isset(si->cs->hcto)) { if (tick_isset(cs->hcto)) {
ic->rto = si->cs->hcto; ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto); ic->rex = tick_add(now_ms, ic->rto);
} }
/* on shutw we always wake the applet up */ /* on shutw we always wake the applet up */
appctx_wakeup(__cs_appctx(si->cs)); appctx_wakeup(__cs_appctx(cs));
switch (si->cs->state) { switch (cs->state) {
case CS_ST_RDY: case CS_ST_RDY:
case CS_ST_EST: case CS_ST_EST:
/* we have to shut before closing, otherwise some short messages /* we have to shut before closing, otherwise some short messages
@ -1700,7 +1703,7 @@ static void stream_int_shutw_applet(struct stream_interface *si)
* However, if CS_FL_NOLINGER is explicitly set, we know there is * However, if CS_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately. * no risk so we close both sides immediately.
*/ */
if (!(si->cs->endp->flags & CS_EP_ERROR) && !(si->cs->flags & CS_FL_NOLINGER) && if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
!(ic->flags & (CF_SHUTR|CF_DONT_READ))) !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return; return;
@ -1710,52 +1713,52 @@ static void stream_int_shutw_applet(struct stream_interface *si)
case CS_ST_QUE: case CS_ST_QUE:
case CS_ST_TAR: case CS_ST_TAR:
/* Note that none of these states may happen with applets */ /* Note that none of these states may happen with applets */
si_applet_release(si); si_applet_release(cs->si);
si->cs->state = CS_ST_DIS; cs->state = CS_ST_DIS;
/* fall through */ /* fall through */
default: default:
si->cs->flags &= ~CS_FL_NOLINGER; cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(si); si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR; ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY; ic->rex = TICK_ETERNITY;
__cs_strm(si->cs)->conn_exp = TICK_ETERNITY; __cs_strm(cs)->conn_exp = TICK_ETERNITY;
} }
} }
/* chk_rcv function for applets */ /* chk_rcv function for applets */
static void stream_int_chk_rcv_applet(struct stream_interface *si) static void cs_app_chk_rcv_applet(struct conn_stream *cs)
{ {
struct channel *ic = si_ic(si); struct channel *ic = cs_ic(cs);
BUG_ON(!cs_appctx(si->cs)); BUG_ON(!cs_appctx(cs));
DPRINTF(stderr, "%s: si=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__, __FUNCTION__,
si, si->cs->state, ic->flags, si_oc(si)->flags); cs, cs->state, ic->flags, cs_oc(cs)->flags);
if (!ic->pipe) { if (!ic->pipe) {
/* (re)start reading */ /* (re)start reading */
appctx_wakeup(__cs_appctx(si->cs)); appctx_wakeup(__cs_appctx(cs));
} }
} }
/* chk_snd function for applets */ /* chk_snd function for applets */
static void stream_int_chk_snd_applet(struct stream_interface *si) static void cs_app_chk_snd_applet(struct conn_stream *cs)
{ {
struct channel *oc = si_oc(si); struct channel *oc = cs_oc(cs);
BUG_ON(!cs_appctx(si->cs)); BUG_ON(!cs_appctx(cs));
DPRINTF(stderr, "%s: si=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__, __FUNCTION__,
si, si->cs->state, si_ic(si)->flags, oc->flags); cs, cs->state, cs_ic(cs)->flags, oc->flags);
if (unlikely(si->cs->state != CS_ST_EST || (oc->flags & CF_SHUTW))) if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
return; return;
/* we only wake the applet up if it was waiting for some data */ /* we only wake the applet up if it was waiting for some data */
if (!(si->flags & SI_FL_WAIT_DATA)) if (!(cs->si->flags & SI_FL_WAIT_DATA))
return; return;
if (!tick_isset(oc->wex)) if (!tick_isset(oc->wex))
@ -1763,7 +1766,7 @@ static void stream_int_chk_snd_applet(struct stream_interface *si)
if (!channel_is_empty(oc)) { if (!channel_is_empty(oc)) {
/* (re)start sending */ /* (re)start sending */
appctx_wakeup(__cs_appctx(si->cs)); appctx_wakeup(__cs_appctx(cs));
} }
} }