REORG: conn-stream: Move cs_app_ops in conn_stream.c

Callback functions to perform shutdown for reads and writes and to trigger
I/O calls are now moved in conn_stream.c.
This commit is contained in:
Christopher Faulet 2022-04-01 14:04:29 +02:00
parent 19bd728642
commit 9ffddd5ca5
3 changed files with 552 additions and 558 deletions

View File

@ -29,9 +29,6 @@
#include <haproxy/conn_stream.h>
#include <haproxy/obj_type.h>
extern struct cs_app_ops cs_app_embedded_ops;
extern struct cs_app_ops cs_app_conn_ops;
extern struct cs_app_ops cs_app_applet_ops;
extern struct data_cb si_conn_cb;
extern struct data_cb check_conn_cb;

View File

@ -21,6 +21,49 @@
DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
DECLARE_POOL(pool_head_cs_endpoint, "cs_endpoint", sizeof(struct cs_endpoint));
/* functions used by default on a detached conn-stream */
static void cs_app_shutr(struct conn_stream *cs);
static void cs_app_shutw(struct conn_stream *cs);
static void cs_app_chk_rcv(struct conn_stream *cs);
static void cs_app_chk_snd(struct conn_stream *cs);
/* functions used on a mux-based conn-stream */
static void cs_app_shutr_conn(struct conn_stream *cs);
static void cs_app_shutw_conn(struct conn_stream *cs);
static void cs_app_chk_rcv_conn(struct conn_stream *cs);
static void cs_app_chk_snd_conn(struct conn_stream *cs);
/* functions used on an applet-based conn-stream */
static void cs_app_shutr_applet(struct conn_stream *cs);
static void cs_app_shutw_applet(struct conn_stream *cs);
static void cs_app_chk_rcv_applet(struct conn_stream *cs);
static void cs_app_chk_snd_applet(struct conn_stream *cs);
/* conn-stream operations for connections */
struct cs_app_ops cs_app_conn_ops = {
.chk_rcv = cs_app_chk_rcv_conn,
.chk_snd = cs_app_chk_snd_conn,
.shutr = cs_app_shutr_conn,
.shutw = cs_app_shutw_conn,
};
/* conn-stream operations for embedded tasks */
struct cs_app_ops cs_app_embedded_ops = {
.chk_rcv = cs_app_chk_rcv,
.chk_snd = cs_app_chk_snd,
.shutr = cs_app_shutr,
.shutw = cs_app_shutw,
};
/* conn-stream operations for connections */
struct cs_app_ops cs_app_applet_ops = {
.chk_rcv = cs_app_chk_rcv_applet,
.chk_snd = cs_app_chk_snd_applet,
.shutr = cs_app_shutr_applet,
.shutw = cs_app_shutw_applet,
};
void cs_endpoint_init(struct cs_endpoint *endp)
{
endp->target = NULL;
@ -388,3 +431,512 @@ void cs_applet_release(struct conn_stream *cs)
if (appctx->applet->release && !cs_state_in(cs->state, CS_SB_DIS|CS_SB_CLO))
appctx->applet->release(appctx);
}
/*
* This function performs a shutdown-read on a detached conn-stream in a
* connected or init state (it does nothing for other states). It either shuts
* the read side or marks itself as closed. The buffer flags are updated to
* reflect the new state. If the stream interface has CS_FL_NOHALF, we also
* forward the close to the write side. The owner task is woken up if it exists.
*/
static void cs_app_shutr(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return;
if (cs_oc(cs)->flags & CF_SHUTW) {
cs->state = CS_ST_DIS;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return cs_app_shutw(cs);
}
/* note that if the task exists, it must unregister itself once it runs */
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
/*
* This function performs a shutdown-write on a detached conn-stream in a
* connected or init state (it does nothing for other states). It either shuts
* the write side or marks itself as closed. The buffer flags are updated to
* reflect the new state. It does also close everything if the SI was marked as
* being in error state. The owner task is woken up if it exists.
*/
static void cs_app_shutw(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
struct channel *oc = cs_oc(cs);
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si_done_get(cs->si);
if (tick_isset(cs->hcto)) {
ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
switch (cs->state) {
case CS_ST_RDY:
case CS_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if CS_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return;
/* fall through */
case CS_ST_CON:
case CS_ST_CER:
case CS_ST_QUE:
case CS_ST_TAR:
/* Note that none of these states may happen with applets */
cs->state = CS_ST_DIS;
/* fall through */
default:
cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
/* note that if the task exists, it must unregister itself once it runs */
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
/* default chk_rcv function for scheduled tasks */
static void cs_app_chk_rcv(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
cs, cs->state, ic->flags, cs_oc(cs)->flags);
if (ic->pipe) {
/* stop reading */
si_rx_room_blk(cs->si);
}
else {
/* (re)start reading */
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
}
/* default chk_snd function for scheduled tasks */
static void cs_app_chk_snd(struct conn_stream *cs)
{
struct channel *oc = cs_oc(cs);
DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
cs, cs->state, cs_ic(cs)->flags, oc->flags);
if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
return;
if (!(cs->si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
channel_is_empty(oc)) /* called with nothing to send ! */
return;
/* Otherwise there are remaining data to be sent in the buffer,
* so we tell the handler.
*/
cs->si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
/*
* This function performs a shutdown-read on a conn-stream attached to
* a connection in a connected or init state (it does nothing for other
* states). It either shuts the read side or marks itself as closed. The buffer
* flags are updated to reflect the new state. If the stream interface has
* CS_FL_NOHALF, we also forward the close to the write side. If a control
* layer is defined, then it is supposed to be a socket layer and file
* descriptors are then shutdown or closed accordingly. The function
* automatically disables polling if needed.
*/
static void cs_app_shutr_conn(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
BUG_ON(!cs_conn(cs));
si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return;
if (cs_oc(cs)->flags & CF_SHUTW) {
cs_conn_close(cs);
cs->state = CS_ST_DIS;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return cs_app_shutw_conn(cs);
}
}
/*
* This function performs a shutdown-write on a conn-stream attached to
* a connection in a connected or init state (it does nothing for other
* states). It either shuts the write side or marks itself as closed. The
* buffer flags are updated to reflect the new state. It does also close
* everything if the SI was marked as being in error state. If there is a
* data-layer shutdown, it is called.
*/
static void cs_app_shutw_conn(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
struct channel *oc = cs_oc(cs);
BUG_ON(!cs_conn(cs));
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si_done_get(cs->si);
if (tick_isset(cs->hcto)) {
ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
switch (cs->state) {
case CS_ST_RDY:
case CS_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if CS_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (cs->endp->flags & CS_EP_ERROR) {
/* quick close, the socket is already shut anyway */
}
else if (cs->flags & CS_FL_NOLINGER) {
/* unclean data-layer shutdown, typically an aborted request
* or a forwarded shutdown from a client to a server due to
* option abortonclose. No need for the TLS layer to try to
* emit a shutdown message.
*/
cs_conn_shutw(cs, CO_SHW_SILENT);
}
else {
/* clean data-layer shutdown. This only happens on the
* frontend side, or on the backend side when forwarding
* a client close in TCP mode or in HTTP TUNNEL mode
* while option abortonclose is set. We want the TLS
* layer to try to signal it to the peer before we close.
*/
cs_conn_shutw(cs, CO_SHW_NORMAL);
if (!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return;
}
/* fall through */
case CS_ST_CON:
/* we may have to close a pending connection, and mark the
* response buffer as shutr
*/
cs_conn_close(cs);
/* fall through */
case CS_ST_CER:
case CS_ST_QUE:
case CS_ST_TAR:
cs->state = CS_ST_DIS;
/* fall through */
default:
cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
}
/* This function is used for inter-conn-stream calls. It is called by the
* consumer to inform the producer side that it may be interested in checking
* for free space in the buffer. Note that it intentionally does not update
* timeouts, so that we can still check them later at wake-up. This function is
* dedicated to connection-based stream interfaces.
*/
static void cs_app_chk_rcv_conn(struct conn_stream *cs)
{
BUG_ON(!cs_conn(cs));
/* (re)start reading */
if (cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
tasklet_wakeup(cs->wait_event.tasklet);
}
/* This function is used for inter-conn-stream calls. It is called by the
* producer to inform the consumer side that it may be interested in checking
* for data in the buffer. Note that it intentionally does not update timeouts,
* so that we can still check them later at wake-up.
*/
static void cs_app_chk_snd_conn(struct conn_stream *cs)
{
struct channel *oc = cs_oc(cs);
BUG_ON(!cs_conn(cs));
if (unlikely(!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST) ||
(oc->flags & CF_SHUTW)))
return;
if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
return;
if (!oc->pipe && /* spliced data wants to be forwarded ASAP */
!(cs->si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return;
if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
si_cs_send(cs);
if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
/* Write error on the file descriptor */
if (cs->state >= CS_ST_CON)
cs->endp->flags |= CS_EP_ERROR;
goto out_wakeup;
}
/* OK, so now we know that some data might have been sent, and that we may
* have to poll first. We have to do that too if the buffer is not empty.
*/
if (channel_is_empty(oc)) {
/* the connection is established but we can't write. Either the
* buffer is empty, or we just refrain from sending because the
* ->o limit was reached. Maybe we just wrote the last
* chunk and need to close.
*/
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
cs_state_in(cs->state, CS_SB_RDY|CS_SB_EST)) {
cs_shutw(cs);
goto out_wakeup;
}
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0)
cs->si->flags |= SI_FL_WAIT_DATA;
oc->wex = TICK_ETERNITY;
}
else {
/* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so.
*/
cs->si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
}
if (likely(oc->flags & CF_WRITE_ACTIVITY)) {
struct channel *ic = cs_ic(cs);
/* update timeout if we have written something */
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
!channel_is_empty(oc))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) {
/* Note: to prevent the client from expiring read timeouts
* during writes, we refresh it. We only do this if the
* interface is not configured for "independent streams",
* because for some applications it's better not to do this,
* for instance when continuously exchanging small amounts
* of data which can full the socket buffers long before a
* write timeout is detected.
*/
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
/* in case of special condition (error, shutdown, end of write...), we
* have to notify the task.
*/
if (likely((oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
((oc->flags & CF_WAKE_WRITE) &&
((channel_is_empty(oc) && !oc->to_forward) ||
!cs_state_in(cs->state, CS_SB_EST))))) {
out_wakeup:
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
}
/*
* This function performs a shutdown-read on a conn-stream attached to an
* applet in a connected or init state (it does nothing for other states). It
* either shuts the read side or marks itself as closed. The buffer flags are
* updated to reflect the new state. If the stream interface has CS_FL_NOHALF,
* we also forward the close to the write side. The owner task is woken up if
* it exists.
*/
static void cs_app_shutr_applet(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
BUG_ON(!cs_appctx(cs));
si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
/* Note: on shutr, we don't call the applet */
if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return;
if (cs_oc(cs)->flags & CF_SHUTW) {
cs_applet_release(cs);
cs->state = CS_ST_DIS;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return cs_app_shutw_applet(cs);
}
}
/*
* This function performs a shutdown-write on a conn-stream attached to an
* applet in a connected or init state (it does nothing for other states). It
* either shuts the write side or marks itself as closed. The buffer flags are
* updated to reflect the new state. It does also close everything if the SI
* was marked as being in error state. The owner task is woken up if it exists.
*/
static void cs_app_shutw_applet(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
struct channel *oc = cs_oc(cs);
BUG_ON(!cs_appctx(cs));
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si_done_get(cs->si);
if (tick_isset(cs->hcto)) {
ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
/* on shutw we always wake the applet up */
appctx_wakeup(__cs_appctx(cs));
switch (cs->state) {
case CS_ST_RDY:
case CS_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if CS_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return;
/* fall through */
case CS_ST_CON:
case CS_ST_CER:
case CS_ST_QUE:
case CS_ST_TAR:
/* Note that none of these states may happen with applets */
cs_applet_release(cs);
cs->state = CS_ST_DIS;
/* fall through */
default:
cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
}
/* chk_rcv function for applets */
static void cs_app_chk_rcv_applet(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
BUG_ON(!cs_appctx(cs));
DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
cs, cs->state, ic->flags, cs_oc(cs)->flags);
if (!ic->pipe) {
/* (re)start reading */
appctx_wakeup(__cs_appctx(cs));
}
}
/* chk_snd function for applets */
static void cs_app_chk_snd_applet(struct conn_stream *cs)
{
struct channel *oc = cs_oc(cs);
BUG_ON(!cs_appctx(cs));
DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
cs, cs->state, cs_ic(cs)->flags, oc->flags);
if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
return;
/* we only wake the applet up if it was waiting for some data */
if (!(cs->si->flags & SI_FL_WAIT_DATA))
return;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!channel_is_empty(oc)) {
/* (re)start sending */
appctx_wakeup(__cs_appctx(cs));
}
}

View File

@ -41,56 +41,12 @@
DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface));
/* functions used by default on a detached conn-stream */
static void cs_app_shutr(struct conn_stream *cs);
static void cs_app_shutw(struct conn_stream *cs);
static void cs_app_chk_rcv(struct conn_stream *cs);
static void cs_app_chk_snd(struct conn_stream *cs);
/* functions used on a mux-based conn-stream */
static void cs_app_shutr_conn(struct conn_stream *cs);
static void cs_app_shutw_conn(struct conn_stream *cs);
static void cs_app_chk_rcv_conn(struct conn_stream *cs);
static void cs_app_chk_snd_conn(struct conn_stream *cs);
/* functions used on an applet-based conn-stream */
static void cs_app_shutr_applet(struct conn_stream *cs);
static void cs_app_shutw_applet(struct conn_stream *cs);
static void cs_app_chk_rcv_applet(struct conn_stream *cs);
static void cs_app_chk_snd_applet(struct conn_stream *cs);
/* last read notification */
static void stream_int_read0(struct stream_interface *si);
/* post-IO notification callback */
static void stream_int_notify(struct stream_interface *si);
/* conn-stream operations for connections */
struct cs_app_ops cs_app_conn_ops = {
.chk_rcv = cs_app_chk_rcv_conn,
.chk_snd = cs_app_chk_snd_conn,
.shutr = cs_app_shutr_conn,
.shutw = cs_app_shutw_conn,
};
/* conn-stream operations for embedded tasks */
struct cs_app_ops cs_app_embedded_ops = {
.chk_rcv = cs_app_chk_rcv,
.chk_snd = cs_app_chk_snd,
.shutr = cs_app_shutr,
.shutw = cs_app_shutw,
};
/* conn-stream operations for connections */
struct cs_app_ops cs_app_applet_ops = {
.chk_rcv = cs_app_chk_rcv_applet,
.chk_snd = cs_app_chk_snd_applet,
.shutr = cs_app_shutr_applet,
.shutw = cs_app_shutw_applet,
};
struct data_cb si_conn_cb = {
.wake = si_cs_process,
.name = "STRM",
@ -121,146 +77,6 @@ void si_free(struct stream_interface *si)
pool_free(pool_head_streaminterface, si);
}
/*
* This function performs a shutdown-read on a detached conn-stream in a
* connected or init state (it does nothing for other states). It either shuts
* the read side or marks itself as closed. The buffer flags are updated to
* reflect the new state. If the stream interface has CS_FL_NOHALF, we also
* forward the close to the write side. The owner task is woken up if it exists.
*/
static void cs_app_shutr(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return;
if (cs_oc(cs)->flags & CF_SHUTW) {
cs->state = CS_ST_DIS;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return cs_app_shutw(cs);
}
/* note that if the task exists, it must unregister itself once it runs */
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
/*
* This function performs a shutdown-write on a detached conn-stream in a
* connected or init state (it does nothing for other states). It either shuts
* the write side or marks itself as closed. The buffer flags are updated to
* reflect the new state. It does also close everything if the SI was marked as
* being in error state. The owner task is woken up if it exists.
*/
static void cs_app_shutw(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
struct channel *oc = cs_oc(cs);
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si_done_get(cs->si);
if (tick_isset(cs->hcto)) {
ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
switch (cs->state) {
case CS_ST_RDY:
case CS_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if CS_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return;
/* fall through */
case CS_ST_CON:
case CS_ST_CER:
case CS_ST_QUE:
case CS_ST_TAR:
/* Note that none of these states may happen with applets */
cs->state = CS_ST_DIS;
/* fall through */
default:
cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
/* note that if the task exists, it must unregister itself once it runs */
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
/* default chk_rcv function for scheduled tasks */
static void cs_app_chk_rcv(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
cs, cs->state, ic->flags, cs_oc(cs)->flags);
if (ic->pipe) {
/* stop reading */
si_rx_room_blk(cs->si);
}
else {
/* (re)start reading */
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
}
/* default chk_snd function for scheduled tasks */
static void cs_app_chk_snd(struct conn_stream *cs)
{
struct channel *oc = cs_oc(cs);
DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
cs, cs->state, cs_ic(cs)->flags, oc->flags);
if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
return;
if (!(cs->si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
channel_is_empty(oc)) /* called with nothing to send ! */
return;
/* Otherwise there are remaining data to be sent in the buffer,
* so we tell the handler.
*/
cs->si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
/* This function is the equivalent to si_update() except that it's
* designed to be called from outside the stream handlers, typically the lower
* layers (applets, connections) after I/O completion. After updating the stream
@ -837,235 +653,6 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
appctx_wakeup(__cs_appctx(si_b->cs));
}
/*
* This function performs a shutdown-read on a conn-stream attached to
* a connection in a connected or init state (it does nothing for other
* states). It either shuts the read side or marks itself as closed. The buffer
* flags are updated to reflect the new state. If the stream interface has
* CS_FL_NOHALF, we also forward the close to the write side. If a control
* layer is defined, then it is supposed to be a socket layer and file
* descriptors are then shutdown or closed accordingly. The function
* automatically disables polling if needed.
*/
static void cs_app_shutr_conn(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
BUG_ON(!cs_conn(cs));
si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return;
if (cs_oc(cs)->flags & CF_SHUTW) {
cs_conn_close(cs);
cs->state = CS_ST_DIS;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return cs_app_shutw_conn(cs);
}
}
/*
* This function performs a shutdown-write on a conn-stream attached to
* a connection in a connected or init state (it does nothing for other
* states). It either shuts the write side or marks itself as closed. The
* buffer flags are updated to reflect the new state. It does also close
* everything if the SI was marked as being in error state. If there is a
* data-layer shutdown, it is called.
*/
static void cs_app_shutw_conn(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
struct channel *oc = cs_oc(cs);
BUG_ON(!cs_conn(cs));
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si_done_get(cs->si);
if (tick_isset(cs->hcto)) {
ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
switch (cs->state) {
case CS_ST_RDY:
case CS_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if CS_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (cs->endp->flags & CS_EP_ERROR) {
/* quick close, the socket is already shut anyway */
}
else if (cs->flags & CS_FL_NOLINGER) {
/* unclean data-layer shutdown, typically an aborted request
* or a forwarded shutdown from a client to a server due to
* option abortonclose. No need for the TLS layer to try to
* emit a shutdown message.
*/
cs_conn_shutw(cs, CO_SHW_SILENT);
}
else {
/* clean data-layer shutdown. This only happens on the
* frontend side, or on the backend side when forwarding
* a client close in TCP mode or in HTTP TUNNEL mode
* while option abortonclose is set. We want the TLS
* layer to try to signal it to the peer before we close.
*/
cs_conn_shutw(cs, CO_SHW_NORMAL);
if (!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return;
}
/* fall through */
case CS_ST_CON:
/* we may have to close a pending connection, and mark the
* response buffer as shutr
*/
cs_conn_close(cs);
/* fall through */
case CS_ST_CER:
case CS_ST_QUE:
case CS_ST_TAR:
cs->state = CS_ST_DIS;
/* fall through */
default:
cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
}
/* This function is used for inter-conn-stream calls. It is called by the
* consumer to inform the producer side that it may be interested in checking
* for free space in the buffer. Note that it intentionally does not update
* timeouts, so that we can still check them later at wake-up. This function is
* dedicated to connection-based stream interfaces.
*/
static void cs_app_chk_rcv_conn(struct conn_stream *cs)
{
BUG_ON(!cs_conn(cs));
/* (re)start reading */
if (cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
tasklet_wakeup(cs->wait_event.tasklet);
}
/* This function is used for inter-conn-stream calls. It is called by the
* producer to inform the consumer side that it may be interested in checking
* for data in the buffer. Note that it intentionally does not update timeouts,
* so that we can still check them later at wake-up.
*/
static void cs_app_chk_snd_conn(struct conn_stream *cs)
{
struct channel *oc = cs_oc(cs);
BUG_ON(!cs_conn(cs));
if (unlikely(!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST) ||
(oc->flags & CF_SHUTW)))
return;
if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
return;
if (!oc->pipe && /* spliced data wants to be forwarded ASAP */
!(cs->si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return;
if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
si_cs_send(cs);
if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
/* Write error on the file descriptor */
if (cs->state >= CS_ST_CON)
cs->endp->flags |= CS_EP_ERROR;
goto out_wakeup;
}
/* OK, so now we know that some data might have been sent, and that we may
* have to poll first. We have to do that too if the buffer is not empty.
*/
if (channel_is_empty(oc)) {
/* the connection is established but we can't write. Either the
* buffer is empty, or we just refrain from sending because the
* ->o limit was reached. Maybe we just wrote the last
* chunk and need to close.
*/
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
cs_state_in(cs->state, CS_SB_RDY|CS_SB_EST)) {
cs_shutw(cs);
goto out_wakeup;
}
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0)
cs->si->flags |= SI_FL_WAIT_DATA;
oc->wex = TICK_ETERNITY;
}
else {
/* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so.
*/
cs->si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
}
if (likely(oc->flags & CF_WRITE_ACTIVITY)) {
struct channel *ic = cs_ic(cs);
/* update timeout if we have written something */
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
!channel_is_empty(oc))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) {
/* Note: to prevent the client from expiring read timeouts
* during writes, we refresh it. We only do this if the
* interface is not configured for "independent streams",
* because for some applications it's better not to do this,
* for instance when continuously exchanging small amounts
* of data which can full the socket buffers long before a
* write timeout is detected.
*/
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
/* in case of special condition (error, shutdown, end of write...), we
* have to notify the task.
*/
if (likely((oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
((oc->flags & CF_WAKE_WRITE) &&
((channel_is_empty(oc) && !oc->to_forward) ||
!cs_state_in(cs->state, CS_SB_EST))))) {
out_wakeup:
if (!(cs->flags & CS_FL_DONT_WAKE))
task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
}
}
/*
* This is the callback which is called by the connection layer to receive data
* into the buffer from the connection. It iterates over the mux layer's
@ -1483,148 +1070,6 @@ void si_applet_wake_cb(struct stream_interface *si)
appctx_wakeup(__cs_appctx(si->cs));
}
/*
* This function performs a shutdown-read on a conn-stream attached to an
* applet in a connected or init state (it does nothing for other states). It
* either shuts the read side or marks itself as closed. The buffer flags are
* updated to reflect the new state. If the stream interface has CS_FL_NOHALF,
* we also forward the close to the write side. The owner task is woken up if
* it exists.
*/
static void cs_app_shutr_applet(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
BUG_ON(!cs_appctx(cs));
si_rx_shut_blk(cs->si);
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
/* Note: on shutr, we don't call the applet */
if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
return;
if (cs_oc(cs)->flags & CF_SHUTW) {
cs_applet_release(cs);
cs->state = CS_ST_DIS;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
else if (cs->flags & CS_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
return cs_app_shutw_applet(cs);
}
}
/*
* This function performs a shutdown-write on a conn-stream attached to an
* applet in a connected or init state (it does nothing for other states). It
* either shuts the write side or marks itself as closed. The buffer flags are
* updated to reflect the new state. It does also close everything if the SI
* was marked as being in error state. The owner task is woken up if it exists.
*/
static void cs_app_shutw_applet(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
struct channel *oc = cs_oc(cs);
BUG_ON(!cs_appctx(cs));
oc->flags &= ~CF_SHUTW_NOW;
if (oc->flags & CF_SHUTW)
return;
oc->flags |= CF_SHUTW;
oc->wex = TICK_ETERNITY;
si_done_get(cs->si);
if (tick_isset(cs->hcto)) {
ic->rto = cs->hcto;
ic->rex = tick_add(now_ms, ic->rto);
}
/* on shutw we always wake the applet up */
appctx_wakeup(__cs_appctx(cs));
switch (cs->state) {
case CS_ST_RDY:
case CS_ST_EST:
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if CS_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
return;
/* fall through */
case CS_ST_CON:
case CS_ST_CER:
case CS_ST_QUE:
case CS_ST_TAR:
/* Note that none of these states may happen with applets */
cs_applet_release(cs);
cs->state = CS_ST_DIS;
/* fall through */
default:
cs->flags &= ~CS_FL_NOLINGER;
si_rx_shut_blk(cs->si);
ic->flags |= CF_SHUTR;
ic->rex = TICK_ETERNITY;
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
}
}
/* chk_rcv function for applets */
static void cs_app_chk_rcv_applet(struct conn_stream *cs)
{
struct channel *ic = cs_ic(cs);
BUG_ON(!cs_appctx(cs));
DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
cs, cs->state, ic->flags, cs_oc(cs)->flags);
if (!ic->pipe) {
/* (re)start reading */
appctx_wakeup(__cs_appctx(cs));
}
}
/* chk_snd function for applets */
static void cs_app_chk_snd_applet(struct conn_stream *cs)
{
struct channel *oc = cs_oc(cs);
BUG_ON(!cs_appctx(cs));
DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
cs, cs->state, cs_ic(cs)->flags, oc->flags);
if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
return;
/* we only wake the applet up if it was waiting for some data */
if (!(cs->si->flags & SI_FL_WAIT_DATA))
return;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!channel_is_empty(oc)) {
/* (re)start sending */
appctx_wakeup(__cs_appctx(cs));
}
}
/*
* Local variables:
* c-indent-level: 8