From f8413cba2a3e824747b9ef3ea3b65a07b03b0292 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Tue, 7 Feb 2023 16:06:14 +0100 Subject: [PATCH] MEDIUM: channel/stconn: Move rex/wex timer from the channel to the sedesc These timers are related to the I/O. Thus it is logical to move them into the SE descriptor. The patch is a bit huge but it is just a replacement. However it is error-prone. From the stconn or the stream, helper functions are used to get, set or reset these timers. This simplify the timers manipulations. --- include/haproxy/channel-t.h | 2 - include/haproxy/channel.h | 4 +- include/haproxy/stconn-t.h | 4 ++ include/haproxy/stconn.h | 31 ++++++++ src/cli.c | 9 +-- src/debug.c | 21 +++--- src/dns.c | 2 +- src/hlua.c | 14 ++-- src/http_ana.c | 5 +- src/sink.c | 6 +- src/stconn.c | 88 +++++++++++------------ src/stream.c | 140 ++++++++++++++---------------------- 12 files changed, 164 insertions(+), 162 deletions(-) diff --git a/include/haproxy/channel-t.h b/include/haproxy/channel-t.h index 9bd493345..a61e4a52c 100644 --- a/include/haproxy/channel-t.h +++ b/include/haproxy/channel-t.h @@ -251,8 +251,6 @@ struct channel { unsigned char xfer_large; /* number of consecutive large xfers */ unsigned char xfer_small; /* number of consecutive small xfers */ unsigned long long total; /* total data read */ - int rex; /* expiration date for a read, in ticks */ - int wex; /* expiration date for a write or connect, in ticks */ int analyse_exp; /* expiration date for current analysers (if set) */ }; diff --git a/include/haproxy/channel.h b/include/haproxy/channel.h index 053bc24c4..be49eef7b 100644 --- a/include/haproxy/channel.h +++ b/include/haproxy/channel.h @@ -531,11 +531,11 @@ static inline int channel_output_closed(struct channel *chn) static inline void channel_check_timeouts(struct channel *chn) { if (likely(!(chn->flags & (CF_SHUTR|CF_READ_TIMEOUT|CF_READ_EVENT))) && - unlikely(tick_is_expired(chn->rex, now_ms))) + unlikely(tick_is_expired(sc_ep_rex(chn_prod(chn)), now_ms))) chn->flags |= CF_READ_TIMEOUT; if (likely(!(chn->flags & (CF_SHUTW|CF_WRITE_TIMEOUT|CF_WRITE_EVENT))) && - unlikely(tick_is_expired(chn->wex, now_ms))) + unlikely(tick_is_expired(sc_ep_wex(chn_cons(chn)), now_ms))) chn->flags |= CF_WRITE_TIMEOUT; if (likely(!(chn->flags & CF_READ_EVENT)) && diff --git a/include/haproxy/stconn-t.h b/include/haproxy/stconn-t.h index 526b29be1..ddaf733d6 100644 --- a/include/haproxy/stconn-t.h +++ b/include/haproxy/stconn-t.h @@ -199,6 +199,8 @@ struct stconn; * is the stream endpoint, i.e. the mux stream or the appctx * is the connection for connection-based streams * is the stream connector we're attached to, or NULL + * is the expiration date for a read, in ticks + * is the expiration date for a write or connect, in ticks * SE_FL_* */ struct sedesc { @@ -206,6 +208,8 @@ struct sedesc { struct connection *conn; struct stconn *sc; unsigned int flags; + int rex; + int wex; }; /* sc_app_ops describes the application layer's operations and notification diff --git a/include/haproxy/stconn.h b/include/haproxy/stconn.h index 067c71e02..56de284e0 100644 --- a/include/haproxy/stconn.h +++ b/include/haproxy/stconn.h @@ -137,6 +137,37 @@ static forceinline uint sc_ep_get(const struct stconn *sc) } +static forceinline int sc_ep_rex(const struct stconn *sc) +{ + return sc->sedesc->rex; +} + +static forceinline int sc_ep_wex(const struct stconn *sc) +{ + return sc->sedesc->wex; +} + +static forceinline void sc_ep_reset_rex(struct stconn *sc) +{ + sc->sedesc->rex = TICK_ETERNITY; +} + +static forceinline void sc_ep_reset_wex(struct stconn *sc) +{ + sc->sedesc->wex = TICK_ETERNITY; +} + + +static forceinline void sc_ep_set_rex(struct stconn *sc, unsigned int rto) +{ + sc->sedesc->rex = tick_add_ifset(now_ms, rto); +} + +static forceinline void sc_ep_set_wex(struct stconn *sc, unsigned int wto) +{ + sc->sedesc->wex = tick_add_ifset(now_ms, wto); +} + /* Returns the stream endpoint from an connector, without any control */ static inline void *__sc_endp(const struct stconn *sc) { diff --git a/src/cli.c b/src/cli.c index acaaaae4b..d1d5a56d3 100644 --- a/src/cli.c +++ b/src/cli.c @@ -2831,11 +2831,12 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit) s->scb->rto = TICK_ETERNITY; s->scb->wto = TICK_ETERNITY; - s->req.rex = TICK_ETERNITY; - s->req.wex = TICK_ETERNITY; + sc_ep_reset_rex(s->scf); + sc_ep_reset_wex(s->scf); s->req.analyse_exp = TICK_ETERNITY; - s->res.rex = TICK_ETERNITY; - s->res.wex = TICK_ETERNITY; + + sc_ep_reset_rex(s->scb); + sc_ep_reset_wex(s->scb); s->res.analyse_exp = TICK_ETERNITY; s->scb->hcto = TICK_ETERNITY; diff --git a/src/debug.c b/src/debug.c index 44f9755fe..135affd6f 100644 --- a/src/debug.c +++ b/src/debug.c @@ -760,8 +760,9 @@ static int debug_parse_cli_stream(char **args, char *payload, struct appctx *app if (!*args[3]) { return cli_err(appctx, "Usage: debug dev stream { | wake }*\n" - " = {strm | strm.f | strm.x | scf.s | scb.s |\n" - " txn.f | req.f | req.r | req.w | res.f | res.r | res.w}\n" + " = {strm | strm.f | strm.x |\n" + " scf.s | scf.r | scf.w | scb.s | scb.r | scb.w |\n" + " txn.f | req.f | res.f}\n" " = {'' (show) | '=' (assign) | '^' (xor) | '+' (or) | '-' (andnot)}\n" " = 'now' | 64-bit dec/hex integer (0x prefix supported)\n" " 'wake' wakes the stream asssigned to 'strm' (default: current)\n" @@ -787,18 +788,18 @@ static int debug_parse_cli_stream(char **args, char *payload, struct appctx *app ptr = (!s || !may_access(s)) ? NULL : &s->req.flags; size = sizeof(s->req.flags); } else if (isteq(name, ist("res.f"))) { ptr = (!s || !may_access(s)) ? NULL : &s->res.flags; size = sizeof(s->res.flags); - } else if (isteq(name, ist("req.r"))) { - ptr = (!s || !may_access(s)) ? NULL : &s->req.rex; size = sizeof(s->req.rex); - } else if (isteq(name, ist("res.r"))) { - ptr = (!s || !may_access(s)) ? NULL : &s->res.rex; size = sizeof(s->res.rex); - } else if (isteq(name, ist("req.w"))) { - ptr = (!s || !may_access(s)) ? NULL : &s->req.wex; size = sizeof(s->req.wex); - } else if (isteq(name, ist("res.w"))) { - ptr = (!s || !may_access(s)) ? NULL : &s->res.wex; size = sizeof(s->res.wex); } else if (isteq(name, ist("scf.s"))) { ptr = (!s || !may_access(s)) ? NULL : &s->scf->state; size = sizeof(s->scf->state); + } else if (isteq(name, ist("scf.r"))) { + ptr = (!s || !may_access(s)) ? NULL : &s->scf->sedesc->rex; size = sizeof(s->scf->sedesc->rex); + } else if (isteq(name, ist("scf.w"))) { + ptr = (!s || !may_access(s)) ? NULL : &s->scf->sedesc->wex; size = sizeof(s->scf->sedesc->wex); } else if (isteq(name, ist("scb.s"))) { ptr = (!s || !may_access(s)) ? NULL : &s->scf->state; size = sizeof(s->scb->state); + } else if (isteq(name, ist("scb.r"))) { + ptr = (!s || !may_access(s)) ? NULL : &s->scb->sedesc->rex; size = sizeof(s->scb->sedesc->rex); + } else if (isteq(name, ist("scb.w"))) { + ptr = (!s || !may_access(s)) ? NULL : &s->scb->sedesc->wex; size = sizeof(s->scb->sedesc->wex); } else if (isteq(name, ist("wake"))) { if (s && may_access(s) && may_access((void *)s + sizeof(*s) - 1)) task_wakeup(s->task, TASK_WOKEN_TIMER|TASK_WOKEN_IO|TASK_WOKEN_MSG); diff --git a/src/dns.c b/src/dns.c index 0494abaf4..d3114fb4f 100644 --- a/src/dns.c +++ b/src/dns.c @@ -839,7 +839,7 @@ static int dns_session_init(struct appctx *appctx) * We are using a syslog server. */ s->scb->rto = TICK_ETERNITY; - s->res.rex = TICK_ETERNITY; + sc_ep_reset_rex(s->scb); ds->appctx = appctx; return 0; diff --git a/src/hlua.c b/src/hlua.c index 5ef1618f5..864771aa9 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2524,8 +2524,8 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext /* update buffers. */ appctx_wakeup(appctx); - s->req.rex = TICK_ETERNITY; - s->res.wex = TICK_ETERNITY; + sc_ep_reset_rex(s->scf);; + sc_ep_reset_wex(s->scb); /* Update length sent. */ lua_pop(L, 1); @@ -3018,10 +3018,10 @@ __LJMP static int hlua_socket_settimeout(struct lua_State *L) s->sess->fe->timeout.connect = tmout; s->scf->rto = s->scf->wto = tmout; s->scb->rto = s->scb->wto = tmout; - s->req.rex = tick_add_ifset(now_ms, tmout); - s->req.wex = tick_add_ifset(now_ms, tmout); - s->res.rex = tick_add_ifset(now_ms, tmout); - s->res.wex = tick_add_ifset(now_ms, tmout); + sc_ep_set_rex(s->scf, tmout); + sc_ep_set_wex(s->scf, tmout); + sc_ep_set_rex(s->scb, tmout); + sc_ep_set_wex(s->scb, tmout); s->task->expire = tick_add_ifset(now_ms, tmout); task_queue(s->task); @@ -8084,7 +8084,7 @@ __LJMP static int hlua_txn_done(lua_State *L) channel_auto_close(req); channel_erase(req); - res->wex = tick_add_ifset(now_ms, s->scf->wto); + sc_ep_set_wex(s->scb, s->scf->wto); channel_auto_read(res); channel_auto_close(res); channel_shutr_now(res); diff --git a/src/http_ana.c b/src/http_ana.c index ac22e5ace..3f1569527 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -1141,7 +1141,6 @@ static __inline int do_l7_retry(struct stream *s, struct stconn *sc) s->flags &= ~(SF_CONN_EXP | SF_ERR_MASK | SF_FINST_MASK); s->conn_exp = TICK_ETERNITY; stream_choose_redispatch(s); - res->rex = TICK_ETERNITY; res->to_forward = 0; res->analyse_exp = TICK_ETERNITY; res->total = 0; @@ -4439,7 +4438,7 @@ int http_forward_proxy_resp(struct stream *s, int final) channel_auto_close(req); channel_htx_erase(req, htxbuf(&req->buf)); - res->wex = tick_add_ifset(now_ms, s->scf->wto); + sc_ep_set_wex(s->scb, s->scf->wto); channel_auto_read(res); channel_auto_close(res); channel_shutr_now(res); @@ -4493,7 +4492,7 @@ void http_reply_and_close(struct stream *s, short status, struct http_reply *msg } end: - s->res.wex = tick_add_ifset(now_ms, s->scf->wto); + sc_ep_set_wex(s->scb, s->scf->wto); /* At this staged, HTTP analysis is finished */ s->req.analysers &= AN_REQ_FLT_END; diff --git a/src/sink.c b/src/sink.c index 035109f44..8d5218414 100644 --- a/src/sink.c +++ b/src/sink.c @@ -326,7 +326,7 @@ static void sink_forward_io_handler(struct appctx *appctx) * and we don't want expire on this case * with a syslog server */ - sc_oc(sc)->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc_opposite(sc)); /* rto should not change but it seems the case */ sc_opposite(sc)->rto = TICK_ETERNITY; @@ -474,7 +474,7 @@ static void sink_forward_oc_io_handler(struct appctx *appctx) * and we don't want expire on this case * with a syslog server */ - sc_oc(sc)->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc_opposite(sc)); /* rto should not change but it seems the case */ sc_opposite(sc)->rto = TICK_ETERNITY; @@ -636,7 +636,7 @@ static int sink_forward_session_init(struct appctx *appctx) * We are using a syslog server. */ s->scb->rto = TICK_ETERNITY; - s->res.rex = TICK_ETERNITY; + sc_ep_reset_rex(s->scb); sft->appctx = appctx; return 0; diff --git a/src/stconn.c b/src/stconn.c index d47872368..fd8c0f629 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -92,6 +92,7 @@ void sedesc_init(struct sedesc *sedesc) sedesc->se = NULL; sedesc->conn = NULL; sedesc->sc = NULL; + sedesc->rex = sedesc->wex = TICK_ETERNITY; se_fl_setall(sedesc, SE_FL_NONE); } @@ -532,7 +533,7 @@ static void sc_app_shutr(struct stconn *sc) if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; - ic->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc); if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST)) return; @@ -566,11 +567,11 @@ static void sc_app_shutw(struct stconn *sc) if (oc->flags & CF_SHUTW) return; oc->flags |= CF_SHUTW; - oc->wex = TICK_ETERNITY; + sc_ep_reset_wex(sc); if (tick_isset(sc->hcto)) { sc->rto = sc->hcto; - ic->rex = tick_add(now_ms, sc->rto); + sc_ep_set_rex(sc, sc->rto); } switch (sc->state) { @@ -597,7 +598,7 @@ static void sc_app_shutw(struct stconn *sc) default: sc->flags &= ~SC_FL_NOLINGER; ic->flags |= CF_SHUTR; - ic->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc); if (sc->flags & SC_FL_ISBACK) __sc_strm(sc)->conn_exp = TICK_ETERNITY; } @@ -647,8 +648,8 @@ static void sc_app_chk_snd(struct stconn *sc) * so we tell the handler. */ sc_ep_clr(sc, SE_FL_WAIT_DATA); - if (!tick_isset(oc->wex)) - oc->wex = tick_add_ifset(now_ms, sc->wto); + if (!tick_isset(sc_ep_wex(sc))) + sc_ep_set_wex(sc, sc->wto); if (!(sc->flags & SC_FL_DONT_WAKE)) task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO); @@ -673,7 +674,7 @@ static void sc_app_shutr_conn(struct stconn *sc) if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; - ic->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc); if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST)) return; @@ -707,11 +708,11 @@ static void sc_app_shutw_conn(struct stconn *sc) if (oc->flags & CF_SHUTW) return; oc->flags |= CF_SHUTW; - oc->wex = TICK_ETERNITY; + sc_ep_reset_wex(sc); if (tick_isset(sc->hcto)) { sc->rto = sc->hcto; - ic->rex = tick_add(now_ms, sc->rto); + sc_ep_set_rex(sc, sc->rto); } switch (sc->state) { @@ -763,7 +764,7 @@ static void sc_app_shutw_conn(struct stconn *sc) default: sc->flags &= ~SC_FL_NOLINGER; ic->flags |= CF_SHUTR; - ic->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc); if (sc->flags & SC_FL_ISBACK) __sc_strm(sc)->conn_exp = TICK_ETERNITY; } @@ -835,25 +836,23 @@ static void sc_app_chk_snd_conn(struct stconn *sc) if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0) sc_ep_set(sc, SE_FL_WAIT_DATA); - oc->wex = TICK_ETERNITY; + sc_ep_reset_wex(sc); } else { /* Otherwise there are remaining data to be sent in the buffer, * which means we have to poll before doing so. */ sc_ep_clr(sc, SE_FL_WAIT_DATA); - if (!tick_isset(oc->wex)) - oc->wex = tick_add_ifset(now_ms, sc->wto); + if (!tick_isset(sc_ep_wex(sc))) + sc_ep_set_wex(sc, sc->wto); } if (likely(oc->flags & CF_WRITE_EVENT)) { - struct channel *ic = sc_ic(sc); - /* update timeout if we have written something */ if (!(oc->flags & CF_SHUTW) && !channel_is_empty(oc)) - oc->wex = tick_add_ifset(now_ms, sc->wto); + sc_ep_set_wex(sc, sc->wto); - if (tick_isset(ic->rex) && !(sc->flags & SC_FL_INDEP_STR)) { + if (tick_isset(sc_ep_rex(sc)) && !(sc->flags & SC_FL_INDEP_STR)) { /* Note: to prevent the client from expiring read timeouts * during writes, we refresh it. We only do this if the * interface is not configured for "independent streams", @@ -862,7 +861,7 @@ static void sc_app_chk_snd_conn(struct stconn *sc) * of data which can full the socket buffers long before a * write timeout is detected. */ - ic->rex = tick_add_ifset(now_ms, sc->rto); + sc_ep_set_rex(sc, sc->rto); } } @@ -897,7 +896,7 @@ static void sc_app_shutr_applet(struct stconn *sc) if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; - ic->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc); /* Note: on shutr, we don't call the applet */ @@ -932,11 +931,11 @@ static void sc_app_shutw_applet(struct stconn *sc) if (oc->flags & CF_SHUTW) return; oc->flags |= CF_SHUTW; - oc->wex = TICK_ETERNITY; + sc_ep_reset_wex(sc); if (tick_isset(sc->hcto)) { sc->rto = sc->hcto; - ic->rex = tick_add(now_ms, sc->rto); + sc_ep_set_rex(sc, sc->rto); } /* on shutw we always wake the applet up */ @@ -967,7 +966,7 @@ static void sc_app_shutw_applet(struct stconn *sc) default: sc->flags &= ~SC_FL_NOLINGER; ic->flags |= CF_SHUTR; - ic->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc); if (sc->flags & SC_FL_ISBACK) __sc_strm(sc)->conn_exp = TICK_ETERNITY; } @@ -1008,8 +1007,8 @@ static void sc_app_chk_snd_applet(struct stconn *sc) if (!sc_ep_test(sc, SE_FL_WAIT_DATA) || sc_ep_test(sc, SE_FL_WONT_CONSUME)) return; - if (!tick_isset(oc->wex)) - oc->wex = tick_add_ifset(now_ms, sc->wto); + if (!tick_isset(sc_ep_wex(sc))) + sc_ep_set_wex(sc, sc->wto); if (!channel_is_empty(oc)) { /* (re)start sending */ @@ -1041,9 +1040,9 @@ void sc_update_rx(struct stconn *sc) sc_will_read(sc); if ((ic->flags & CF_EOI) || sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) - ic->rex = TICK_ETERNITY; - else if (!tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, sc->rto); + sc_ep_reset_rex(sc); + else if (!tick_isset(sc_ep_rex(sc))) + sc_ep_set_rex(sc, sc->rto); sc_chk_rcv(sc); } @@ -1060,7 +1059,6 @@ void sc_update_rx(struct stconn *sc) void sc_update_tx(struct stconn *sc) { struct channel *oc = sc_oc(sc); - struct channel *ic = sc_ic(sc); if (oc->flags & CF_SHUTW) return; @@ -1071,7 +1069,7 @@ void sc_update_tx(struct stconn *sc) if (!sc_ep_test(sc, SE_FL_WAIT_DATA)) { if ((oc->flags & CF_SHUTW_NOW) == 0) sc_ep_set(sc, SE_FL_WAIT_DATA); - oc->wex = TICK_ETERNITY; + sc_ep_reset_wex(sc); } return; } @@ -1082,16 +1080,16 @@ void sc_update_tx(struct stconn *sc) * have updated it if there has been a completed I/O. */ sc_ep_clr(sc, SE_FL_WAIT_DATA); - if (!tick_isset(oc->wex)) { - oc->wex = tick_add_ifset(now_ms, sc->wto); - if (tick_isset(ic->rex) && !(sc->flags & SC_FL_INDEP_STR)) { + if (!tick_isset(sc_ep_wex(sc))) { + sc_ep_set_wex(sc, sc->wto); + if (tick_isset(sc_ep_rex(sc)) && !(sc->flags & SC_FL_INDEP_STR)) { /* Note: depending on the protocol, we don't know if we're waiting * for incoming data or not. So in order to prevent the socket from * expiring read timeouts during writes, we refresh the read timeout, * except if it was already infinite or if we have explicitly setup * independent streams. */ - ic->rex = tick_add_ifset(now_ms, sc->rto); + sc_ep_set_rex(sc, sc->rto); } } } @@ -1120,7 +1118,7 @@ static void sc_notify(struct stconn *sc) if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) && (sc->state == SC_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)))) sc_shutw(sc); - oc->wex = TICK_ETERNITY; + sc_ep_reset_wex(sc); } /* indicate that we may be waiting for data from the output channel or @@ -1135,12 +1133,12 @@ static void sc_notify(struct stconn *sc) if (oc->flags & (CF_WRITE_EVENT)) { if (sc_ep_test(sc, SE_FL_ERR_PENDING|SE_FL_ERROR) && !channel_is_empty(oc)) - if (tick_isset(oc->wex)) - oc->wex = tick_add_ifset(now_ms, sc->wto); + if (tick_isset(sc_ep_wex(sc))) + sc_ep_set_wex(sc, sc->wto); if (!(sc->flags & SC_FL_INDEP_STR)) - if (tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, sc->rto); + if (tick_isset(sc_ep_rex(sc))) + sc_ep_set_rex(sc, sc->rto); } if (oc->flags & CF_DONT_READ) @@ -1190,12 +1188,12 @@ static void sc_notify(struct stconn *sc) if (ic->flags & (CF_EOI|CF_SHUTR) || sc_ep_test(sc, SE_FL_APPLET_NEED_CONN) || (sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) { - ic->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc); } else if ((ic->flags & (CF_SHUTR|CF_READ_EVENT)) == CF_READ_EVENT) { /* we must re-enable reading if sc_chk_snd() has freed some space */ - if (tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, sc->rto); + if (tick_isset(sc_ep_rex(sc))) + sc_ep_set_rex(sc, sc->rto); } /* wake the task up only when needed */ @@ -1223,8 +1221,8 @@ static void sc_notify(struct stconn *sc) else { /* Update expiration date for the task and requeue it */ task->expire = tick_first((tick_is_expired(task->expire, now_ms) ? 0 : task->expire), - tick_first(tick_first(ic->rex, ic->wex), - tick_first(oc->rex, oc->wex))); + tick_first(tick_first(sc_ep_rex(sc), sc_ep_wex(sc)), + tick_first(sc_ep_rex(sco), sc_ep_wex(sco)))); task->expire = tick_first(task->expire, ic->analyse_exp); task->expire = tick_first(task->expire, oc->analyse_exp); @@ -1251,7 +1249,7 @@ static void sc_conn_read0(struct stconn *sc) if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; - ic->rex = TICK_ETERNITY; + sc_ep_reset_rex(sc); if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST)) return; @@ -1275,7 +1273,7 @@ static void sc_conn_read0(struct stconn *sc) oc->flags &= ~CF_SHUTW_NOW; oc->flags |= CF_SHUTW; - oc->wex = TICK_ETERNITY; + sc_ep_reset_wex(sc); sc->state = SC_ST_DIS; if (sc->flags & SC_FL_ISBACK) diff --git a/src/stream.c b/src/stream.c index 0296bed82..0c8fad547 100644 --- a/src/stream.c +++ b/src/stream.c @@ -206,10 +206,11 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace task, s, s->flags, s->conn_err_type, txn->flags, txn->req.flags, txn->rsp.flags, txn->status); } else { - chunk_appendf(&trace_buf, " - t=%p s=(%p,0x%08x,0x%x) scf=(%p,%d,0x%08x,0x%x) scb=(%p,%d,0x%08x,0x%x) retries=%d", + chunk_appendf(&trace_buf, " - t=%p s=(%p,0x%08x,0x%x) scf=(%p,%d,0x%08x,0x%x) scb=(%p,%d,0x%08x,0x%x) scf.exp(r,w)=(%u,%u) scb.exp(r,w)=(%u,%u) retries=%d", task, s, s->flags, s->conn_err_type, s->scf, s->scf->state, s->scf->flags, s->scf->sedesc->flags, s->scb, s->scb->state, s->scb->flags, s->scb->sedesc->flags, + sc_ep_rex(s->scf), sc_ep_wex(s->scf), sc_ep_rex(s->scb), sc_ep_wex(s->scb), s->conn_retries); } @@ -219,17 +220,17 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace /* If txn defined, don't display all channel info */ if (src->verbosity == STRM_VERB_SIMPLE || txn) { - chunk_appendf(&trace_buf, " req=(%p .fl=0x%08x .exp(r,w,a)=(%u,%u,%u))", - req, req->flags, req->rex, req->wex, req->analyse_exp); - chunk_appendf(&trace_buf, " res=(%p .fl=0x%08x .exp(r,w,a)=(%u,%u,%u))", - res, res->flags, res->rex, res->wex, res->analyse_exp); + chunk_appendf(&trace_buf, " req=(%p .fl=0x%08x .exp=,%u)", + req, req->flags, req->analyse_exp); + chunk_appendf(&trace_buf, " res=(%p .fl=0x%08x .exp=%u)", + res, res->flags, res->analyse_exp); } else { - chunk_appendf(&trace_buf, " req=(%p .fl=0x%08x .ana=0x%08x .exp(r,w,a)=(%u,%u,%u) .o=%lu .tot=%llu .to_fwd=%u)", - req, req->flags, req->analysers, req->rex, req->wex, req->analyse_exp, + chunk_appendf(&trace_buf, " req=(%p .fl=0x%08x .ana=0x%08x .exp=%u .o=%lu .tot=%llu .to_fwd=%u)", + req, req->flags, req->analysers, req->analyse_exp, (long)req->output, req->total, req->to_forward); - chunk_appendf(&trace_buf, " res=(%p .fl=0x%08x .ana=0x%08x .exp(r,w,a)=(%u,%u,%u) .o=%lu .tot=%llu .to_fwd=%u)", - res, res->flags, res->analysers, res->rex, res->wex, res->analyse_exp, + chunk_appendf(&trace_buf, " res=(%p .fl=0x%08x .ana=0x%08x .exp=%u .o=%lu .tot=%llu .to_fwd=%u)", + res, res->flags, res->analysers, res->analyse_exp, (long)res->output, res->total, res->to_forward); } @@ -519,8 +520,6 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer s->scf->rto = sess->fe->timeout.client; s->scf->wto = sess->fe->timeout.client; - s->req.rex = TICK_ETERNITY; - s->req.wex = TICK_ETERNITY; s->req.analyse_exp = TICK_ETERNITY; channel_init(&s->res); @@ -534,8 +533,6 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer s->scb->wto = TICK_ETERNITY; s->scb->rto = TICK_ETERNITY; - s->res.rex = TICK_ETERNITY; - s->res.wex = TICK_ETERNITY; s->res.analyse_exp = TICK_ETERNITY; s->txn = NULL; @@ -854,7 +851,7 @@ void stream_retnclose(struct stream *s, const struct buffer *msg) if (likely(msg && msg->data)) co_inject(oc, msg->area, msg->data); - oc->wex = tick_add_ifset(now_ms, s->scf->wto); + sc_ep_set_wex(s->scf, s->scf->wto); channel_auto_read(oc); channel_auto_close(oc); channel_shutr_now(oc); @@ -950,7 +947,7 @@ static void back_establish(struct stream *s) */ sc_chk_rcv(s->scb); } - req->wex = TICK_ETERNITY; + sc_ep_reset_wex(s->scf); /* If we managed to get the whole response, and we don't have anything * left to send, or can't, switch to SC_ST_DIS now. */ if (rep->flags & (CF_SHUTR | CF_SHUTW)) { @@ -2434,10 +2431,10 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if ((res->flags & CF_SHUTW) && tick_isset(sess->fe->timeout.clientfin)) scf->rto = sess->fe->timeout.clientfin; - req->rex = tick_add(now_ms, scf->rto); - req->wex = tick_add(now_ms, scb->wto); - res->rex = tick_add(now_ms, scb->rto); - res->wex = tick_add(now_ms, scf->wto); + sc_ep_set_rex(scf, scf->rto); + sc_ep_set_wex(scf, scb->wto); + sc_ep_set_rex(scb, scb->rto); + sc_ep_set_wex(scb, scf->wto); } } @@ -2526,8 +2523,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) update_exp_and_leave: /* Note: please ensure that if you branch here you disable SC_FL_DONT_WAKE */ t->expire = tick_first((tick_is_expired(t->expire, now_ms) ? 0 : t->expire), - tick_first(tick_first(req->rex, req->wex), - tick_first(res->rex, res->wex))); + tick_first(tick_first(sc_ep_rex(scf), sc_ep_wex(scf)), + tick_first(sc_ep_rex(scb), sc_ep_wex(scb)))); if (!req->analysers) req->analyse_exp = TICK_ETERNITY; @@ -3353,10 +3350,14 @@ static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm strm->txn->req.flags, strm->txn->rsp.flags); scf = strm->scf; - chunk_appendf(&trash, " scf=%p flags=0x%08x state=%s endp=%s,%p,0x%08x sub=%d\n", + chunk_appendf(&trash, " scf=%p flags=0x%08x state=%s endp=%s,%p,0x%08x sub=%d", scf, scf->flags, sc_state_str(scf->state), (sc_ep_test(scf, SE_FL_T_MUX) ? "CONN" : (sc_ep_test(scf, SE_FL_T_APPLET) ? "APPCTX" : "NONE")), scf->sedesc->se, sc_ep_get(scf), scf->wait_event.events); + chunk_appendf(&trash, " rex=%s", + sc_ep_rex(scf) ? human_time(TICKS_TO_MS(sc_ep_rex(scf) - now_ms), TICKS_TO_MS(1000)) : ""); + chunk_appendf(&trash, " wex=%s\n", + sc_ep_wex(scf) ? human_time(TICKS_TO_MS(sc_ep_wex(scf) - now_ms), TICKS_TO_MS(1000)) : ""); if ((conn = sc_conn(scf)) != NULL) { if (conn->mux && conn->mux->show_sd) { @@ -3395,10 +3396,14 @@ static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm } scb = strm->scb; - chunk_appendf(&trash, " scb=%p flags=0x%08x state=%s endp=%s,%p,0x%08x sub=%d\n", + chunk_appendf(&trash, " scb=%p flags=0x%08x state=%s endp=%s,%p,0x%08x sub=%d", scb, scb->flags, sc_state_str(scb->state), (sc_ep_test(scb, SE_FL_T_MUX) ? "CONN" : (sc_ep_test(scb, SE_FL_T_APPLET) ? "APPCTX" : "NONE")), scb->sedesc->se, sc_ep_get(scb), scb->wait_event.events); + chunk_appendf(&trash, " rex=%s", + sc_ep_rex(scb) ? human_time(TICKS_TO_MS(sc_ep_rex(scb) - now_ms), TICKS_TO_MS(1000)) : ""); + chunk_appendf(&trash, " wex=%s\n", + sc_ep_wex(scb) ? human_time(TICKS_TO_MS(sc_ep_wex(scb) - now_ms), TICKS_TO_MS(1000)) : ""); if ((conn = sc_conn(scb)) != NULL) { if (conn->mux && conn->mux->show_sd) { @@ -3438,29 +3443,16 @@ static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm chunk_appendf(&trash, " req=%p (f=0x%06x an=0x%x pipe=%d tofwd=%d total=%lld)\n" - " an_exp=%s", + " an_exp=%s buf=%p data=%p o=%u p=%u i=%u size=%u\n", &strm->req, strm->req.flags, strm->req.analysers, strm->req.pipe ? strm->req.pipe->data : 0, strm->req.to_forward, strm->req.total, strm->req.analyse_exp ? human_time(TICKS_TO_MS(strm->req.analyse_exp - now_ms), - TICKS_TO_MS(1000)) : ""); - - chunk_appendf(&trash, - " rex=%s", - strm->req.rex ? - human_time(TICKS_TO_MS(strm->req.rex - now_ms), - TICKS_TO_MS(1000)) : ""); - - chunk_appendf(&trash, - " wex=%s\n" - " buf=%p data=%p o=%u p=%u i=%u size=%u\n", - strm->req.wex ? - human_time(TICKS_TO_MS(strm->req.wex - now_ms), TICKS_TO_MS(1000)) : "", &strm->req.buf, - b_orig(&strm->req.buf), (unsigned int)co_data(&strm->req), + b_orig(&strm->req.buf), (unsigned int)co_data(&strm->req), (unsigned int)ci_head_ofs(&strm->req), (unsigned int)ci_data(&strm->req), (unsigned int)strm->req.buf.size); @@ -3482,26 +3474,13 @@ static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm chunk_appendf(&trash, " res=%p (f=0x%06x an=0x%x pipe=%d tofwd=%d total=%lld)\n" - " an_exp=%s", + " an_exp=%s buf=%p data=%p o=%u p=%u i=%u size=%u\n", &strm->res, strm->res.flags, strm->res.analysers, strm->res.pipe ? strm->res.pipe->data : 0, strm->res.to_forward, strm->res.total, strm->res.analyse_exp ? human_time(TICKS_TO_MS(strm->res.analyse_exp - now_ms), - TICKS_TO_MS(1000)) : ""); - - chunk_appendf(&trash, - " rex=%s", - strm->res.rex ? - human_time(TICKS_TO_MS(strm->res.rex - now_ms), - TICKS_TO_MS(1000)) : ""); - - chunk_appendf(&trash, - " wex=%s\n" - " buf=%p data=%p o=%u p=%u i=%u size=%u\n", - strm->res.wex ? - human_time(TICKS_TO_MS(strm->res.wex - now_ms), TICKS_TO_MS(1000)) : "", &strm->res.buf, b_orig(&strm->res.buf), (unsigned int)co_data(&strm->res), @@ -3692,19 +3671,10 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) (unsigned long long)curr_strm->cpu_time, (unsigned long long)curr_strm->lat_time); chunk_appendf(&trash, - " rq[f=%06xh,i=%u,an=%02xh,rx=%s", + " rq[f=%06xh,i=%u,an=%02xh", curr_strm->req.flags, (unsigned int)ci_data(&curr_strm->req), - curr_strm->req.analysers, - curr_strm->req.rex ? - human_time(TICKS_TO_MS(curr_strm->req.rex - now_ms), - TICKS_TO_MS(1000)) : ""); - - chunk_appendf(&trash, - ",wx=%s", - curr_strm->req.wex ? - human_time(TICKS_TO_MS(curr_strm->req.wex - now_ms), - TICKS_TO_MS(1000)) : ""); + curr_strm->req.analysers); chunk_appendf(&trash, ",ax=%s]", @@ -3713,20 +3683,10 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) TICKS_TO_MS(1000)) : ""); chunk_appendf(&trash, - " rp[f=%06xh,i=%u,an=%02xh,rx=%s", + " rp[f=%06xh,i=%u,an=%02xh", curr_strm->res.flags, (unsigned int)ci_data(&curr_strm->res), - curr_strm->res.analysers, - curr_strm->res.rex ? - human_time(TICKS_TO_MS(curr_strm->res.rex - now_ms), - TICKS_TO_MS(1000)) : ""); - - chunk_appendf(&trash, - ",wx=%s", - curr_strm->res.wex ? - human_time(TICKS_TO_MS(curr_strm->res.wex - now_ms), - TICKS_TO_MS(1000)) : ""); - + curr_strm->res.analysers); chunk_appendf(&trash, ",ax=%s]", curr_strm->res.analyse_exp ? @@ -3734,18 +3694,28 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) TICKS_TO_MS(1000)) : ""); conn = sc_conn(curr_strm->scf); - chunk_appendf(&trash, - " scf=[%d,%1xh,fd=%d]", - curr_strm->scf->state, - curr_strm->scf->flags, - conn_fd(conn)); + chunk_appendf(&trash," scf=[%d,%1xh,fd=%d", + curr_strm->scf->state, curr_strm->scf->flags, conn_fd(conn)); + chunk_appendf(&trash, ",rex=%s", + sc_ep_rex(curr_strm->scf) ? + human_time(TICKS_TO_MS(sc_ep_rex(curr_strm->scf) - now_ms), + TICKS_TO_MS(1000)) : ""); + chunk_appendf(&trash,",wex=%s]", + sc_ep_wex(curr_strm->scf) ? + human_time(TICKS_TO_MS(sc_ep_wex(curr_strm->scf) - now_ms), + TICKS_TO_MS(1000)) : ""); conn = sc_conn(curr_strm->scb); - chunk_appendf(&trash, - " scb=[%d,%1xh,fd=%d]", - curr_strm->scb->state, - curr_strm->scb->flags, - conn_fd(conn)); + chunk_appendf(&trash, " scb=[%d,%1xh,fd=%d", + curr_strm->scb->state, curr_strm->scb->flags, conn_fd(conn)); + chunk_appendf(&trash, ",rex=%s", + sc_ep_rex(curr_strm->scb) ? + human_time(TICKS_TO_MS(sc_ep_rex(curr_strm->scb) - now_ms), + TICKS_TO_MS(1000)) : ""); + chunk_appendf(&trash, ",wex=%s]", + sc_ep_wex(curr_strm->scb) ? + human_time(TICKS_TO_MS(sc_ep_wex(curr_strm->scb) - now_ms), + TICKS_TO_MS(1000)) : ""); chunk_appendf(&trash, " exp=%s rc=%d c_exp=%s",