MINOR: tevt/stream/stconn: Report termination events for stream and sc

In this patch, events for the stream location are reported. These events are
first reported on the corresponding stream-connector. So front events on scf
and back event on scb. Then all events are both merged in the stream. But
only 4 events are saved on the stream.

Several internal events are for now grouped with the type
"tevt_type_intercepted". More events will be added to have a better
resolution. But at least the place to report these events are identified.

For now, when a event is reported on a SC, it is also reported on the stream
and vice versa.
This commit is contained in:
Christopher Faulet 2024-12-23 14:30:33 +01:00
parent 147b6d3d4d
commit 00a07c8b54
8 changed files with 83 additions and 18 deletions

View File

@ -347,6 +347,7 @@ struct stconn {
unsigned int flags; /* SC_FL_* */
unsigned int ioto; /* I/O activity timeout */
uint32_t term_evts_log; /* termination events log aggregating SE + connection events */
ssize_t room_needed; /* free space in the input buffer required to receive more data.
* -1 : the SC is waiting for room but not on a specific amount of data
* >= 0 : min free space required to progress. 0 means SC must be unblocked ASAP

View File

@ -565,4 +565,13 @@ static inline size_t se_done_ff(struct sedesc *se)
return ret;
}
static inline void sc_report_term_evt(struct stconn *sc, enum term_event_loc loc, enum term_event_type type)
{
if (sc->flags & SC_FL_ISBACK)
loc += 8;
sc->term_evts_log = tevt_report_event(sc->term_evts_log, loc, type);
if (sc_strm(sc))
__sc_strm(sc)->term_evts_log = tevt_report_event(__sc_strm(sc)->term_evts_log, loc, type);
}
#endif /* _HAPROXY_STCONN_H */

View File

@ -324,6 +324,7 @@ struct stream {
} waiting_entity; /* The entity waiting to continue its processing and interrupted by an error/timeout */
unsigned int stream_epoch; /* copy of stream_epoch when the stream was created */
uint32_t term_evts_log; /* termination events log */
struct hlua *hlua[2]; /* lua runtime context (0: global, 1: per-thread) */
/* Context */

View File

@ -418,6 +418,19 @@ static inline unsigned int stream_map_task_state(unsigned int state)
0;
}
static inline void stream_report_term_evt(struct stconn *sc, enum term_event_loc loc, enum term_event_type type)
{
struct stream *s = sc_strm(sc);
if (!s)
return;
if (sc->flags & SC_FL_ISBACK)
loc += 8;
s->term_evts_log = tevt_report_event(s->term_evts_log, loc, type);
sc->term_evts_log = tevt_report_event(sc->term_evts_log, loc, type);
}
int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout);
void stream_retnclose(struct stream *s, const struct buffer *msg);

View File

@ -353,6 +353,8 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
/* fall through */
return_prx_cond:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
http_set_term_flags(s);
http_reply_and_close(s, txn->status, http_error_message(s));
@ -505,6 +507,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
if (!http_apply_redirect_rule(rule, s, txn)) {
goto return_int_err;
}
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
goto done;
}
@ -610,8 +613,9 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
/* fall through */
return_prx_cond:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
http_set_term_flags(s);
req->analysers &= AN_REQ_FLT_END;
req->analyse_exp = TICK_ETERNITY;
s->current_rule = s->current_rule_list = NULL;
@ -747,6 +751,8 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit)
if (sess->listener && sess->listener->counters)
_HA_ATOMIC_INC(&sess->listener->counters->internal_errors);
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
http_set_term_flags(s);
http_reply_and_close(s, txn->status, http_error_message(s));
@ -788,6 +794,8 @@ int http_process_tarpit(struct stream *s, struct channel *req, int an_bit)
*/
s->logs.t_queue = ns_to_ms(now_ns - s->logs.accept_ts);
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
http_set_term_flags(s);
http_reply_and_close(s, txn->status, (!(s->scf->flags & SC_FL_ERROR) ? http_error_message(s) : NULL));
@ -868,6 +876,8 @@ int http_wait_for_request_body(struct stream *s, struct channel *req, int an_bit
/* fall through */
return_prx_cond:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
http_set_term_flags(s);
req->analysers &= AN_REQ_FLT_END;
@ -1102,6 +1112,8 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
goto return_prx_cond;
return_int_err:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
if (!(s->flags & SF_ERR_MASK))
s->flags |= SF_ERR_INTERNAL;
_HA_ATOMIC_INC(&sess->fe->fe_counters.internal_errors);
@ -1115,6 +1127,8 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
goto return_prx_cond;
return_bad_req:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
_HA_ATOMIC_INC(&sess->fe->fe_counters.failed_req);
if (sess->listener && sess->listener->counters)
_HA_ATOMIC_INC(&sess->listener->counters->failed_req);
@ -1669,6 +1683,8 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
/* fall through */
return_prx_cond:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
http_set_term_flags(s);
http_reply_and_close(s, txn->status, http_error_message(s));
@ -2004,6 +2020,8 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
return_prx_cond:
s->scb->flags |= SC_FL_NOLINGER;
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
http_set_term_flags(s);
rep->analysers &= AN_RES_FLT_END;
@ -2245,6 +2263,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
goto return_error;
return_int_err:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
_HA_ATOMIC_INC(&sess->fe->fe_counters.internal_errors);
_HA_ATOMIC_INC(&s->be->be_counters.internal_errors);
if (sess->listener && sess->listener->counters)
@ -2257,6 +2277,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
goto return_error;
return_bad_res:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
_HA_ATOMIC_INC(&s->be->be_counters.failed_resp);
if (objt_server(s->target)) {
_HA_ATOMIC_INC(&__objt_server(s->target)->counters.failed_resp);

View File

@ -100,7 +100,7 @@ void sedesc_init(struct sedesc *sedesc)
sedesc->fsb = TICK_ETERNITY;
sedesc->xref.peer = NULL;
se_fl_setall(sedesc, SE_FL_NONE);
sedesc->term_evts_log = 0;
sedesc->abort_info.info = 0;
sedesc->abort_info.code = 0;
@ -146,8 +146,10 @@ void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode)
struct se_abort_info *reason = NULL;
unsigned int flags = 0;
if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW))
if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW)) {
sc_report_term_evt(sedesc->sc, tevt_loc_strm, tevt_type_shutw);
flags |= (mode & SE_SHW_NORMAL) ? SE_FL_SHWN : SE_FL_SHWS;
}
if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR))
flags |= (mode & SE_SHR_DRAIN) ? SE_FL_SHRD : SE_FL_SHRR;
@ -208,6 +210,8 @@ static struct stconn *sc_new(struct sedesc *sedesc)
sc->wait_event.tasklet = NULL;
sc->wait_event.events = 0;
sc->term_evts_log = 0;
/* If there is no endpoint, allocate a new one now */
if (!sedesc) {
sedesc = sedesc_new();
@ -1233,7 +1237,7 @@ static void sc_conn_eos(struct stconn *sc)
sc->flags |= SC_FL_EOS;
ic->flags |= CF_READ_EVENT;
sc_ep_report_read_activity(sc);
sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_shutr: tevt_type_truncated_shutr));
if (sc->state != SC_ST_EST)
return;
@ -1520,6 +1524,8 @@ int sc_conn_recv(struct stconn *sc)
}
if (sc_ep_test(sc, SE_FL_ERROR)) {
sc->flags |= SC_FL_ERROR;
if (!(sc->flags & SC_FL_EOS))
sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_rcv_err: tevt_type_truncated_rcv_err));
ret = 1;
}
@ -1745,6 +1751,7 @@ int sc_conn_send(struct stconn *sc)
if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
oc->flags |= CF_WRITE_EVENT;
BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
sc_report_term_evt(sc, tevt_loc_strm, tevt_type_snd_err);
if (sc_ep_test(sc, SE_FL_ERROR))
sc->flags |= SC_FL_ERROR;
return 1;
@ -1856,6 +1863,19 @@ int sc_conn_process(struct stconn *sc)
sc->state = SC_ST_RDY;
}
/* Report EOI on the channel if it was reached from the mux point of
* view.
*
* Note: This test is only required because sc_conn_process is also the SI
* wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
* care of it.
*/
if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
sc->flags |= SC_FL_EOI;
ic->flags |= CF_READ_EVENT;
sc_ep_report_read_activity(sc);
}
/* Report EOS on the channel if it was reached from the mux point of
* view.
*
@ -1870,21 +1890,11 @@ int sc_conn_process(struct stconn *sc)
sc_conn_eos(sc);
}
/* Report EOI on the channel if it was reached from the mux point of
* view.
*
* Note: This test is only required because sc_conn_process is also the SI
* wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
* care of it.
*/
if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
sc->flags |= SC_FL_EOI;
ic->flags |= CF_READ_EVENT;
sc_ep_report_read_activity(sc);
}
if (sc_ep_test(sc, SE_FL_ERROR))
if (sc_ep_test(sc, SE_FL_ERROR) && !(sc->flags & SC_FL_ERROR)) {
if (!(sc->flags & SC_FL_EOS))
sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_rcv_err: tevt_type_truncated_rcv_err));
sc->flags |= SC_FL_ERROR;
}
/* Second step : update the stream connector and channels, try to forward any
* pending data, then possibly wake the stream up based on the new

View File

@ -413,6 +413,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer
s->stream_epoch = _HA_ATOMIC_LOAD(&stream_epoch);
s->uniq_id = _HA_ATOMIC_FETCH_ADD(&global.req_count, 1);
s->term_evts_log = 0;
/* OK, we're keeping the stream, so let's properly initialize the stream */
LIST_INIT(&s->back_refs);
@ -1575,21 +1576,25 @@ static void stream_handle_timeouts(struct stream *s)
channel_check_timeout(&s->res);
if (unlikely(!(s->scb->flags & SC_FL_SHUT_DONE) && (s->req.flags & CF_WRITE_TIMEOUT))) {
stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_tout);
s->scb->flags |= SC_FL_NOLINGER;
sc_shutdown(s->scb);
}
if (unlikely(!(s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && (s->req.flags & CF_READ_TIMEOUT))) {
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_tout);
if (s->scf->flags & SC_FL_NOHALF)
s->scf->flags |= SC_FL_NOLINGER;
sc_abort(s->scf);
}
if (unlikely(!(s->scf->flags & SC_FL_SHUT_DONE) && (s->res.flags & CF_WRITE_TIMEOUT))) {
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_tout);
s->scf->flags |= SC_FL_NOLINGER;
sc_shutdown(s->scf);
}
if (unlikely(!(s->scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && (s->res.flags & CF_READ_TIMEOUT))) {
stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_tout);
if (s->scb->flags & SC_FL_NOHALF)
s->scb->flags |= SC_FL_NOLINGER;
sc_abort(s->scb);

View File

@ -269,6 +269,8 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit)
stream_abort(s);
abort:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
req->analysers &= AN_REQ_FLT_END;
s->current_rule = s->current_rule_list = NULL;
req->analyse_exp = s->rules_exp = TICK_ETERNITY;
@ -475,6 +477,8 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit)
stream_abort(s);
abort:
// XXX: All errors are handled as intercepted here !
stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
rep->analysers &= AN_RES_FLT_END;
s->current_rule = s->current_rule_list = NULL;
rep->analyse_exp = s->rules_exp = TICK_ETERNITY;