mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 15:47:01 +02:00
MEDIUM: stream-int/stream: Use connect expiration instead of SI expiration
The expiration date in the stream-interface was only used on the server side to set the connect, queue or turn-around timeout. It was checked on the frontend stream-interface, but never used concretely. So it was removed and replaced by a connect expiration date in the stream itself. Thus, SI_FL_EXP flag in stream-interfaces is replaced by a stream flag, SF_CONN_EXP.
This commit is contained in:
parent
1d9877700e
commit
ae024ced03
@ -263,7 +263,6 @@ void show_si_flags(unsigned int f)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SHOW_FLAG(f, SI_FL_EXP);
|
|
||||||
SHOW_FLAG(f, SI_FL_ERR);
|
SHOW_FLAG(f, SI_FL_ERR);
|
||||||
SHOW_FLAG(f, SI_FL_KILL_CONN);
|
SHOW_FLAG(f, SI_FL_KILL_CONN);
|
||||||
SHOW_FLAG(f, SI_FL_WAIT_DATA);
|
SHOW_FLAG(f, SI_FL_WAIT_DATA);
|
||||||
@ -418,6 +417,7 @@ void show_strm_flags(unsigned int f)
|
|||||||
SHOW_FLAG(f, SF_REDIRECTABLE);
|
SHOW_FLAG(f, SF_REDIRECTABLE);
|
||||||
SHOW_FLAG(f, SF_IGNORE);
|
SHOW_FLAG(f, SF_IGNORE);
|
||||||
SHOW_FLAG(f, SF_REDISP);
|
SHOW_FLAG(f, SF_REDISP);
|
||||||
|
SHOW_FLAG(f, SF_CONN_EXP);
|
||||||
SHOW_FLAG(f, SF_CURR_SESS);
|
SHOW_FLAG(f, SF_CURR_SESS);
|
||||||
SHOW_FLAG(f, SF_MONITOR);
|
SHOW_FLAG(f, SF_MONITOR);
|
||||||
SHOW_FLAG(f, SF_FORCE_PRST);
|
SHOW_FLAG(f, SF_FORCE_PRST);
|
||||||
|
@ -43,6 +43,7 @@
|
|||||||
#define SF_FORCE_PRST 0x00000010 /* force persistence here, even if server is down */
|
#define SF_FORCE_PRST 0x00000010 /* force persistence here, even if server is down */
|
||||||
#define SF_MONITOR 0x00000020 /* this stream comes from a monitoring system */
|
#define SF_MONITOR 0x00000020 /* this stream comes from a monitoring system */
|
||||||
#define SF_CURR_SESS 0x00000040 /* a connection is currently being counted on the server */
|
#define SF_CURR_SESS 0x00000040 /* a connection is currently being counted on the server */
|
||||||
|
#define SF_CONN_EXP 0x00000080 /* timeout has expired */
|
||||||
#define SF_REDISP 0x00000100 /* set if this stream was redispatched from one server to another */
|
#define SF_REDISP 0x00000100 /* set if this stream was redispatched from one server to another */
|
||||||
#define SF_IGNORE 0x00000200 /* The stream lead to a mux upgrade, and should be ignored */
|
#define SF_IGNORE 0x00000200 /* The stream lead to a mux upgrade, and should be ignored */
|
||||||
#define SF_REDIRECTABLE 0x00000400 /* set if this stream is redirectable (GET or HEAD) */
|
#define SF_REDIRECTABLE 0x00000400 /* set if this stream is redirectable (GET or HEAD) */
|
||||||
@ -140,6 +141,7 @@ struct stream {
|
|||||||
int32_t priority_offset; /* priority offset of the stream for the pending queue */
|
int32_t priority_offset; /* priority offset of the stream for the pending queue */
|
||||||
|
|
||||||
int conn_retries; /* number of connect retries performed */
|
int conn_retries; /* number of connect retries performed */
|
||||||
|
unsigned int conn_exp; /* wake up time for connect, queue, turn-around, ... */
|
||||||
|
|
||||||
struct list list; /* position in the thread's streams list */
|
struct list list; /* position in the thread's streams list */
|
||||||
struct mt_list by_srv; /* position in server stream list */
|
struct mt_list by_srv; /* position in server stream list */
|
||||||
|
@ -352,6 +352,22 @@ static inline void stream_choose_redispatch(struct stream *s)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This function only has to be called once after a wakeup event in case of
|
||||||
|
* suspected timeout. It controls the stream connection timeout and sets
|
||||||
|
* si->flags accordingly. It does NOT close anything, as this timeout may
|
||||||
|
* be used for any purpose. It returns 1 if the timeout fired, otherwise
|
||||||
|
* zero.
|
||||||
|
*/
|
||||||
|
static inline int stream_check_conn_timeout(struct stream *s)
|
||||||
|
{
|
||||||
|
if (tick_is_expired(s->conn_exp, now_ms)) {
|
||||||
|
s->flags |= SF_CONN_EXP;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout);
|
int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout);
|
||||||
|
|
||||||
void service_keywords_register(struct action_kw_list *kw_list);
|
void service_keywords_register(struct action_kw_list *kw_list);
|
||||||
|
@ -83,7 +83,7 @@ enum {
|
|||||||
/* flags set after I/O (32 bit) */
|
/* flags set after I/O (32 bit) */
|
||||||
enum {
|
enum {
|
||||||
SI_FL_NONE = 0x00000000, /* nothing */
|
SI_FL_NONE = 0x00000000, /* nothing */
|
||||||
SI_FL_EXP = 0x00000001, /* timeout has expired */
|
/* unused: 0x00000001 */
|
||||||
SI_FL_ERR = 0x00000002, /* a non-recoverable error has occurred */
|
SI_FL_ERR = 0x00000002, /* a non-recoverable error has occurred */
|
||||||
SI_FL_KILL_CONN = 0x00000004, /* next shutw must kill the whole conn, not just the stream */
|
SI_FL_KILL_CONN = 0x00000004, /* next shutw must kill the whole conn, not just the stream */
|
||||||
SI_FL_WAIT_DATA = 0x00000008, /* stream-int waits for more outgoing data to send */
|
SI_FL_WAIT_DATA = 0x00000008, /* stream-int waits for more outgoing data to send */
|
||||||
@ -125,7 +125,6 @@ struct stream_interface {
|
|||||||
unsigned int flags; /* SI_FL_* */
|
unsigned int flags; /* SI_FL_* */
|
||||||
struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */
|
struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */
|
||||||
struct si_ops *ops; /* general operations at the stream interface layer */
|
struct si_ops *ops; /* general operations at the stream interface layer */
|
||||||
unsigned int exp; /* wake up time for connect, queue, turn-around, ... */
|
|
||||||
|
|
||||||
/* struct members below are the "remote" part, as seen from the buffer side */
|
/* struct members below are the "remote" part, as seen from the buffer side */
|
||||||
unsigned int err_type; /* first error detected, one of SI_ET_* */
|
unsigned int err_type; /* first error detected, one of SI_ET_* */
|
||||||
|
@ -39,7 +39,6 @@ struct stream_interface *si_new(struct conn_stream *cs);
|
|||||||
void si_free(struct stream_interface *si);
|
void si_free(struct stream_interface *si);
|
||||||
|
|
||||||
/* main event functions used to move data between sockets and buffers */
|
/* main event functions used to move data between sockets and buffers */
|
||||||
int si_check_timeouts(struct stream_interface *si);
|
|
||||||
void si_report_error(struct stream_interface *si);
|
void si_report_error(struct stream_interface *si);
|
||||||
void si_retnclose(struct stream_interface *si, const struct buffer *msg);
|
void si_retnclose(struct stream_interface *si, const struct buffer *msg);
|
||||||
int conn_si_send_proxy(struct connection *conn, unsigned int flag);
|
int conn_si_send_proxy(struct connection *conn, unsigned int flag);
|
||||||
@ -108,7 +107,6 @@ static inline struct stream_interface *si_opposite(struct stream_interface *si)
|
|||||||
static inline int si_init(struct stream_interface *si)
|
static inline int si_init(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
si->err_type = SI_ET_NONE;
|
si->err_type = SI_ET_NONE;
|
||||||
si->exp = TICK_ETERNITY;
|
|
||||||
si->flags &= SI_FL_ISBACK;
|
si->flags &= SI_FL_ISBACK;
|
||||||
si->cs = NULL;
|
si->cs = NULL;
|
||||||
si->state = si->prev_state = SI_ST_INI;
|
si->state = si->prev_state = SI_ST_INI;
|
||||||
|
@ -1727,7 +1727,7 @@ static int connect_server(struct stream *s)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* set connect timeout */
|
/* set connect timeout */
|
||||||
cs_si(s->csb)->exp = tick_add_ifset(now_ms, s->be->timeout.connect);
|
s->conn_exp = tick_add_ifset(now_ms, s->be->timeout.connect);
|
||||||
|
|
||||||
if (srv) {
|
if (srv) {
|
||||||
int count;
|
int count;
|
||||||
@ -1757,7 +1757,7 @@ static int connect_server(struct stream *s)
|
|||||||
|
|
||||||
if (!si_state_in(cs_si(s->csb)->state, SI_SB_EST|SI_SB_DIS|SI_SB_CLO) &&
|
if (!si_state_in(cs_si(s->csb)->state, SI_SB_EST|SI_SB_DIS|SI_SB_CLO) &&
|
||||||
(srv_conn->flags & CO_FL_WAIT_XPRT) == 0) {
|
(srv_conn->flags & CO_FL_WAIT_XPRT) == 0) {
|
||||||
cs_si(s->csb)->exp = TICK_ETERNITY;
|
s->conn_exp = TICK_ETERNITY;
|
||||||
cs_oc(s->csb)->flags |= CF_WRITE_NULL;
|
cs_oc(s->csb)->flags |= CF_WRITE_NULL;
|
||||||
if (cs_si(s->csb)->state == SI_ST_CON)
|
if (cs_si(s->csb)->state == SI_ST_CON)
|
||||||
cs_si(s->csb)->state = SI_ST_RDY;
|
cs_si(s->csb)->state = SI_ST_RDY;
|
||||||
@ -1842,7 +1842,7 @@ int srv_redispatch_connect(struct stream *s)
|
|||||||
return 1;
|
return 1;
|
||||||
|
|
||||||
case SRV_STATUS_QUEUED:
|
case SRV_STATUS_QUEUED:
|
||||||
cs_si(s->csb)->exp = tick_add_ifset(now_ms, s->be->timeout.queue);
|
s->conn_exp = tick_add_ifset(now_ms, s->be->timeout.queue);
|
||||||
cs_si(s->csb)->state = SI_ST_QUE;
|
cs_si(s->csb)->state = SI_ST_QUE;
|
||||||
/* do nothing else and do not wake any other stream up */
|
/* do nothing else and do not wake any other stream up */
|
||||||
return 1;
|
return 1;
|
||||||
@ -1978,7 +1978,7 @@ void back_try_conn_req(struct stream *s)
|
|||||||
* go directly to the assigned state, or we need to
|
* go directly to the assigned state, or we need to
|
||||||
* load-balance first and go to the INI state.
|
* load-balance first and go to the INI state.
|
||||||
*/
|
*/
|
||||||
cs->si->exp = TICK_ETERNITY;
|
s->conn_exp = TICK_ETERNITY;
|
||||||
if (unlikely(!(s->flags & SF_ASSIGNED)))
|
if (unlikely(!(s->flags & SF_ASSIGNED)))
|
||||||
cs->si->state = SI_ST_REQ;
|
cs->si->state = SI_ST_REQ;
|
||||||
else {
|
else {
|
||||||
@ -1990,10 +1990,10 @@ void back_try_conn_req(struct stream *s)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Connection request still in queue... */
|
/* Connection request still in queue... */
|
||||||
if (cs->si->flags & SI_FL_EXP) {
|
if (s->flags & SF_CONN_EXP) {
|
||||||
/* ... and timeout expired */
|
/* ... and timeout expired */
|
||||||
cs->si->exp = TICK_ETERNITY;
|
s->conn_exp = TICK_ETERNITY;
|
||||||
cs->si->flags &= ~SI_FL_EXP;
|
s->flags &= ~SF_CONN_EXP;
|
||||||
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
|
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
|
||||||
|
|
||||||
/* we may need to know the position in the queue for logging */
|
/* we may need to know the position in the queue for logging */
|
||||||
@ -2036,11 +2036,11 @@ void back_try_conn_req(struct stream *s)
|
|||||||
goto abort_connection;
|
goto abort_connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(cs->si->flags & SI_FL_EXP))
|
if (!(s->flags & SF_CONN_EXP))
|
||||||
return; /* still in turn-around */
|
return; /* still in turn-around */
|
||||||
|
|
||||||
cs->si->flags &= ~SI_FL_EXP;
|
s->flags &= ~SF_CONN_EXP;
|
||||||
cs->si->exp = TICK_ETERNITY;
|
s->conn_exp = TICK_ETERNITY;
|
||||||
|
|
||||||
/* we keep trying on the same server as long as the stream is
|
/* we keep trying on the same server as long as the stream is
|
||||||
* marked "assigned".
|
* marked "assigned".
|
||||||
@ -2060,8 +2060,8 @@ void back_try_conn_req(struct stream *s)
|
|||||||
|
|
||||||
abort_connection:
|
abort_connection:
|
||||||
/* give up */
|
/* give up */
|
||||||
cs->si->exp = TICK_ETERNITY;
|
s->conn_exp = TICK_ETERNITY;
|
||||||
cs->si->flags &= ~SI_FL_EXP;
|
s->flags &= ~SF_CONN_EXP;
|
||||||
si_shutr(cs->si);
|
si_shutr(cs->si);
|
||||||
si_shutw(cs->si);
|
si_shutw(cs->si);
|
||||||
cs->si->state = SI_ST_CLO;
|
cs->si->state = SI_ST_CLO;
|
||||||
@ -2187,7 +2187,7 @@ void back_handle_st_con(struct stream *s)
|
|||||||
|
|
||||||
done:
|
done:
|
||||||
/* retryable error ? */
|
/* retryable error ? */
|
||||||
if (cs->si->flags & (SI_FL_EXP|SI_FL_ERR)) {
|
if ((s->flags & SF_CONN_EXP) || (cs->si->flags & SI_FL_ERR)) {
|
||||||
if (!cs->si->err_type) {
|
if (!cs->si->err_type) {
|
||||||
if (cs->si->flags & SI_FL_ERR)
|
if (cs->si->flags & SI_FL_ERR)
|
||||||
cs->si->err_type = SI_ET_CONN_ERR;
|
cs->si->err_type = SI_ET_CONN_ERR;
|
||||||
@ -2218,8 +2218,8 @@ void back_handle_st_cer(struct stream *s)
|
|||||||
|
|
||||||
DBG_TRACE_ENTER(STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
|
DBG_TRACE_ENTER(STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
|
||||||
|
|
||||||
cs->si->exp = TICK_ETERNITY;
|
s->conn_exp = TICK_ETERNITY;
|
||||||
cs->si->flags &= ~SI_FL_EXP;
|
s->flags &= ~SF_CONN_EXP;
|
||||||
|
|
||||||
s->conn_retries++;
|
s->conn_retries++;
|
||||||
|
|
||||||
@ -2291,7 +2291,7 @@ void back_handle_st_cer(struct stream *s)
|
|||||||
* layers in an unexpected state (i.e < ST_CONN).
|
* layers in an unexpected state (i.e < ST_CONN).
|
||||||
*
|
*
|
||||||
* Note: the stream-interface will be switched to ST_REQ, ST_ASS or
|
* Note: the stream-interface will be switched to ST_REQ, ST_ASS or
|
||||||
* ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset.
|
* ST_TAR and SI_FL_ERR and SF_CONN_EXP flags will be unset.
|
||||||
*/
|
*/
|
||||||
if (cs_reset_endp(cs) < 0) {
|
if (cs_reset_endp(cs) < 0) {
|
||||||
if (!cs->si->err_type)
|
if (!cs->si->err_type)
|
||||||
@ -2343,7 +2343,7 @@ void back_handle_st_cer(struct stream *s)
|
|||||||
(s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_RR ||
|
(s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_RR ||
|
||||||
(s->be->srv_act <= 1)) && !reused) {
|
(s->be->srv_act <= 1)) && !reused) {
|
||||||
cs->si->state = SI_ST_TAR;
|
cs->si->state = SI_ST_TAR;
|
||||||
cs->si->exp = tick_add(now_ms, MS_TO_TICKS(delay));
|
s->conn_exp = tick_add(now_ms, MS_TO_TICKS(delay));
|
||||||
}
|
}
|
||||||
cs->si->flags &= ~SI_FL_ERR;
|
cs->si->flags &= ~SI_FL_ERR;
|
||||||
DBG_TRACE_STATE("retry a new connection", STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
|
DBG_TRACE_STATE("retry a new connection", STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
|
||||||
|
@ -2768,7 +2768,6 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
|
|||||||
|
|
||||||
cs_si(s->csb)->state = cs_si(s->csb)->prev_state = SI_ST_INI;
|
cs_si(s->csb)->state = cs_si(s->csb)->prev_state = SI_ST_INI;
|
||||||
cs_si(s->csb)->err_type = SI_ET_NONE;
|
cs_si(s->csb)->err_type = SI_ET_NONE;
|
||||||
cs_si(s->csb)->exp = TICK_ETERNITY;
|
|
||||||
cs_si(s->csb)->flags &= SI_FL_ISBACK | SI_FL_DONT_WAKE; /* we're in the context of process_stream */
|
cs_si(s->csb)->flags &= SI_FL_ISBACK | SI_FL_DONT_WAKE; /* we're in the context of process_stream */
|
||||||
s->req.flags &= ~(CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CONNECT|CF_WRITE_ERROR|CF_STREAMER|CF_STREAMER_FAST|CF_NEVER_WAIT|CF_WROTE_DATA);
|
s->req.flags &= ~(CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CONNECT|CF_WRITE_ERROR|CF_STREAMER|CF_STREAMER_FAST|CF_NEVER_WAIT|CF_WROTE_DATA);
|
||||||
s->res.flags &= ~(CF_SHUTR|CF_SHUTR_NOW|CF_READ_ATTACHED|CF_READ_ERROR|CF_READ_NOEXP|CF_STREAMER|CF_STREAMER_FAST|CF_WRITE_PARTIAL|CF_NEVER_WAIT|CF_WROTE_DATA|CF_READ_NULL);
|
s->res.flags &= ~(CF_SHUTR|CF_SHUTR_NOW|CF_READ_ATTACHED|CF_READ_ERROR|CF_READ_NOEXP|CF_STREAMER|CF_STREAMER_FAST|CF_WRITE_PARTIAL|CF_NEVER_WAIT|CF_WROTE_DATA|CF_READ_NULL);
|
||||||
@ -2776,6 +2775,7 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
|
|||||||
s->flags &= ~(SF_CURR_SESS|SF_REDIRECTABLE|SF_SRV_REUSED);
|
s->flags &= ~(SF_CURR_SESS|SF_REDIRECTABLE|SF_SRV_REUSED);
|
||||||
s->flags &= ~(SF_ERR_MASK|SF_FINST_MASK|SF_REDISP);
|
s->flags &= ~(SF_ERR_MASK|SF_FINST_MASK|SF_REDISP);
|
||||||
s->conn_retries = 0; /* used for logging too */
|
s->conn_retries = 0; /* used for logging too */
|
||||||
|
s->conn_exp = TICK_ETERNITY;
|
||||||
/* reinitialise the current rule list pointer to NULL. We are sure that
|
/* reinitialise the current rule list pointer to NULL. We are sure that
|
||||||
* any rulelist match the NULL pointer.
|
* any rulelist match the NULL pointer.
|
||||||
*/
|
*/
|
||||||
|
@ -694,7 +694,7 @@ static int debug_parse_cli_stream(char **args, char *payload, struct appctx *app
|
|||||||
if (!*args[3]) {
|
if (!*args[3]) {
|
||||||
return cli_err(appctx,
|
return cli_err(appctx,
|
||||||
"Usage: debug dev stream { <obj> <op> <value> | wake }*\n"
|
"Usage: debug dev stream { <obj> <op> <value> | wake }*\n"
|
||||||
" <obj> = {strm | strm.f | sif.f | sif.s | sif.x | sib.f | sib.s | sib.x |\n"
|
" <obj> = {strm | strm.f | strm.x | sif.f | sif.s | sib.f | sib.s |\n"
|
||||||
" txn.f | req.f | req.r | req.w | res.f | res.r | res.w}\n"
|
" txn.f | req.f | req.r | req.w | res.f | res.r | res.w}\n"
|
||||||
" <op> = {'' (show) | '=' (assign) | '^' (xor) | '+' (or) | '-' (andnot)}\n"
|
" <op> = {'' (show) | '=' (assign) | '^' (xor) | '+' (or) | '-' (andnot)}\n"
|
||||||
" <value> = 'now' | 64-bit dec/hex integer (0x prefix supported)\n"
|
" <value> = 'now' | 64-bit dec/hex integer (0x prefix supported)\n"
|
||||||
@ -713,6 +713,8 @@ static int debug_parse_cli_stream(char **args, char *payload, struct appctx *app
|
|||||||
ptr = (!s || !may_access(s)) ? NULL : &s; size = sizeof(s);
|
ptr = (!s || !may_access(s)) ? NULL : &s; size = sizeof(s);
|
||||||
} else if (isteq(name, ist("strm.f"))) {
|
} else if (isteq(name, ist("strm.f"))) {
|
||||||
ptr = (!s || !may_access(s)) ? NULL : &s->flags; size = sizeof(s->flags);
|
ptr = (!s || !may_access(s)) ? NULL : &s->flags; size = sizeof(s->flags);
|
||||||
|
} else if (isteq(name, ist("strm.x"))) {
|
||||||
|
ptr = (!s || !may_access(s)) ? NULL : &s->conn_exp; size = sizeof(s->conn_exp);
|
||||||
} else if (isteq(name, ist("txn.f"))) {
|
} else if (isteq(name, ist("txn.f"))) {
|
||||||
ptr = (!s || !may_access(s)) ? NULL : &s->txn->flags; size = sizeof(s->txn->flags);
|
ptr = (!s || !may_access(s)) ? NULL : &s->txn->flags; size = sizeof(s->txn->flags);
|
||||||
} else if (isteq(name, ist("req.f"))) {
|
} else if (isteq(name, ist("req.f"))) {
|
||||||
@ -731,10 +733,6 @@ static int debug_parse_cli_stream(char **args, char *payload, struct appctx *app
|
|||||||
ptr = (!s || !may_access(s)) ? NULL : &cs_si(s->csf)->flags; size = sizeof(cs_si(s->csf)->flags);
|
ptr = (!s || !may_access(s)) ? NULL : &cs_si(s->csf)->flags; size = sizeof(cs_si(s->csf)->flags);
|
||||||
} else if (isteq(name, ist("sib.f"))) {
|
} else if (isteq(name, ist("sib.f"))) {
|
||||||
ptr = (!s || !may_access(s)) ? NULL : &cs_si(s->csb)->flags; size = sizeof(cs_si(s->csb)->flags);
|
ptr = (!s || !may_access(s)) ? NULL : &cs_si(s->csb)->flags; size = sizeof(cs_si(s->csb)->flags);
|
||||||
} else if (isteq(name, ist("sif.x"))) {
|
|
||||||
ptr = (!s || !may_access(s)) ? NULL : &cs_si(s->csf)->exp; size = sizeof(cs_si(s->csf)->exp);
|
|
||||||
} else if (isteq(name, ist("sib.x"))) {
|
|
||||||
ptr = (!s || !may_access(s)) ? NULL : &cs_si(s->csb)->exp; size = sizeof(cs_si(s->csb)->exp);
|
|
||||||
} else if (isteq(name, ist("sif.s"))) {
|
} else if (isteq(name, ist("sif.s"))) {
|
||||||
ptr = (!s || !may_access(s)) ? NULL : &cs_si(s->csf)->state; size = sizeof(cs_si(s->csf)->state);
|
ptr = (!s || !may_access(s)) ? NULL : &cs_si(s->csf)->state; size = sizeof(cs_si(s->csf)->state);
|
||||||
} else if (isteq(name, ist("sib.s"))) {
|
} else if (isteq(name, ist("sib.s"))) {
|
||||||
|
@ -1247,11 +1247,11 @@ static __inline int do_l7_retry(struct stream *s, struct stream_interface *si)
|
|||||||
req->flags &= ~(CF_WRITE_ERROR | CF_WRITE_TIMEOUT | CF_SHUTW | CF_SHUTW_NOW);
|
req->flags &= ~(CF_WRITE_ERROR | CF_WRITE_TIMEOUT | CF_SHUTW | CF_SHUTW_NOW);
|
||||||
res->flags &= ~(CF_READ_ERROR | CF_READ_TIMEOUT | CF_SHUTR | CF_EOI | CF_READ_NULL | CF_SHUTR_NOW);
|
res->flags &= ~(CF_READ_ERROR | CF_READ_TIMEOUT | CF_SHUTR | CF_EOI | CF_READ_NULL | CF_SHUTR_NOW);
|
||||||
res->analysers &= AN_RES_FLT_END;
|
res->analysers &= AN_RES_FLT_END;
|
||||||
si->flags &= ~(SI_FL_ERR | SI_FL_EXP | SI_FL_RXBLK_SHUT);
|
si->flags &= ~(SI_FL_ERR | SI_FL_RXBLK_SHUT);
|
||||||
si->err_type = SI_ET_NONE;
|
si->err_type = SI_ET_NONE;
|
||||||
s->flags &= ~(SF_ERR_MASK | SF_FINST_MASK);
|
s->flags &= ~(SF_CONN_EXP | SF_ERR_MASK | SF_FINST_MASK);
|
||||||
|
s->conn_exp = TICK_ETERNITY;
|
||||||
stream_choose_redispatch(s);
|
stream_choose_redispatch(s);
|
||||||
si->exp = TICK_ETERNITY;
|
|
||||||
res->rex = TICK_ETERNITY;
|
res->rex = TICK_ETERNITY;
|
||||||
res->to_forward = 0;
|
res->to_forward = 0;
|
||||||
res->analyse_exp = TICK_ETERNITY;
|
res->analyse_exp = TICK_ETERNITY;
|
||||||
|
58
src/stream.c
58
src/stream.c
@ -421,6 +421,7 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
|
|||||||
s->task = t;
|
s->task = t;
|
||||||
s->pending_events = 0;
|
s->pending_events = 0;
|
||||||
s->conn_retries = 0;
|
s->conn_retries = 0;
|
||||||
|
s->conn_exp = TICK_ETERNITY;
|
||||||
t->process = process_stream;
|
t->process = process_stream;
|
||||||
t->context = s;
|
t->context = s;
|
||||||
t->expire = TICK_ETERNITY;
|
t->expire = TICK_ETERNITY;
|
||||||
@ -875,8 +876,8 @@ static void back_establish(struct stream *s)
|
|||||||
* timeout.
|
* timeout.
|
||||||
*/
|
*/
|
||||||
s->logs.t_connect = tv_ms_elapsed(&s->logs.tv_accept, &now);
|
s->logs.t_connect = tv_ms_elapsed(&s->logs.tv_accept, &now);
|
||||||
si->exp = TICK_ETERNITY;
|
s->conn_exp = TICK_ETERNITY;
|
||||||
si->flags &= ~SI_FL_EXP;
|
s->flags &= ~SF_CONN_EXP;
|
||||||
|
|
||||||
/* errors faced after sending data need to be reported */
|
/* errors faced after sending data need to be reported */
|
||||||
if (si->flags & SI_FL_ERR && req->flags & CF_WROTE_DATA) {
|
if (si->flags & SI_FL_ERR && req->flags & CF_WROTE_DATA) {
|
||||||
@ -1626,8 +1627,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
|||||||
* stream interfaces when their timeouts have expired.
|
* stream interfaces when their timeouts have expired.
|
||||||
*/
|
*/
|
||||||
if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
|
if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
|
||||||
si_check_timeouts(si_f);
|
stream_check_conn_timeout(s);
|
||||||
si_check_timeouts(si_b);
|
|
||||||
|
|
||||||
/* check channel timeouts, and close the corresponding stream interfaces
|
/* check channel timeouts, and close the corresponding stream interfaces
|
||||||
* for future reads or writes. Note: this will also concern upper layers
|
* for future reads or writes. Note: this will also concern upper layers
|
||||||
@ -1672,7 +1672,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
|||||||
if (!((req->flags | res->flags) &
|
if (!((req->flags | res->flags) &
|
||||||
(CF_SHUTR|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_SHUTW|
|
(CF_SHUTR|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_SHUTW|
|
||||||
CF_WRITE_ACTIVITY|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT)) &&
|
CF_WRITE_ACTIVITY|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT)) &&
|
||||||
!((si_f->flags | si_b->flags) & (SI_FL_EXP|SI_FL_ERR)) &&
|
!(s->flags & SF_CONN_EXP) &&
|
||||||
|
!((si_f->flags | si_b->flags) & SI_FL_ERR) &&
|
||||||
((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
|
((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
|
||||||
si_f->flags &= ~SI_FL_DONT_WAKE;
|
si_f->flags &= ~SI_FL_DONT_WAKE;
|
||||||
si_b->flags &= ~SI_FL_DONT_WAKE;
|
si_b->flags &= ~SI_FL_DONT_WAKE;
|
||||||
@ -2462,11 +2463,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
|||||||
|
|
||||||
t->expire = tick_first(t->expire, res->analyse_exp);
|
t->expire = tick_first(t->expire, res->analyse_exp);
|
||||||
|
|
||||||
if (si_f->exp)
|
t->expire = tick_first(t->expire, s->conn_exp);
|
||||||
t->expire = tick_first(t->expire, si_f->exp);
|
|
||||||
|
|
||||||
if (si_b->exp)
|
|
||||||
t->expire = tick_first(t->expire, si_b->exp);
|
|
||||||
|
|
||||||
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
|
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
|
||||||
stream_release_buffers(s);
|
stream_release_buffers(s);
|
||||||
@ -3154,8 +3151,13 @@ static int stats_dump_full_strm_to_buffer(struct conn_stream *cs, struct stream
|
|||||||
}
|
}
|
||||||
|
|
||||||
chunk_appendf(&trash,
|
chunk_appendf(&trash,
|
||||||
" flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p waiting=%d epoch=%#x\n",
|
" flags=0x%x, conn_retries=%d, conn_exp=%s srv_conn=%p, pend_pos=%p waiting=%d epoch=%#x\n",
|
||||||
strm->flags, strm->conn_retries, strm->srv_conn, strm->pend_pos,
|
strm->flags, strm->conn_retries,
|
||||||
|
strm->conn_exp ?
|
||||||
|
tick_is_expired(strm->conn_exp, now_ms) ? "<PAST>" :
|
||||||
|
human_time(TICKS_TO_MS(strm->conn_exp - now_ms),
|
||||||
|
TICKS_TO_MS(1000)) : "<NEVER>",
|
||||||
|
strm->srv_conn, strm->pend_pos,
|
||||||
LIST_INLIST(&strm->buffer_wait.list), strm->stream_epoch);
|
LIST_INLIST(&strm->buffer_wait.list), strm->stream_epoch);
|
||||||
|
|
||||||
chunk_appendf(&trash,
|
chunk_appendf(&trash,
|
||||||
@ -3250,29 +3252,21 @@ static int stats_dump_full_strm_to_buffer(struct conn_stream *cs, struct stream
|
|||||||
strm->txn->req.flags, strm->txn->rsp.flags);
|
strm->txn->req.flags, strm->txn->rsp.flags);
|
||||||
|
|
||||||
chunk_appendf(&trash,
|
chunk_appendf(&trash,
|
||||||
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s et=0x%03x sub=%d)\n",
|
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p et=0x%03x sub=%d)\n",
|
||||||
strm->csf->si,
|
strm->csf->si,
|
||||||
si_state_str(strm->csf->si->state),
|
si_state_str(strm->csf->si->state),
|
||||||
strm->csf->si->flags,
|
strm->csf->si->flags,
|
||||||
(strm->csf->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
|
(strm->csf->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
|
||||||
__cs_endp_target(strm->csf),
|
__cs_endp_target(strm->csf),
|
||||||
strm->csf->si->exp ?
|
|
||||||
tick_is_expired(strm->csf->si->exp, now_ms) ? "<PAST>" :
|
|
||||||
human_time(TICKS_TO_MS(strm->csf->si->exp - now_ms),
|
|
||||||
TICKS_TO_MS(1000)) : "<NEVER>",
|
|
||||||
strm->csf->si->err_type, strm->csf->si->wait_event.events);
|
strm->csf->si->err_type, strm->csf->si->wait_event.events);
|
||||||
|
|
||||||
chunk_appendf(&trash,
|
chunk_appendf(&trash,
|
||||||
" si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s et=0x%03x sub=%d)\n",
|
" si[1]=%p (state=%s flags=0x%02x endp1=%s:%p et=0x%03x sub=%d)\n",
|
||||||
strm->csb->si,
|
strm->csb->si,
|
||||||
si_state_str(strm->csb->si->state),
|
si_state_str(strm->csb->si->state),
|
||||||
strm->csb->si->flags,
|
strm->csb->si->flags,
|
||||||
(strm->csb->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
|
(strm->csb->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
|
||||||
__cs_endp_target(strm->csb),
|
__cs_endp_target(strm->csb),
|
||||||
strm->csb->si->exp ?
|
|
||||||
tick_is_expired(strm->csb->si->exp, now_ms) ? "<PAST>" :
|
|
||||||
human_time(TICKS_TO_MS(strm->csb->si->exp - now_ms),
|
|
||||||
TICKS_TO_MS(1000)) : "<NEVER>",
|
|
||||||
strm->csb->si->err_type, strm->csb->si->wait_event.events);
|
strm->csb->si->err_type, strm->csb->si->wait_event.events);
|
||||||
|
|
||||||
csf = strm->csf;
|
csf = strm->csf;
|
||||||
@ -3647,28 +3641,26 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
|
|||||||
|
|
||||||
conn = cs_conn(curr_strm->csf);
|
conn = cs_conn(curr_strm->csf);
|
||||||
chunk_appendf(&trash,
|
chunk_appendf(&trash,
|
||||||
" s0=[%d,%1xh,fd=%d,ex=%s]",
|
" s0=[%d,%1xh,fd=%d]",
|
||||||
curr_strm->csf->si->state,
|
curr_strm->csf->si->state,
|
||||||
curr_strm->csf->si->flags,
|
curr_strm->csf->si->flags,
|
||||||
conn_fd(conn),
|
conn_fd(conn));
|
||||||
curr_strm->csf->si->exp ?
|
|
||||||
human_time(TICKS_TO_MS(curr_strm->csf->si->exp - now_ms),
|
|
||||||
TICKS_TO_MS(1000)) : "");
|
|
||||||
|
|
||||||
conn = cs_conn(curr_strm->csb);
|
conn = cs_conn(curr_strm->csb);
|
||||||
chunk_appendf(&trash,
|
chunk_appendf(&trash,
|
||||||
" s1=[%d,%1xh,fd=%d,ex=%s]",
|
" s1=[%d,%1xh,fd=%d]",
|
||||||
curr_strm->csb->si->state,
|
curr_strm->csb->si->state,
|
||||||
curr_strm->csb->si->flags,
|
curr_strm->csb->si->flags,
|
||||||
conn_fd(conn),
|
conn_fd(conn));
|
||||||
curr_strm->csb->si->exp ?
|
|
||||||
human_time(TICKS_TO_MS(curr_strm->csb->si->exp - now_ms),
|
|
||||||
TICKS_TO_MS(1000)) : "");
|
|
||||||
|
|
||||||
chunk_appendf(&trash,
|
chunk_appendf(&trash,
|
||||||
" exp=%s",
|
" exp=%s rc=%d c_exp=%s",
|
||||||
curr_strm->task->expire ?
|
curr_strm->task->expire ?
|
||||||
human_time(TICKS_TO_MS(curr_strm->task->expire - now_ms),
|
human_time(TICKS_TO_MS(curr_strm->task->expire - now_ms),
|
||||||
|
TICKS_TO_MS(1000)) : "",
|
||||||
|
curr_strm->conn_retries,
|
||||||
|
curr_strm->conn_exp ?
|
||||||
|
human_time(TICKS_TO_MS(curr_strm->conn_exp - now_ms),
|
||||||
TICKS_TO_MS(1000)) : "");
|
TICKS_TO_MS(1000)) : "");
|
||||||
if (task_in_rq(curr_strm->task))
|
if (task_in_rq(curr_strm->task))
|
||||||
chunk_appendf(&trash, " run(nice=%d)", curr_strm->task->nice);
|
chunk_appendf(&trash, " run(nice=%d)", curr_strm->task->nice);
|
||||||
|
@ -128,22 +128,6 @@ void si_free(struct stream_interface *si)
|
|||||||
pool_free(pool_head_streaminterface, si);
|
pool_free(pool_head_streaminterface, si);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* This function only has to be called once after a wakeup event in case of
|
|
||||||
* suspected timeout. It controls the stream interface timeouts and sets
|
|
||||||
* si->flags accordingly. It does NOT close anything, as this timeout may
|
|
||||||
* be used for any purpose. It returns 1 if the timeout fired, otherwise
|
|
||||||
* zero.
|
|
||||||
*/
|
|
||||||
int si_check_timeouts(struct stream_interface *si)
|
|
||||||
{
|
|
||||||
if (tick_is_expired(si->exp, now_ms)) {
|
|
||||||
si->flags |= SI_FL_EXP;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* to be called only when in SI_ST_DIS with SI_FL_ERR */
|
/* to be called only when in SI_ST_DIS with SI_FL_ERR */
|
||||||
void si_report_error(struct stream_interface *si)
|
void si_report_error(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
@ -206,7 +190,7 @@ static void stream_int_shutr(struct stream_interface *si)
|
|||||||
|
|
||||||
if (si_oc(si)->flags & CF_SHUTW) {
|
if (si_oc(si)->flags & CF_SHUTW) {
|
||||||
si->state = SI_ST_DIS;
|
si->state = SI_ST_DIS;
|
||||||
si->exp = TICK_ETERNITY;
|
__cs_strm(si->cs)->conn_exp = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
else if (si->flags & SI_FL_NOHALF) {
|
else if (si->flags & SI_FL_NOHALF) {
|
||||||
/* we want to immediately forward this close to the write side */
|
/* we want to immediately forward this close to the write side */
|
||||||
@ -268,7 +252,7 @@ static void stream_int_shutw(struct stream_interface *si)
|
|||||||
si_rx_shut_blk(si);
|
si_rx_shut_blk(si);
|
||||||
ic->flags |= CF_SHUTR;
|
ic->flags |= CF_SHUTR;
|
||||||
ic->rex = TICK_ETERNITY;
|
ic->rex = TICK_ETERNITY;
|
||||||
si->exp = TICK_ETERNITY;
|
__cs_strm(si->cs)->conn_exp = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* note that if the task exists, it must unregister itself once it runs */
|
/* note that if the task exists, it must unregister itself once it runs */
|
||||||
@ -569,12 +553,7 @@ static void stream_int_notify(struct stream_interface *si)
|
|||||||
|
|
||||||
task->expire = tick_first(task->expire, ic->analyse_exp);
|
task->expire = tick_first(task->expire, ic->analyse_exp);
|
||||||
task->expire = tick_first(task->expire, oc->analyse_exp);
|
task->expire = tick_first(task->expire, oc->analyse_exp);
|
||||||
|
task->expire = tick_first(task->expire, __cs_strm(si->cs)->conn_exp);
|
||||||
if (si->exp)
|
|
||||||
task->expire = tick_first(task->expire, si->exp);
|
|
||||||
|
|
||||||
if (sio->exp)
|
|
||||||
task->expire = tick_first(task->expire, sio->exp);
|
|
||||||
|
|
||||||
task_queue(task);
|
task_queue(task);
|
||||||
}
|
}
|
||||||
@ -650,7 +629,7 @@ static int si_cs_process(struct conn_stream *cs)
|
|||||||
|
|
||||||
if (!si_state_in(si->state, SI_SB_EST|SI_SB_DIS|SI_SB_CLO) &&
|
if (!si_state_in(si->state, SI_SB_EST|SI_SB_DIS|SI_SB_CLO) &&
|
||||||
(conn->flags & CO_FL_WAIT_XPRT) == 0) {
|
(conn->flags & CO_FL_WAIT_XPRT) == 0) {
|
||||||
si->exp = TICK_ETERNITY;
|
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
|
||||||
oc->flags |= CF_WRITE_NULL;
|
oc->flags |= CF_WRITE_NULL;
|
||||||
if (si->state == SI_ST_CON)
|
if (si->state == SI_ST_CON)
|
||||||
si->state = SI_ST_RDY;
|
si->state = SI_ST_RDY;
|
||||||
@ -1075,7 +1054,7 @@ static void stream_int_shutr_conn(struct stream_interface *si)
|
|||||||
if (si_oc(si)->flags & CF_SHUTW) {
|
if (si_oc(si)->flags & CF_SHUTW) {
|
||||||
cs_close(cs);
|
cs_close(cs);
|
||||||
si->state = SI_ST_DIS;
|
si->state = SI_ST_DIS;
|
||||||
si->exp = TICK_ETERNITY;
|
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
else if (si->flags & SI_FL_NOHALF) {
|
else if (si->flags & SI_FL_NOHALF) {
|
||||||
/* we want to immediately forward this close to the write side */
|
/* we want to immediately forward this close to the write side */
|
||||||
@ -1166,7 +1145,7 @@ static void stream_int_shutw_conn(struct stream_interface *si)
|
|||||||
si_rx_shut_blk(si);
|
si_rx_shut_blk(si);
|
||||||
ic->flags |= CF_SHUTR;
|
ic->flags |= CF_SHUTR;
|
||||||
ic->rex = TICK_ETERNITY;
|
ic->rex = TICK_ETERNITY;
|
||||||
si->exp = TICK_ETERNITY;
|
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1658,7 +1637,7 @@ static void stream_int_read0(struct stream_interface *si)
|
|||||||
si_done_get(si);
|
si_done_get(si);
|
||||||
|
|
||||||
si->state = SI_ST_DIS;
|
si->state = SI_ST_DIS;
|
||||||
si->exp = TICK_ETERNITY;
|
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1727,7 +1706,7 @@ static void stream_int_shutr_applet(struct stream_interface *si)
|
|||||||
if (si_oc(si)->flags & CF_SHUTW) {
|
if (si_oc(si)->flags & CF_SHUTW) {
|
||||||
si_applet_release(si);
|
si_applet_release(si);
|
||||||
si->state = SI_ST_DIS;
|
si->state = SI_ST_DIS;
|
||||||
si->exp = TICK_ETERNITY;
|
__cs_strm(si->cs)->conn_exp = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
else if (si->flags & SI_FL_NOHALF) {
|
else if (si->flags & SI_FL_NOHALF) {
|
||||||
/* we want to immediately forward this close to the write side */
|
/* we want to immediately forward this close to the write side */
|
||||||
@ -1791,7 +1770,7 @@ static void stream_int_shutw_applet(struct stream_interface *si)
|
|||||||
si_rx_shut_blk(si);
|
si_rx_shut_blk(si);
|
||||||
ic->flags |= CF_SHUTR;
|
ic->flags |= CF_SHUTR;
|
||||||
ic->rex = TICK_ETERNITY;
|
ic->rex = TICK_ETERNITY;
|
||||||
si->exp = TICK_ETERNITY;
|
__cs_strm(si->cs)->conn_exp = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user