diff --git a/include/haproxy/sc_strm.h b/include/haproxy/sc_strm.h index 6e66e6c15..b50b04045 100644 --- a/include/haproxy/sc_strm.h +++ b/include/haproxy/sc_strm.h @@ -309,8 +309,10 @@ static inline int sc_is_recv_allowed(const struct stconn *sc) static inline void sc_chk_rcv(struct stconn *sc) { if (sc_ep_test(sc, SE_FL_APPLET_NEED_CONN) && - sc_state_in(sc_opposite(sc)->state, SC_SB_RDY|SC_SB_EST|SC_SB_DIS|SC_SB_CLO)) + sc_state_in(sc_opposite(sc)->state, SC_SB_RDY|SC_SB_EST|SC_SB_DIS|SC_SB_CLO)) { sc_ep_clr(sc, SE_FL_APPLET_NEED_CONN); + sc_ep_report_read_activity(sc); + } if (!sc_is_recv_allowed(sc)) return; diff --git a/include/haproxy/stconn-t.h b/include/haproxy/stconn-t.h index 5eddd8832..090487e17 100644 --- a/include/haproxy/stconn-t.h +++ b/include/haproxy/stconn-t.h @@ -199,15 +199,26 @@ struct stconn; * is the stream endpoint, i.e. the mux stream or the appctx * is the connection for connection-based streams * is the stream connector we're attached to, or NULL + * is the last read activity + * is the first send blocked * is the expiration date for a read, in ticks * is the expiration date for a write or connect, in ticks * SE_FL_* -*/ + * + * should be updated when a read activity is detected. It can be a + * sucessful receive, when a shutr is reported or when receives are + * unblocked. + + * should be updated when the first send of a series is blocked and reset + * when a successful send is reported. + */ struct sedesc { void *se; struct connection *conn; struct stconn *sc; unsigned int flags; + unsigned int lra; + unsigned int fsb; int rex; int wex; }; diff --git a/include/haproxy/stconn.h b/include/haproxy/stconn.h index 56de284e0..211a5d1a8 100644 --- a/include/haproxy/stconn.h +++ b/include/haproxy/stconn.h @@ -136,6 +136,56 @@ static forceinline uint sc_ep_get(const struct stconn *sc) return se_fl_get(sc->sedesc); } +/* Return the last read activity timestamp. May be TICK_ETERNITY */ +static forceinline unsigned int sc_ep_lra(const struct stconn *sc) +{ + return sc->sedesc->lra; +} + +/* Return the first send blocked timestamp. May be TICK_ETERNITY */ +static forceinline unsigned int sc_ep_fsb(const struct stconn *sc) +{ + return sc->sedesc->fsb; +} + +/* Report a read activity. This function sets to now_ms */ +static forceinline void sc_ep_report_read_activity(struct stconn *sc) +{ + sc->sedesc->lra = now_ms; +} + +/* Report a send blocked. This function sets to now_ms if it was not + * already set + */ +static forceinline void sc_ep_report_blocked_send(struct stconn *sc) +{ + if (!tick_isset(sc->sedesc->fsb)) + sc->sedesc->fsb = now_ms; +} + +/* Report a send activity by setting to TICK_ETERNITY. + * For non-independent stream, a read activity is reported. + */ +static forceinline void sc_ep_report_send_activity(struct stconn *sc) +{ + sc->sedesc->fsb = TICK_ETERNITY; + if (!(sc->flags & SC_FL_INDEP_STR)) + sc_ep_report_read_activity(sc); +} + +static forceinline int sc_ep_rcv_ex(const struct stconn *sc) +{ + return (tick_isset(sc->sedesc->lra) + ? tick_add_ifset(sc->sedesc->lra, sc->ioto) + : TICK_ETERNITY); +} + +static forceinline int sc_ep_snd_ex(const struct stconn *sc) +{ + return (tick_isset(sc->sedesc->fsb) + ? tick_add_ifset(sc->sedesc->fsb, sc->ioto) + : TICK_ETERNITY); +} static forceinline int sc_ep_rex(const struct stconn *sc) { @@ -345,11 +395,14 @@ static inline void se_have_no_more_data(struct sedesc *se) } /* The application layer informs a stream connector that it's willing to - * receive data from the endpoint. + * receive data from the endpoint. A read activity is reported. */ static inline void sc_will_read(struct stconn *sc) { - sc->flags &= ~SC_FL_WONT_READ; + if (sc->flags & SC_FL_WONT_READ) { + sc->flags &= ~SC_FL_WONT_READ; + sc_ep_report_read_activity(sc); + } } /* The application layer informs a stream connector that it will not receive @@ -372,11 +425,14 @@ static inline void se_need_remote_conn(struct sedesc *se) } /* The application layer tells the stream connector that it just got the input - * buffer it was waiting for. + * buffer it was waiting for. A read activity is reported. */ static inline void sc_have_buff(struct stconn *sc) { - sc->flags &= ~SC_FL_NEED_BUFF; + if (sc->flags & SC_FL_NEED_BUFF) { + sc->flags &= ~SC_FL_NEED_BUFF; + sc_ep_report_read_activity(sc); + } } /* The stream connector failed to get an input buffer and is waiting for it. @@ -392,10 +448,14 @@ static inline void sc_need_buff(struct stconn *sc) /* Tell a stream connector some room was made in the input buffer and any * failed attempt to inject data into it may be tried again. This is usually * called after a successful transfer of buffer contents to the other side. + * A read activity is reported. */ static inline void sc_have_room(struct stconn *sc) { - sc->flags &= ~SC_FL_NEED_ROOM; + if (sc->flags & SC_FL_NEED_ROOM) { + sc->flags &= ~SC_FL_NEED_ROOM; + sc_ep_report_read_activity(sc); + } } /* The stream connector announces it failed to put data into the input buffer diff --git a/src/applet.c b/src/applet.c index 7f63c2a1f..bfdab6f91 100644 --- a/src/applet.c +++ b/src/applet.c @@ -250,7 +250,17 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) if (count != co_data(sc_oc(sc))) { sc_oc(sc)->flags |= CF_WRITE_EVENT | CF_WROTE_DATA; sc_have_room(sc_opposite(sc)); + sc_ep_report_send_activity(sc); } + else { + if (sc_ep_test(sc, SE_FL_WONT_CONSUME)) + sc_ep_report_send_activity(sc); + else + sc_ep_report_blocked_send(sc); + } + + if (sc_ic(sc)->flags & CF_READ_EVENT) + sc_ep_report_read_activity(sc); /* measure the call rate and check for anomalies when too high */ if (((b_size(sc_ib(sc)) && sc->flags & SC_FL_NEED_BUFF) || // asks for a buffer which is present diff --git a/src/stconn.c b/src/stconn.c index f65ae157e..07b9767b7 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -92,6 +92,8 @@ void sedesc_init(struct sedesc *sedesc) sedesc->se = NULL; sedesc->conn = NULL; sedesc->sc = NULL; + sedesc->lra = TICK_ETERNITY; + sedesc->fsb = TICK_ETERNITY; sedesc->rex = sedesc->wex = TICK_ETERNITY; se_fl_setall(sedesc, SE_FL_NONE); } @@ -533,6 +535,7 @@ static void sc_app_shutr(struct stconn *sc) if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; + sc_ep_report_read_activity(sc); sc_ep_reset_rex(sc); if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST)) @@ -1238,6 +1241,7 @@ static void sc_conn_read0(struct stconn *sc) if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; + sc_ep_report_read_activity(sc); sc_ep_reset_rex(sc); if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST)) @@ -1567,6 +1571,7 @@ static int sc_conn_recv(struct stconn *sc) ic->xfer_large = 0; } ic->last_read = now_ms; + sc_ep_report_read_activity(sc); } end_recv: @@ -1575,6 +1580,7 @@ static int sc_conn_recv(struct stconn *sc) /* Report EOI on the channel if it was reached from the mux point of * view. */ if (sc_ep_test(sc, SE_FL_EOI) && !(ic->flags & CF_EOI)) { + sc_ep_report_read_activity(sc); ic->flags |= (CF_EOI|CF_READ_EVENT); ret = 1; } @@ -1760,7 +1766,10 @@ static int sc_conn_send(struct stconn *sc) sc->state = SC_ST_RDY; sc_have_room(sc_opposite(sc)); + sc_ep_report_send_activity(sc); } + else + sc_ep_report_blocked_send(sc); if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) { oc->flags |= CF_WRITE_EVENT; diff --git a/src/stream.c b/src/stream.c index 96739fa46..7f07ee8ac 100644 --- a/src/stream.c +++ b/src/stream.c @@ -295,6 +295,7 @@ int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input) s->req.buf = *input; *input = BUF_NULL; s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf)); + sc_ep_report_read_activity(s->scf); } s->req.flags |= CF_READ_EVENT; /* Always report a read event */ @@ -562,6 +563,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer s->req.buf = *input; *input = BUF_NULL; s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf)); + sc_ep_report_read_activity(s->scf); } /* it is important not to call the wakeup function directly but to @@ -925,6 +927,7 @@ static void back_establish(struct stream *s) se_have_more_data(s->scb->sedesc); rep->flags |= CF_READ_EVENT; /* producer is now attached */ + sc_ep_report_read_activity(s->scb); if (conn) { /* real connections have timeouts * if already defined, it means that a set-timeout rule has