diff --git a/include/haproxy/stream-t.h b/include/haproxy/stream-t.h index 0cd9baba5..32724588e 100644 --- a/include/haproxy/stream-t.h +++ b/include/haproxy/stream-t.h @@ -295,6 +295,11 @@ struct stream { int type; /* entity type (0: undef, 1: rule, 2: filter) */ } last_entity; /* last evaluated entity that interrupted processing */ + struct { + void *ptr; /* Pointer on the entity (def: NULL) */ + int type; /* entity type (0: undef, 1: rule, 2: filter) */ + } waiting_entity; /* The entity waiting to continue its processing and interrupted by an error/timeout */ + unsigned int stream_epoch; /* copy of stream_epoch when the stream was created */ struct hlua *hlua[2]; /* lua runtime context (0: global, 1: per-thread) */ diff --git a/src/filters.c b/src/filters.c index 45bfa12a6..240e99195 100644 --- a/src/filters.c +++ b/src/filters.c @@ -57,9 +57,14 @@ static int handle_analyzer_result(struct stream *s, struct channel *chn, unsigne do { \ struct filter *filter; \ \ - if (strm_flt(strm)->current[CHN_IDX(chn)]) { \ + if (strm_flt(strm)->current[CHN_IDX(chn)]) { \ filter = strm_flt(strm)->current[CHN_IDX(chn)]; \ - strm_flt(strm)->current[CHN_IDX(chn)] = NULL; \ + strm_flt(strm)->current[CHN_IDX(chn)] = NULL; \ + if (!(chn_prod(chn)->flags & SC_FL_ERROR) && \ + !(chn->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { \ + (strm)->waiting_entity.type = 0; \ + (strm)->waiting_entity.ptr = NULL; \ + } \ goto resume_execution; \ } \ \ @@ -72,7 +77,11 @@ static int handle_analyzer_result(struct stream *s, struct channel *chn, unsigne #define BREAK_EXECUTION(strm, chn, label) \ do { \ - if (ret < 0) { \ + if (ret == 0) { \ + s->waiting_entity.type = 2; \ + s->waiting_entity.ptr = filter; \ + } \ + else if (ret < 0) { \ (strm)->last_entity.type = 2; \ (strm)->last_entity.ptr = filter; \ } \ diff --git a/src/http_ana.c b/src/http_ana.c index d83709af7..255c0e439 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -2734,6 +2734,11 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis (px->options & PR_O_ABRT_CLOSE))) act_opts |= ACT_OPT_FINAL; + if (!(s->scf->flags & SC_FL_ERROR) & !(s->req.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { + s->waiting_entity.type = 0; + s->waiting_entity.ptr = NULL; + } + switch (rule->action_ptr(rule, px, sess, s, act_opts)) { case ACT_RET_CONT: break; @@ -2753,6 +2758,8 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis rule_ret = HTTP_RULE_RES_ERROR; goto end; } + s->waiting_entity.type = 1; + s->waiting_entity.ptr = rule; rule_ret = HTTP_RULE_RES_YIELD; goto end; case ACT_RET_ERR: @@ -2908,6 +2915,11 @@ static enum rule_result http_res_get_intercept_rule(struct proxy *px, struct lis (px->options & PR_O_ABRT_CLOSE))) act_opts |= ACT_OPT_FINAL; + if (!(s->scb->flags & SC_FL_ERROR) & !(s->res.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { + s->waiting_entity.type = 0; + s->waiting_entity.ptr = NULL; + } + switch (rule->action_ptr(rule, px, sess, s, act_opts)) { case ACT_RET_CONT: break; @@ -2927,6 +2939,8 @@ static enum rule_result http_res_get_intercept_rule(struct proxy *px, struct lis rule_ret = HTTP_RULE_RES_ERROR; goto end; } + s->waiting_entity.type = 1; + s->waiting_entity.ptr = rule; rule_ret = HTTP_RULE_RES_YIELD; goto end; case ACT_RET_ERR: diff --git a/src/stream.c b/src/stream.c index e015b97bf..2112c9910 100644 --- a/src/stream.c +++ b/src/stream.c @@ -391,6 +391,8 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer s->rules_exp = TICK_ETERNITY; s->last_entity.type = 0; s->last_entity.ptr = NULL; + s->waiting_entity.type = 0; + s->waiting_entity.ptr = NULL; s->stkctr = NULL; if (pool_head_stk_ctr) { @@ -4158,6 +4160,41 @@ static int smp_fetch_last_entity(const struct arg *args, struct sample *smp, con return 1; } +static int smp_fetch_waiting_entity(const struct arg *args, struct sample *smp, const char *km, void *private) +{ + smp->flags = SMP_F_VOL_TXN; + smp->data.type = SMP_T_STR; + if (!smp->strm) + return 0; + + if (smp->strm->waiting_entity.type == 1) { + struct act_rule *rule = smp->strm->waiting_entity.ptr; + struct buffer *trash = get_trash_chunk(); + + trash->data = snprintf(trash->area, trash->size, "%s:%d", rule->conf.file, rule->conf.line); + smp->data.u.str = *trash; + } + else if (smp->strm->waiting_entity.type == 2) { + struct filter *filter = smp->strm->waiting_entity.ptr; + + if (FLT_ID(filter)) { + smp->flags |= SMP_F_CONST; + smp->data.u.str.area = (char *)FLT_ID(filter); + smp->data.u.str.data = strlen(FLT_ID(filter)); + } + else { + struct buffer *trash = get_trash_chunk(); + + trash->data = snprintf(trash->area, trash->size, "%p", filter->config); + smp->data.u.str = *trash; + } + } + else + return 0; + + return 1; +} + static int smp_fetch_sess_term_state(const struct arg *args, struct sample *smp, const char *km, void *private) { struct buffer *trash = get_trash_chunk(); @@ -4226,6 +4263,7 @@ static struct sample_fetch_kw_list smp_kws = {ILH, { { "txn.id32", smp_fetch_id32, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, }, { "txn.redispatched", smp_fetch_redispatched, 0, NULL, SMP_T_BOOL, SMP_USE_L4SRV, }, { "txn.sess_term_state",smp_fetch_sess_term_state, 0, NULL, SMP_T_STR, SMP_USE_INTRN, }, + { "waiting_entity", smp_fetch_waiting_entity, 0, NULL, SMP_T_STR, SMP_USE_INTRN, }, { NULL, NULL, 0, 0, 0 }, }}; diff --git a/src/tcp_rules.c b/src/tcp_rules.c index d3263b075..c54476db2 100644 --- a/src/tcp_rules.c +++ b/src/tcp_rules.c @@ -137,6 +137,10 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit) if (s->current_rule) { rule = s->current_rule; s->current_rule = NULL; + if (!(req->flags & SC_FL_ERROR) && !(req->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { + s->waiting_entity.type = 0; + s->waiting_entity.ptr = NULL; + } if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) goto resume_execution; } @@ -179,6 +183,8 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit) s->last_entity.ptr = rule; goto internal; } + s->waiting_entity.type = 1; + s->waiting_entity.ptr = rule; goto missing_data; case ACT_RET_DENY: s->last_entity.type = 1; @@ -321,6 +327,10 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit) if (s->current_rule) { rule = s->current_rule; s->current_rule = NULL; + if (!(rep->flags & SC_FL_ERROR) && !(rep->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { + s->waiting_entity.type = 0; + s->waiting_entity.ptr = NULL; + } if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) goto resume_execution; } @@ -343,6 +353,7 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit) if (ret) { act_opts |= ACT_OPT_FIRST; resume_execution: + /* Always call the action function if defined */ if (rule->action_ptr) { switch (rule->action_ptr(rule, s->be, s->sess, s, act_opts)) { @@ -363,6 +374,8 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit) s->last_entity.ptr = rule; goto internal; } + s->waiting_entity.type = 1; + s->waiting_entity.ptr = rule; channel_dont_close(rep); goto missing_data; case ACT_RET_DENY: