From 00a07c8b54255ba748d71bde0a2e18767ea79d3e Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Mon, 23 Dec 2024 14:30:33 +0100 Subject: [PATCH] MINOR: tevt/stream/stconn: Report termination events for stream and sc In this patch, events for the stream location are reported. These events are first reported on the corresponding stream-connector. So front events on scf and back event on scb. Then all events are both merged in the stream. But only 4 events are saved on the stream. Several internal events are for now grouped with the type "tevt_type_intercepted". More events will be added to have a better resolution. But at least the place to report these events are identified. For now, when a event is reported on a SC, it is also reported on the stream and vice versa. --- include/haproxy/stconn-t.h | 1 + include/haproxy/stconn.h | 9 ++++++++ include/haproxy/stream-t.h | 1 + include/haproxy/stream.h | 13 +++++++++++ src/http_ana.c | 24 ++++++++++++++++++++- src/stconn.c | 44 +++++++++++++++++++++++--------------- src/stream.c | 5 +++++ src/tcp_rules.c | 4 ++++ 8 files changed, 83 insertions(+), 18 deletions(-) diff --git a/include/haproxy/stconn-t.h b/include/haproxy/stconn-t.h index 6db88f709..52fa3e88e 100644 --- a/include/haproxy/stconn-t.h +++ b/include/haproxy/stconn-t.h @@ -347,6 +347,7 @@ struct stconn { unsigned int flags; /* SC_FL_* */ unsigned int ioto; /* I/O activity timeout */ + uint32_t term_evts_log; /* termination events log aggregating SE + connection events */ ssize_t room_needed; /* free space in the input buffer required to receive more data. * -1 : the SC is waiting for room but not on a specific amount of data * >= 0 : min free space required to progress. 0 means SC must be unblocked ASAP diff --git a/include/haproxy/stconn.h b/include/haproxy/stconn.h index 53f761a28..0299815fd 100644 --- a/include/haproxy/stconn.h +++ b/include/haproxy/stconn.h @@ -565,4 +565,13 @@ static inline size_t se_done_ff(struct sedesc *se) return ret; } +static inline void sc_report_term_evt(struct stconn *sc, enum term_event_loc loc, enum term_event_type type) +{ + if (sc->flags & SC_FL_ISBACK) + loc += 8; + sc->term_evts_log = tevt_report_event(sc->term_evts_log, loc, type); + if (sc_strm(sc)) + __sc_strm(sc)->term_evts_log = tevt_report_event(__sc_strm(sc)->term_evts_log, loc, type); +} + #endif /* _HAPROXY_STCONN_H */ diff --git a/include/haproxy/stream-t.h b/include/haproxy/stream-t.h index 0211adaae..335fed8c9 100644 --- a/include/haproxy/stream-t.h +++ b/include/haproxy/stream-t.h @@ -324,6 +324,7 @@ struct stream { } waiting_entity; /* The entity waiting to continue its processing and interrupted by an error/timeout */ unsigned int stream_epoch; /* copy of stream_epoch when the stream was created */ + uint32_t term_evts_log; /* termination events log */ struct hlua *hlua[2]; /* lua runtime context (0: global, 1: per-thread) */ /* Context */ diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h index 4e503a32d..4070b603a 100644 --- a/include/haproxy/stream.h +++ b/include/haproxy/stream.h @@ -418,6 +418,19 @@ static inline unsigned int stream_map_task_state(unsigned int state) 0; } +static inline void stream_report_term_evt(struct stconn *sc, enum term_event_loc loc, enum term_event_type type) +{ + struct stream *s = sc_strm(sc); + + if (!s) + return; + + if (sc->flags & SC_FL_ISBACK) + loc += 8; + s->term_evts_log = tevt_report_event(s->term_evts_log, loc, type); + sc->term_evts_log = tevt_report_event(sc->term_evts_log, loc, type); +} + int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout); void stream_retnclose(struct stream *s, const struct buffer *msg); diff --git a/src/http_ana.c b/src/http_ana.c index 41c5effd6..353333402 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -353,6 +353,8 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) /* fall through */ return_prx_cond: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); http_set_term_flags(s); http_reply_and_close(s, txn->status, http_error_message(s)); @@ -505,6 +507,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s if (!http_apply_redirect_rule(rule, s, txn)) { goto return_int_err; } + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); goto done; } @@ -610,8 +613,9 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s /* fall through */ return_prx_cond: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); http_set_term_flags(s); - req->analysers &= AN_REQ_FLT_END; req->analyse_exp = TICK_ETERNITY; s->current_rule = s->current_rule_list = NULL; @@ -747,6 +751,8 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit) if (sess->listener && sess->listener->counters) _HA_ATOMIC_INC(&sess->listener->counters->internal_errors); + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); http_set_term_flags(s); http_reply_and_close(s, txn->status, http_error_message(s)); @@ -788,6 +794,8 @@ int http_process_tarpit(struct stream *s, struct channel *req, int an_bit) */ s->logs.t_queue = ns_to_ms(now_ns - s->logs.accept_ts); + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); http_set_term_flags(s); http_reply_and_close(s, txn->status, (!(s->scf->flags & SC_FL_ERROR) ? http_error_message(s) : NULL)); @@ -868,6 +876,8 @@ int http_wait_for_request_body(struct stream *s, struct channel *req, int an_bit /* fall through */ return_prx_cond: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); http_set_term_flags(s); req->analysers &= AN_REQ_FLT_END; @@ -1102,6 +1112,8 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit) goto return_prx_cond; return_int_err: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_INTERNAL; _HA_ATOMIC_INC(&sess->fe->fe_counters.internal_errors); @@ -1115,6 +1127,8 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit) goto return_prx_cond; return_bad_req: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); _HA_ATOMIC_INC(&sess->fe->fe_counters.failed_req); if (sess->listener && sess->listener->counters) _HA_ATOMIC_INC(&sess->listener->counters->failed_req); @@ -1669,6 +1683,8 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) /* fall through */ return_prx_cond: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted); http_set_term_flags(s); http_reply_and_close(s, txn->status, http_error_message(s)); @@ -2004,6 +2020,8 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s return_prx_cond: s->scb->flags |= SC_FL_NOLINGER; + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted); http_set_term_flags(s); rep->analysers &= AN_RES_FLT_END; @@ -2245,6 +2263,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit goto return_error; return_int_err: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted); _HA_ATOMIC_INC(&sess->fe->fe_counters.internal_errors); _HA_ATOMIC_INC(&s->be->be_counters.internal_errors); if (sess->listener && sess->listener->counters) @@ -2257,6 +2277,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit goto return_error; return_bad_res: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted); _HA_ATOMIC_INC(&s->be->be_counters.failed_resp); if (objt_server(s->target)) { _HA_ATOMIC_INC(&__objt_server(s->target)->counters.failed_resp); diff --git a/src/stconn.c b/src/stconn.c index 1e69abc48..ef149c573 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -100,7 +100,7 @@ void sedesc_init(struct sedesc *sedesc) sedesc->fsb = TICK_ETERNITY; sedesc->xref.peer = NULL; se_fl_setall(sedesc, SE_FL_NONE); - + sedesc->term_evts_log = 0; sedesc->abort_info.info = 0; sedesc->abort_info.code = 0; @@ -146,8 +146,10 @@ void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode) struct se_abort_info *reason = NULL; unsigned int flags = 0; - if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW)) + if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW)) { + sc_report_term_evt(sedesc->sc, tevt_loc_strm, tevt_type_shutw); flags |= (mode & SE_SHW_NORMAL) ? SE_FL_SHWN : SE_FL_SHWS; + } if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR)) flags |= (mode & SE_SHR_DRAIN) ? SE_FL_SHRD : SE_FL_SHRR; @@ -208,6 +210,8 @@ static struct stconn *sc_new(struct sedesc *sedesc) sc->wait_event.tasklet = NULL; sc->wait_event.events = 0; + sc->term_evts_log = 0; + /* If there is no endpoint, allocate a new one now */ if (!sedesc) { sedesc = sedesc_new(); @@ -1233,7 +1237,7 @@ static void sc_conn_eos(struct stconn *sc) sc->flags |= SC_FL_EOS; ic->flags |= CF_READ_EVENT; sc_ep_report_read_activity(sc); - + sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_shutr: tevt_type_truncated_shutr)); if (sc->state != SC_ST_EST) return; @@ -1520,6 +1524,8 @@ int sc_conn_recv(struct stconn *sc) } if (sc_ep_test(sc, SE_FL_ERROR)) { sc->flags |= SC_FL_ERROR; + if (!(sc->flags & SC_FL_EOS)) + sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_rcv_err: tevt_type_truncated_rcv_err)); ret = 1; } @@ -1745,6 +1751,7 @@ int sc_conn_send(struct stconn *sc) if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) { oc->flags |= CF_WRITE_EVENT; BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING)); + sc_report_term_evt(sc, tevt_loc_strm, tevt_type_snd_err); if (sc_ep_test(sc, SE_FL_ERROR)) sc->flags |= SC_FL_ERROR; return 1; @@ -1856,6 +1863,19 @@ int sc_conn_process(struct stconn *sc) sc->state = SC_ST_RDY; } + /* Report EOI on the channel if it was reached from the mux point of + * view. + * + * Note: This test is only required because sc_conn_process is also the SI + * wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take + * care of it. + */ + if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) { + sc->flags |= SC_FL_EOI; + ic->flags |= CF_READ_EVENT; + sc_ep_report_read_activity(sc); + } + /* Report EOS on the channel if it was reached from the mux point of * view. * @@ -1870,21 +1890,11 @@ int sc_conn_process(struct stconn *sc) sc_conn_eos(sc); } - /* Report EOI on the channel if it was reached from the mux point of - * view. - * - * Note: This test is only required because sc_conn_process is also the SI - * wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take - * care of it. - */ - if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) { - sc->flags |= SC_FL_EOI; - ic->flags |= CF_READ_EVENT; - sc_ep_report_read_activity(sc); - } - - if (sc_ep_test(sc, SE_FL_ERROR)) + if (sc_ep_test(sc, SE_FL_ERROR) && !(sc->flags & SC_FL_ERROR)) { + if (!(sc->flags & SC_FL_EOS)) + sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_rcv_err: tevt_type_truncated_rcv_err)); sc->flags |= SC_FL_ERROR; + } /* Second step : update the stream connector and channels, try to forward any * pending data, then possibly wake the stream up based on the new diff --git a/src/stream.c b/src/stream.c index 819de8967..c61c3fcb6 100644 --- a/src/stream.c +++ b/src/stream.c @@ -413,6 +413,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer s->stream_epoch = _HA_ATOMIC_LOAD(&stream_epoch); s->uniq_id = _HA_ATOMIC_FETCH_ADD(&global.req_count, 1); + s->term_evts_log = 0; /* OK, we're keeping the stream, so let's properly initialize the stream */ LIST_INIT(&s->back_refs); @@ -1575,21 +1576,25 @@ static void stream_handle_timeouts(struct stream *s) channel_check_timeout(&s->res); if (unlikely(!(s->scb->flags & SC_FL_SHUT_DONE) && (s->req.flags & CF_WRITE_TIMEOUT))) { + stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_tout); s->scb->flags |= SC_FL_NOLINGER; sc_shutdown(s->scb); } if (unlikely(!(s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && (s->req.flags & CF_READ_TIMEOUT))) { + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_tout); if (s->scf->flags & SC_FL_NOHALF) s->scf->flags |= SC_FL_NOLINGER; sc_abort(s->scf); } if (unlikely(!(s->scf->flags & SC_FL_SHUT_DONE) && (s->res.flags & CF_WRITE_TIMEOUT))) { + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_tout); s->scf->flags |= SC_FL_NOLINGER; sc_shutdown(s->scf); } if (unlikely(!(s->scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && (s->res.flags & CF_READ_TIMEOUT))) { + stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_tout); if (s->scb->flags & SC_FL_NOHALF) s->scb->flags |= SC_FL_NOLINGER; sc_abort(s->scb); diff --git a/src/tcp_rules.c b/src/tcp_rules.c index 9d450cfa9..8bb5d00c9 100644 --- a/src/tcp_rules.c +++ b/src/tcp_rules.c @@ -269,6 +269,8 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit) stream_abort(s); abort: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted); req->analysers &= AN_REQ_FLT_END; s->current_rule = s->current_rule_list = NULL; req->analyse_exp = s->rules_exp = TICK_ETERNITY; @@ -475,6 +477,8 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit) stream_abort(s); abort: + // XXX: All errors are handled as intercepted here ! + stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted); rep->analysers &= AN_RES_FLT_END; s->current_rule = s->current_rule_list = NULL; rep->analyse_exp = s->rules_exp = TICK_ETERNITY;