From 9ec2f4dc7c3ea4010426361bdbc246b3b0783449 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Wed, 23 Mar 2022 15:15:29 +0100 Subject: [PATCH] MAJOR: conn-stream: Share endpoint struct between the CS and the mux/applet The conn-stream endpoint is now shared between the conn-stream and the applet or the multiplexer. If the mux or the applet is created first, it is responsible to also create the endpoint and share it with the conn-stream. If the conn-stream is created first, it is the opposite. When the endpoint is only owned by an applet or a mux, it is called an orphan endpoint (there is no conn-stream). When it is only owned by a conn-stream, it is called a detached endpoint (there is no mux/applet). The last entity that owns an endpoint is responsible to release it. When a mux or an applet is detached from a conn-stream, the conn-stream relinquishes the endpoint to recreate a new one. This way, the endpoint state is never lost for the mux or the applet. --- dev/flags/flags.c | 2 ++ include/haproxy/applet-t.h | 3 +- include/haproxy/applet.h | 5 +++- include/haproxy/conn_stream-t.h | 8 +++++- include/haproxy/conn_stream.h | 1 + include/haproxy/mux_quic-t.h | 2 ++ include/haproxy/mux_quic.h | 13 ++------- src/applet.c | 51 ++++++++++++++++++++++----------- src/backend.c | 27 +++++++++++++++-- src/check.c | 22 ++++++++++++-- src/cli.c | 8 +++++- src/conn_stream.c | 34 ++++++++++++++++++++-- src/dns.c | 13 ++------- src/flt_spoe.c | 16 ++--------- src/h3.c | 1 - src/hlua.c | 13 ++------- src/http_ana.c | 8 +++++- src/http_client.c | 13 ++------- src/mux_fcgi.c | 7 ++++- src/mux_h1.c | 51 ++++++++++++++++++--------------- src/mux_h2.c | 28 ++++++++++-------- src/mux_pt.c | 44 +++++++++++++++++----------- src/mux_quic.c | 12 ++++++++ src/peers.c | 13 ++------- src/sink.c | 13 ++------- src/stream_interface.c | 2 +- 26 files changed, 248 insertions(+), 162 deletions(-) diff --git a/dev/flags/flags.c b/dev/flags/flags.c index eb98b6856..08cb2e89f 100644 --- a/dev/flags/flags.c +++ b/dev/flags/flags.c @@ -196,6 +196,8 @@ void show_endp_flags(unsigned int f) SHOW_FLAG(f, CS_EP_SHWN); SHOW_FLAG(f, CS_EP_SHRR); SHOW_FLAG(f, CS_EP_SHRD); + SHOW_FLAG(f, CS_EP_ORPHAN); + SHOW_FLAG(f, CS_EP_DETACHED); SHOW_FLAG(f, CS_EP_T_APPLET); SHOW_FLAG(f, CS_EP_T_MUX); diff --git a/include/haproxy/applet-t.h b/include/haproxy/applet-t.h index 8c565dce7..bbc6b02de 100644 --- a/include/haproxy/applet-t.h +++ b/include/haproxy/applet-t.h @@ -58,7 +58,8 @@ struct appctx { struct buffer *chunk; /* used to store unfinished commands */ unsigned int st2; /* output state for stats, unused by peers */ struct applet *applet; /* applet this context refers to */ - void *owner; /* pointer to upper layer's entity (eg: conn_stream) */ + struct conn_stream *owner; + struct cs_endpoint *endp; struct act_rule *rule; /* rule associated with the applet. */ int (*io_handler)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK */ void (*io_release)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK, diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h index aa9124b3b..d63b78785 100644 --- a/include/haproxy/applet.h +++ b/include/haproxy/applet.h @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -36,7 +37,7 @@ extern struct pool_head *pool_head_appctx; struct task *task_run_applet(struct task *t, void *context, unsigned int state); int appctx_buf_available(void *arg); -struct appctx *appctx_new(struct applet *applet); +struct appctx *appctx_new(struct applet *applet, struct cs_endpoint *endp); /* Releases an appctx previously allocated by appctx_new(). */ static inline void __appctx_free(struct appctx *appctx) @@ -45,6 +46,8 @@ static inline void __appctx_free(struct appctx *appctx) if (LIST_INLIST(&appctx->buffer_wait.list)) LIST_DEL_INIT(&appctx->buffer_wait.list); + BUG_ON(appctx->endp && !(appctx->endp->flags & CS_EP_ORPHAN)); + cs_endpoint_free(appctx->endp); pool_free(pool_head_appctx, appctx); _HA_ATOMIC_DEC(&nb_applets); } diff --git a/include/haproxy/conn_stream-t.h b/include/haproxy/conn_stream-t.h index e76a8c339..842ce8443 100644 --- a/include/haproxy/conn_stream-t.h +++ b/include/haproxy/conn_stream-t.h @@ -35,7 +35,13 @@ struct stream_interface; CS_EP_T_MUX = 0x00000001, /* The endpoint is a mux (the target may be NULL before the mux init) */ CS_EP_T_APPLET = 0x00000002, /* The endpoint is an applet */ - /* unused: 0x00000004 .. 0x00000080 */ + /* unused: 0x00000004 .. 0x00000008 */ + + /* Endpoint states: none == attached to a mux with a conn-stream */ + CS_EP_DETACHED = 0x00000010, /* The endpoint is detached (no mux/no applet) */ + CS_EP_ORPHAN = 0x00000020, /* The endpoint is orphan (no conn-stream) */ + + /* unused: 0x00000040 .. 0x00000080 */ CS_EP_SHRD = 0x00000100, /* read shut, draining extra data */ CS_EP_SHRR = 0x00000200, /* read shut, resetting extra data */ diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h index e0c3b50bc..ca5a99c93 100644 --- a/include/haproxy/conn_stream.h +++ b/include/haproxy/conn_stream.h @@ -50,6 +50,7 @@ void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx); void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx); int cs_attach_strm(struct conn_stream *cs, struct stream *strm); +int cs_reset_endp(struct conn_stream *cs); void cs_detach_endp(struct conn_stream *cs); void cs_detach_app(struct conn_stream *cs); diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 2a1cd862d..6fa952244 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -11,6 +11,7 @@ #include #include #include +#include /* Stream types */ enum qcs_type { @@ -88,6 +89,7 @@ struct qcc { struct qcs { struct qcc *qcc; struct conn_stream *cs; + struct cs_endpoint *endp; uint32_t flags; /* QC_SF_* */ struct { diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 6c97b7fda..cd0a31825 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -107,20 +107,11 @@ static inline struct qc_stream_desc *qcc_get_stream(struct qcc *qcc, uint64_t id static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *buf) { - struct cs_endpoint *endp; struct conn_stream *cs; - endp = cs_endpoint_new(); - if (!endp) + cs = cs_new_from_mux(qcs->endp, qcs->qcc->conn->owner, buf); + if (!cs) return NULL; - endp->target = qcs; - endp->ctx = qcs->qcc->conn; - endp->flags |= CS_EP_T_MUX; - cs = cs_new_from_mux(endp, qcs->qcc->conn->owner, buf); - if (!cs) { - cs_endpoint_free(endp); - return NULL; - } qcs->cs = cs; ++qcs->qcc->nb_cs; diff --git a/src/applet.c b/src/applet.c index f6fea7423..663a9df54 100644 --- a/src/applet.c +++ b/src/applet.c @@ -47,28 +47,47 @@ static inline void appctx_init(struct appctx *appctx) * appctx_free(). is assigned as the applet, but it can be NULL. The * applet's task is always created on the current thread. */ -struct appctx *appctx_new(struct applet *applet) +struct appctx *appctx_new(struct applet *applet, struct cs_endpoint *endp) { struct appctx *appctx; appctx = pool_alloc(pool_head_appctx); - if (likely(appctx != NULL)) { - appctx->obj_type = OBJ_TYPE_APPCTX; - appctx->applet = applet; - appctx_init(appctx); - appctx->t = task_new_here(); - if (unlikely(appctx->t == NULL)) { - pool_free(pool_head_appctx, appctx); - return NULL; - } - appctx->t->process = task_run_applet; - appctx->t->context = appctx; - LIST_INIT(&appctx->buffer_wait.list); - appctx->buffer_wait.target = appctx; - appctx->buffer_wait.wakeup_cb = appctx_buf_available; - _HA_ATOMIC_INC(&nb_applets); + if (unlikely(!appctx)) + goto fail_appctx; + + appctx_init(appctx); + appctx->obj_type = OBJ_TYPE_APPCTX; + appctx->applet = applet; + + if (!endp) { + endp = cs_endpoint_new(); + if (!endp) + goto fail_endp; + endp->target = appctx; + endp->ctx = appctx; + endp->flags |= (CS_EP_T_APPLET|CS_EP_ORPHAN); } + appctx->endp = endp; + + appctx->t = task_new_here(); + if (unlikely(!appctx->t)) + goto fail_task; + appctx->t->process = task_run_applet; + appctx->t->context = appctx; + + LIST_INIT(&appctx->buffer_wait.list); + appctx->buffer_wait.target = appctx; + appctx->buffer_wait.wakeup_cb = appctx_buf_available; + + _HA_ATOMIC_INC(&nb_applets); return appctx; + + fail_task: + cs_endpoint_free(appctx->endp); + fail_endp: + pool_free(pool_head_appctx, appctx); + fail_appctx: + return NULL; } /* Callback used to wake up an applet when a buffer is available. The applet diff --git a/src/backend.c b/src/backend.c index f8c13b4f4..f57d05d84 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1496,8 +1496,9 @@ static int connect_server(struct stream *s) if (avail >= 1) { if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) { - cs_detach_endp(s->csb); srv_conn = NULL; + if (cs_reset_endp(s->csb) < 0) + return SF_ERR_INTERNAL; } } else @@ -2290,7 +2291,29 @@ void back_handle_st_cer(struct stream *s) * Note: the stream-interface will be switched to ST_REQ, ST_ASS or * ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset. */ - cs_detach_endp(s->csb); + if (cs_reset_endp(s->csb) < 0) { + if (!si->err_type) + si->err_type = SI_ET_CONN_OTHER; + + if (objt_server(s->target)) + _HA_ATOMIC_INC(&objt_server(s->target)->counters.internal_errors); + _HA_ATOMIC_INC(&s->be->be_counters.internal_errors); + sess_change_server(s, NULL); + if (may_dequeue_tasks(objt_server(s->target), s->be)) + process_srv_queue(objt_server(s->target)); + + /* shutw is enough so stop a connecting socket */ + si_shutw(si); + s->req.flags |= CF_WRITE_ERROR; + s->res.flags |= CF_READ_ERROR; + + si->state = SI_ST_CLO; + if (s->srv_error) + s->srv_error(s, si); + + DBG_TRACE_STATE("error resetting endpoint", STRM_EV_STRM_PROC|STRM_EV_SI_ST|STRM_EV_STRM_ERR, s); + goto end; + } stream_choose_redispatch(s); diff --git a/src/check.c b/src/check.c index 44583c243..47a53db3d 100644 --- a/src/check.c +++ b/src/check.c @@ -1134,6 +1134,15 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state) task_set_affinity(t, tid_bit); check->current_step = NULL; + + if (check->cs->flags & CS_FL_ERROR) { + check->cs->flags &= ~CS_FL_ERROR; + check->cs->endp = cs_endpoint_new(); + if (!check->cs->endp) + check->cs->flags |= CS_FL_ERROR; + else + check->cs->endp->flags |= CS_EP_DETACHED; + } tcpcheck_main(check); expired = 0; } @@ -1155,9 +1164,10 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state) else { if (check->state & CHK_ST_CLOSE_CONN) { TRACE_DEVEL("closing current connection", CHK_EV_TASK_WAKE|CHK_EV_HCHK_RUN, check); - cs_detach_endp(check->cs); - conn = NULL; check->state &= ~CHK_ST_CLOSE_CONN; + if (cs_reset_endp(check->cs) < 0) + check->cs->flags |= CS_FL_ERROR; + conn = NULL; tcpcheck_main(check); } if (check->result == CHK_RES_UNKNOWN) { @@ -1190,7 +1200,13 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state) * the tasklet */ tasklet_remove_from_tasklet_list(check->wait_list.tasklet); - cs_detach_endp(check->cs); + + if (cs_reset_endp(check->cs) < 0) { + /* If an error occurred at this stage, it will be fixed by the + * next check + */ + check->cs->flags |= CS_FL_ERROR; + } if (check->sess != NULL) { vars_prune(&check->vars, check->sess, NULL); diff --git a/src/cli.c b/src/cli.c index 25d28f14d..665631fe8 100644 --- a/src/cli.c +++ b/src/cli.c @@ -2753,8 +2753,14 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit) * connection. */ if (!si_conn_ready(cs_si(s->csb))) { - cs_detach_endp(s->csb); s->srv_conn = NULL; + if (cs_reset_endp(s->csb) < 0) { + if (!cs_si(s->csb)->err_type) + cs_si(s->csb)->err_type = SI_ET_CONN_OTHER; + if (s->srv_error) + s->srv_error(s, cs_si(s->csb)); + return 1; + } } sockaddr_free(&(cs_si(s->csb)->dst)); diff --git a/src/conn_stream.c b/src/conn_stream.c index 24a22b521..d45eba63c 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -86,6 +86,7 @@ struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *se pool_free(pool_head_connstream, cs); cs = NULL; } + endp->flags &= ~CS_EP_ORPHAN; return cs; } @@ -102,6 +103,7 @@ struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session pool_free(pool_head_connstream, cs); cs = NULL; } + endp->flags &= ~CS_EP_ORPHAN; return cs; } @@ -113,6 +115,7 @@ struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags) if (unlikely(!cs)) return NULL; cs->flags |= flags; + cs->endp->flags |= CS_EP_DETACHED; cs->si = si_new(cs); if (unlikely(!cs->si)) { cs_free(cs); @@ -132,6 +135,7 @@ struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags) if (unlikely(!cs)) return NULL; cs->flags |= flags; + cs->endp->flags |= CS_EP_DETACHED; cs->app = &check->obj_type; cs->data_cb = &check_conn_cb; return cs; @@ -144,6 +148,7 @@ void cs_free(struct conn_stream *cs) { si_free(cs->si); if (cs->endp) { + BUG_ON(!(cs->endp->flags & CS_EP_DETACHED)); cs_endpoint_free(cs->endp); } pool_free(pool_head_connstream, cs); @@ -158,6 +163,7 @@ void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx) cs->endp->target = target; cs->endp->ctx = ctx; cs->endp->flags |= CS_EP_T_MUX; + cs->endp->flags &= ~CS_EP_DETACHED; if (!conn->ctx) conn->ctx = cs; if (cs_strm(cs)) { @@ -176,8 +182,9 @@ void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx) cs->endp->target = target; cs->endp->ctx = ctx; cs->endp->flags |= CS_EP_T_APPLET; + cs->endp->flags &= ~CS_EP_DETACHED; appctx->owner = cs; - if (cs->si) { + if (cs_strm(cs)) { cs->si->ops = &si_applet_ops; cs->data_cb = NULL; } @@ -191,7 +198,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm) cs->si = si_new(cs); if (unlikely(!cs->si)) return -1; - + cs->endp->flags &= ~CS_EP_ORPHAN; if (cs->endp->flags & CS_EP_T_MUX) { cs->si->ops = &si_conn_ops; cs->data_cb = &si_conn_cb; @@ -220,9 +227,11 @@ void cs_detach_endp(struct conn_stream *cs) if (conn->mux) { /* TODO: handle unsubscribe for healthchecks too */ + cs->endp->flags |= CS_EP_ORPHAN; if (cs->si && cs->si->wait_event.events != 0) conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event); conn->mux->detach(cs); + cs->endp = NULL; } else { /* It's too early to have a mux, let's just destroy @@ -238,13 +247,17 @@ void cs_detach_endp(struct conn_stream *cs) else if (cs->endp->flags & CS_EP_T_APPLET) { struct appctx *appctx = cs_appctx(cs); + cs->endp->flags |= CS_EP_ORPHAN; if (cs->si) si_applet_release(cs->si); appctx_free(appctx); + cs->endp = NULL; } if (cs->endp) { + /* the cs is the only one one the endpoint */ cs_endpoint_init(cs->endp); + cs->endp->flags |= CS_EP_DETACHED; } /* FIXME: Rest CS for now but must be reviewed. CS flags are only @@ -266,6 +279,21 @@ void cs_detach_app(struct conn_stream *cs) cs->si = NULL; cs->data_cb = NULL; - if (!cs->endp || !cs->endp->target) + if (!cs->endp || (cs->endp->flags & CS_EP_DETACHED)) cs_free(cs); } + +int cs_reset_endp(struct conn_stream *cs) +{ + BUG_ON(!cs->app); + cs_detach_endp(cs); + if (!cs->endp) { + cs->endp = cs_endpoint_new(); + if (!cs->endp) { + cs->flags |= CS_FL_ERROR; + return -1; + } + cs->endp->flags |= CS_EP_DETACHED; + } + return 0; +} diff --git a/src/dns.c b/src/dns.c index e6f70935c..7228581e4 100644 --- a/src/dns.c +++ b/src/dns.c @@ -887,13 +887,12 @@ static struct appctx *dns_session_create(struct dns_session *ds) { struct appctx *appctx; struct session *sess; - struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; struct applet *applet = &dns_session_applet; struct sockaddr_storage *addr = NULL; - appctx = appctx_new(applet); + appctx = appctx_new(applet, NULL); if (!appctx) goto out_close; appctx->ctx.sft.ptr = (void *)ds; @@ -907,17 +906,9 @@ static struct appctx *dns_session_create(struct dns_session *ds) if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr))) goto out_free_sess; - endp = cs_endpoint_new(); - if (!endp) - goto out_free_addr; - endp->target = appctx; - endp->ctx = appctx; - endp->flags |= CS_EP_T_APPLET; - - cs = cs_new_from_applet(endp, sess, &BUF_NULL); + cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL); if (!cs) { ha_alert("Failed to initialize stream in dns_session_create().\n"); - cs_endpoint_free(endp); goto out_free_addr; } diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 86fb646f1..1e731b123 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1988,11 +1988,10 @@ spoe_create_appctx(struct spoe_config *conf) { struct appctx *appctx; struct session *sess; - struct cs_endpoint *endp; struct conn_stream *cs; struct stream *strm; - if ((appctx = appctx_new(&spoe_applet)) == NULL) + if ((appctx = appctx_new(&spoe_applet, NULL)) == NULL) goto out_error; appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx); @@ -2025,18 +2024,9 @@ spoe_create_appctx(struct spoe_config *conf) if (!sess) goto out_free_spoe; - endp = cs_endpoint_new(); - if (!endp) + cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL); + if (!cs) goto out_free_sess; - endp->target = appctx; - endp->ctx = appctx; - endp->flags |= CS_EP_T_APPLET; - - cs = cs_new_from_applet(endp, sess, &BUF_NULL); - if (!cs) { - cs_endpoint_free(endp); - goto out_free_sess; - } strm = DISGUISE(cs_strm(cs)); stream_set_backend(strm, conf->agent->b.be); diff --git a/src/h3.c b/src/h3.c index ec219a5c0..0fce7b0fa 100644 --- a/src/h3.c +++ b/src/h3.c @@ -176,7 +176,6 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, cs = qc_attach_cs(qcs, &htx_buf); if (!cs) return 1; - cs->endp->flags |= CS_EP_NOT_FIRST; /* buffer is transferred to conn_stream and set to NULL * except on stream creation error. diff --git a/src/hlua.c b/src/hlua.c index 705c15c19..af26aae03 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2918,7 +2918,6 @@ __LJMP static int hlua_socket_new(lua_State *L) struct hlua_socket *socket; struct appctx *appctx; struct session *sess; - struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; @@ -2946,7 +2945,7 @@ __LJMP static int hlua_socket_new(lua_State *L) lua_setmetatable(L, -2); /* Create the applet context */ - appctx = appctx_new(&update_applet); + appctx = appctx_new(&update_applet, NULL); if (!appctx) { hlua_pusherror(L, "socket: out of memory"); goto out_fail_conf; @@ -2964,17 +2963,9 @@ __LJMP static int hlua_socket_new(lua_State *L) goto out_fail_appctx; } - endp = cs_endpoint_new(); - if (!endp) - goto out_fail_sess; - endp->target = appctx; - endp->ctx = appctx; - endp->flags |= CS_EP_T_APPLET; - - cs = cs_new_from_applet(endp, sess, &BUF_NULL); + cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL); if (!cs) { hlua_pusherror(L, "socket: out of memory"); - cs_endpoint_free(endp); goto out_fail_sess; } diff --git a/src/http_ana.c b/src/http_ana.c index 8c477708f..57bb3f1cd 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -1254,7 +1254,13 @@ static __inline int do_l7_retry(struct stream *s, struct stream_interface *si) res->to_forward = 0; res->analyse_exp = TICK_ETERNITY; res->total = 0; - cs_detach_endp(s->csb); + + if (cs_reset_endp(s->csb) < 0) { + s->csb->flags |= CS_FL_ERROR; + if (!(s->flags & SF_ERR_MASK)) + s->flags |= SF_ERR_INTERNAL; + return -1; + } b_free(&req->buf); /* Swap the L7 buffer with the channel buffer */ diff --git a/src/http_client.c b/src/http_client.c index 55701e5ea..1e87a5033 100644 --- a/src/http_client.c +++ b/src/http_client.c @@ -455,7 +455,6 @@ struct appctx *httpclient_start(struct httpclient *hc) struct applet *applet = &httpclient_applet; struct appctx *appctx; struct session *sess; - struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; struct sockaddr_storage *addr = NULL; @@ -480,7 +479,7 @@ struct appctx *httpclient_start(struct httpclient *hc) /* The HTTP client will be created in the same thread as the caller, * avoiding threading issues */ - appctx = appctx_new(applet); + appctx = appctx_new(applet, NULL); if (!appctx) goto out; @@ -499,17 +498,9 @@ struct appctx *httpclient_start(struct httpclient *hc) if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst))) goto out_free_sess; - endp = cs_endpoint_new(); - if (!endp) - goto out_free_addr; - endp->target = appctx; - endp->ctx = appctx; - endp->flags |= CS_EP_T_APPLET; - - cs = cs_new_from_applet(endp, sess, &hc->req.buf); + cs = cs_new_from_applet(appctx->endp, sess, &hc->req.buf); if (!cs) { ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__); - cs_endpoint_free(endp); goto out_free_addr; } s = DISGUISE(cs_strm(cs)); diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index e18d9abcf..a74554e55 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -155,6 +155,7 @@ enum fcgi_strm_st { /* FCGI stream descriptor */ struct fcgi_strm { struct conn_stream *cs; + struct cs_endpoint *endp; struct session *sess; struct fcgi_conn *fconn; @@ -1042,6 +1043,8 @@ static void fcgi_strm_destroy(struct fcgi_strm *fstrm) */ LIST_DEL_INIT(&fstrm->send_list); tasklet_free(fstrm->shut_tl); + BUG_ON(fstrm->endp && !(fstrm->endp->flags & CS_EP_ORPHAN)); + cs_endpoint_free(fstrm->endp); pool_free(pool_head_fcgi_strm, fstrm); TRACE_LEAVE(FCGI_EV_FSTRM_END, conn); @@ -1077,6 +1080,7 @@ static struct fcgi_strm *fcgi_strm_new(struct fcgi_conn *fconn, int id) LIST_INIT(&fstrm->send_list); fstrm->fconn = fconn; fstrm->cs = NULL; + fstrm->endp = NULL; fstrm->flags = FCGI_SF_NONE; fstrm->proto_status = 0; fstrm->state = FCGI_SS_IDLE; @@ -1132,6 +1136,7 @@ static struct fcgi_strm *fcgi_conn_stream_new(struct fcgi_conn *fconn, struct co } cs_attach_mux(cs, fstrm, fconn->conn); fstrm->cs = cs; + fstrm->endp = cs->endp; fstrm->sess = sess; fconn->nb_cs++; @@ -3117,7 +3122,7 @@ static int fcgi_process(struct fcgi_conn *fconn) while (node) { fstrm = container_of(node, struct fcgi_strm, by_id); - if (fstrm->cs && fstrm->cs->endp->flags & CS_EP_WAIT_FOR_HS) + if (fstrm->cs && fstrm->endp->flags & CS_EP_WAIT_FOR_HS) fcgi_strm_notify_recv(fstrm); node = eb32_next(node); } diff --git a/src/mux_h1.c b/src/mux_h1.c index 9ebd0ffb1..c504c0142 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -119,6 +119,7 @@ struct h1c { struct h1s { struct h1c *h1c; struct conn_stream *cs; + struct cs_endpoint *endp; uint32_t flags; /* Connection flags: H1S_F_* */ struct wait_event *subs; /* Address of the wait_event the conn_stream associated is waiting on */ @@ -716,27 +717,17 @@ static inline size_t h1s_data_pending(const struct h1s *h1s) static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) { struct h1c *h1c = h1s->h1c; - struct cs_endpoint *endp; TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s); - endp = cs_endpoint_new(); - if (!endp) { - TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); - goto err; - } - endp->target = h1s; - endp->ctx = h1c->conn; - endp->flags |= CS_EP_T_MUX; if (h1s->flags & H1S_F_NOT_FIRST) - endp->flags |= CS_EP_NOT_FIRST; + h1s->endp->flags |= CS_EP_NOT_FIRST; if (h1s->req.flags & H1_MF_UPG_WEBSOCKET) - endp->flags |= CS_EP_WEBSOCKET; + h1s->endp->flags |= CS_EP_WEBSOCKET; - h1s->cs = cs_new_from_mux(endp, h1c->conn->owner, input); + h1s->cs = cs_new_from_mux(h1s->endp, h1c->conn->owner, input); if (!h1s->cs) { TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); - cs_endpoint_free(endp); goto err; } @@ -785,6 +776,7 @@ static struct h1s *h1s_new(struct h1c *h1c) h1c->h1s = h1s; h1s->sess = NULL; h1s->cs = NULL; + h1s->endp = NULL; h1s->flags = H1S_F_WANT_KAL; h1s->subs = NULL; h1s->rxbuf = BUF_NULL; @@ -811,9 +803,8 @@ static struct h1s *h1s_new(struct h1c *h1c) return NULL; } -static struct h1s *h1c_frt_stream_new(struct h1c *h1c) +static struct h1s *h1c_frt_stream_new(struct h1c *h1c, struct conn_stream *cs, struct session *sess) { - struct session *sess = h1c->conn->owner; struct h1s *h1s; TRACE_ENTER(H1_EV_H1S_NEW, h1c->conn); @@ -822,6 +813,20 @@ static struct h1s *h1c_frt_stream_new(struct h1c *h1c) if (!h1s) goto fail; + if (cs) { + cs_attach_mux(cs, h1s, h1c->conn); + h1s->cs = cs; + h1s->endp = cs->endp; + } + else { + h1s->endp = cs_endpoint_new(); + if (!h1s->endp) + goto fail; + h1s->endp->target = h1s; + h1s->endp->ctx = h1c->conn; + h1s->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN); + } + h1s->sess = sess; if (h1c->px->options2 & PR_O2_REQBUG_OK) @@ -834,6 +839,7 @@ static struct h1s *h1c_frt_stream_new(struct h1c *h1c) fail: TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn); + pool_free(pool_head_h1s, h1s); return NULL; } @@ -850,6 +856,7 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s cs_attach_mux(cs, h1s, h1c->conn); h1s->flags |= H1S_F_RX_BLK; h1s->cs = cs; + h1s->endp = cs->endp; h1s->sess = sess; h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY; @@ -903,6 +910,8 @@ static void h1s_destroy(struct h1s *h1s) } HA_ATOMIC_DEC(&h1c->px_counters->open_streams); + BUG_ON(h1s->endp && !(h1s->endp->flags & CS_EP_ORPHAN)); + cs_endpoint_free(h1s->endp); pool_free(pool_head_h1s, h1s); } } @@ -990,12 +999,8 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session } else if (conn_ctx) { /* Upgraded frontend connection (from TCP) */ - struct conn_stream *cs = conn_ctx; - - if (!h1c_frt_stream_new(h1c)) + if (!h1c_frt_stream_new(h1c, conn_ctx, h1c->conn->owner)) goto fail; - h1c->h1s->cs = cs; - cs_attach_mux(cs, h1c->h1s, conn); /* Attach the CS but Not ready yet */ h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED; @@ -1879,11 +1884,11 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count /* Here h1s->cs is always defined */ if (!(h1m->flags & H1_MF_CHNK) && (h1m->state == H1_MSG_DATA || (h1m->state == H1_MSG_TUNNEL))) { TRACE_STATE("notify the mux can use splicing", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s); - h1s->cs->endp->flags |= CS_EP_MAY_SPLICE; + h1s->endp->flags |= CS_EP_MAY_SPLICE; } else { TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s); - h1s->cs->endp->flags &= ~CS_EP_MAY_SPLICE; + h1s->endp->flags &= ~CS_EP_MAY_SPLICE; } /* Set EOI on conn-stream in DONE state iff: @@ -2948,7 +2953,7 @@ static int h1_process(struct h1c * h1c) /* Create the H1 stream if not already there */ if (!h1s) { - h1s = h1c_frt_stream_new(h1c); + h1s = h1c_frt_stream_new(h1c, NULL, h1c->conn->owner); if (!h1s) { b_reset(&h1c->ibuf); h1c->flags = (h1c->flags & ~(H1C_F_ST_IDLE|H1C_F_WAIT_NEXT_REQ)) | H1C_F_ST_ERROR; diff --git a/src/mux_h2.c b/src/mux_h2.c index 39e5ece76..d4f5320ec 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -214,6 +214,7 @@ enum h2_ss { */ struct h2s { struct conn_stream *cs; + struct cs_endpoint *endp; struct session *sess; struct h2c *h2c; struct eb32_node by_id; /* place in h2c's streams_by_id */ @@ -1520,6 +1521,8 @@ static void h2s_destroy(struct h2s *h2s) /* ditto, calling tasklet_free() here should be ok */ tasklet_free(h2s->shut_tl); + BUG_ON(h2s->endp && !(h2s->endp->flags & CS_EP_ORPHAN)); + cs_endpoint_free(h2s->endp); pool_free(pool_head_h2s, h2s); TRACE_LEAVE(H2_EV_H2S_END, conn); @@ -1552,6 +1555,7 @@ static struct h2s *h2s_new(struct h2c *h2c, int id) LIST_INIT(&h2s->list); h2s->h2c = h2c; h2s->cs = NULL; + h2s->endp = NULL; h2s->sws = 0; h2s->flags = H2_SF_NONE; h2s->errcode = H2_ERR_NO_ERROR; @@ -1590,7 +1594,6 @@ static struct h2s *h2s_new(struct h2c *h2c, int id) static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *input, uint32_t flags) { struct session *sess = h2c->conn->owner; - struct cs_endpoint *endp; struct h2s *h2s; TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn); @@ -1602,17 +1605,18 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in if (!h2s) goto out; - endp = cs_endpoint_new(); - if (!endp) + h2s->endp = cs_endpoint_new(); + if (!h2s->endp) goto out_close; - endp->target = h2s; - endp->ctx = h2c->conn; - endp->flags |= (CS_EP_T_MUX|CS_EP_NOT_FIRST); + h2s->endp->target = h2s; + h2s->endp->ctx = h2c->conn; + h2s->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST); + /* FIXME wrong analogy between ext-connect and websocket, this need to * be refine. */ if (flags & H2_SF_EXT_CONNECT_RCVD) - endp->flags |= CS_EP_WEBSOCKET; + h2s->endp->flags |= CS_EP_WEBSOCKET; /* The stream will record the request's accept date (which is either the * end of the connection's or the date immediately after the previous @@ -1621,9 +1625,8 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in */ sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake; - h2s->cs = cs_new_from_mux(endp, sess, input); + h2s->cs = cs_new_from_mux(h2s->endp, sess, input); if (!h2s->cs) { - cs_endpoint_free(endp); goto out_close; } h2c->nb_cs++; @@ -1675,6 +1678,7 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, s cs_attach_mux(cs, h2s, h2c->conn); h2s->cs = cs; + h2s->endp = cs->endp; h2s->sess = sess; h2c->nb_cs++; @@ -4080,7 +4084,7 @@ static int h2_process(struct h2c *h2c) while (node) { h2s = container_of(node, struct h2s, by_id); - if (h2s->cs && h2s->cs->endp->flags & CS_EP_WAIT_FOR_HS) + if (h2s->cs && h2s->endp->flags & CS_EP_WAIT_FOR_HS) h2s_notify_recv(h2s); node = eb32_next(node); } @@ -5322,7 +5326,7 @@ static size_t h2s_frt_make_resp_headers(struct h2s *h2s, struct htx *htx) break; } - if (!h2s->cs || h2s->cs->endp->flags & CS_EP_SHW) { + if (!h2s->cs || h2s->endp->flags & CS_EP_SHW) { /* Response already closed: add END_STREAM */ es_now = 1; } @@ -5742,7 +5746,7 @@ static size_t h2s_bck_make_req_headers(struct h2s *h2s, struct htx *htx) break; } - if (!h2s->cs || h2s->cs->endp->flags & CS_EP_SHW) { + if (!h2s->cs || h2s->endp->flags & CS_EP_SHW) { /* Request already closed: add END_STREAM */ es_now = 1; } diff --git a/src/mux_pt.c b/src/mux_pt.c index 5bd71932e..00cf542ed 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -21,6 +21,7 @@ struct mux_pt_ctx { struct conn_stream *cs; + struct cs_endpoint *endp; struct connection *conn; struct wait_event wait_event; }; @@ -207,6 +208,8 @@ static void mux_pt_destroy(struct mux_pt_ctx *ctx) if (conn && ctx->wait_event.events != 0) conn->xprt->unsubscribe(conn, conn->xprt_ctx, ctx->wait_event.events, &ctx->wait_event); + BUG_ON(ctx->endp && !(ctx->endp->flags & CS_EP_ORPHAN)); + cs_endpoint_free(ctx->endp); pool_free(pool_head_pt_ctx, ctx); } @@ -272,7 +275,6 @@ struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status) static int mux_pt_init(struct connection *conn, struct proxy *prx, struct session *sess, struct buffer *input) { - struct cs_endpoint *endp; struct conn_stream *cs = conn->ctx; struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx); @@ -292,21 +294,27 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio ctx->conn = conn; if (!cs) { - endp = cs_endpoint_new(); - if (!endp) - goto fail_free_ctx; - endp->target = ctx; - endp->ctx = conn; - endp->flags |= CS_EP_T_MUX; - - cs = cs_new_from_mux(endp, sess, input); - if (!cs) { + ctx->endp = cs_endpoint_new(); + if (!ctx->endp) { TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn); - cs_endpoint_free(endp); goto fail_free_ctx; } + ctx->endp->target = ctx; + ctx->endp->ctx = conn; + ctx->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN); + + cs = cs_new_from_mux(ctx->endp, sess, input); + if (!cs) { + TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn); + goto fail_free_endp; + } TRACE_POINT(PT_EV_STRM_NEW, conn, cs); } + else { + cs_attach_mux(cs, ctx, conn); + ctx->cs = cs; + ctx->endp = cs->endp; + } conn->ctx = ctx; ctx->cs = cs; cs->flags |= CS_FL_RCV_MORE; @@ -316,6 +324,8 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs); return 0; + fail_free_endp: + cs_endpoint_free(ctx->endp); fail_free_ctx: if (ctx->wait_event.tasklet) tasklet_free(ctx->wait_event.tasklet); @@ -399,8 +409,11 @@ static void mux_pt_destroy_meth(void *ctx) struct mux_pt_ctx *pt = ctx; TRACE_POINT(PT_EV_CONN_END, pt->conn, pt->cs); - if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt) + if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt) { + if (pt->conn->ctx != pt) + pt->endp = NULL; mux_pt_destroy(pt); + } } /* @@ -411,15 +424,14 @@ static void mux_pt_detach(struct conn_stream *cs) struct connection *conn = __cs_conn(cs); struct mux_pt_ctx *ctx; - ALREADY_CHECKED(conn); - ctx = conn->ctx; - TRACE_ENTER(PT_EV_STRM_END, conn, cs); + ctx = conn->ctx; + ctx->cs = NULL; + /* Subscribe, to know if we got disconnected */ if (!conn_is_back(conn) && conn->owner != NULL && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) { - ctx->cs = NULL; conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event); } else { /* There's no session attached to that connection, destroy it */ diff --git a/src/mux_quic.c b/src/mux_quic.c index f61602f7b..c64f8d0f8 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -117,9 +117,19 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->cs = NULL; qcs->flags = QC_SF_NONE; + qcs->endp = cs_endpoint_new(); + if (!qcs->endp) { + pool_free(pool_head_qcs, qcs); + return NULL; + } + qcs->endp->target = qcs; + qcs->endp->ctx = qcc->conn; + qcs->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST); + qcs->id = id; /* store transport layer stream descriptor in qcc tree */ eb64_insert(&qcc->streams_by_id, &stream->by_id); + qcc->strms[type].nb_streams++; /* If stream is local, use peer remote-limit, or else the opposite. */ @@ -160,6 +170,8 @@ void qcs_free(struct qcs *qcs) /* stream desc must be removed from MUX tree before release it */ eb64_delete(&qcs->stream->by_id); qc_stream_desc_release(qcs->stream, qcs->qcc->conn->handle.qc); + BUG_ON(qcs->endp && !(qcs->endp->flags & CS_EP_ORPHAN)); + cs_endpoint_free(qcs->endp); pool_free(pool_head_qcs, qcs); } diff --git a/src/peers.c b/src/peers.c index 970196318..c31aa818f 100644 --- a/src/peers.c +++ b/src/peers.c @@ -3181,7 +3181,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer struct proxy *p = peers->peers_fe; /* attached frontend */ struct appctx *appctx; struct session *sess; - struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; struct sockaddr_storage *addr = NULL; @@ -3193,7 +3192,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer peer->last_hdshk = now_ms; s = NULL; - appctx = appctx_new(&peer_applet); + appctx = appctx_new(&peer_applet, NULL); if (!appctx) goto out_close; @@ -3209,17 +3208,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr))) goto out_free_sess; - endp = cs_endpoint_new(); - if (!endp) - goto out_free_addr; - endp->target = appctx; - endp->ctx = appctx; - endp->flags |= CS_EP_T_APPLET; - - cs = cs_new_from_applet(endp, sess, &BUF_NULL); + cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL); if (!cs) { ha_alert("Failed to initialize stream in peer_session_create().\n"); - cs_endpoint_free(endp); goto out_free_addr; } diff --git a/src/sink.c b/src/sink.c index 0f016890d..c5cd1d7ef 100644 --- a/src/sink.c +++ b/src/sink.c @@ -636,7 +636,6 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink struct proxy *p = sink->forward_px; struct appctx *appctx; struct session *sess; - struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; struct applet *applet = &sink_forward_applet; @@ -645,7 +644,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) applet = &sink_forward_oc_applet; - appctx = appctx_new(applet); + appctx = appctx_new(applet, NULL); if (!appctx) goto out_close; @@ -660,17 +659,9 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr))) goto out_free_sess; - endp = cs_endpoint_new(); - if (!endp) - goto out_free_addr; - endp->target = appctx; - endp->ctx = appctx; - endp->flags |= CS_EP_T_APPLET; - - cs = cs_new_from_applet(endp, sess, &BUF_NULL); + cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL); if (!cs) { ha_alert("Failed to initialize stream in sink_forward_session_create().\n"); - cs_endpoint_free(endp); goto out_free_addr; } s = DISGUISE(cs_strm(cs)); diff --git a/src/stream_interface.c b/src/stream_interface.c index 360964756..134dfcc3d 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -339,7 +339,7 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si)); - appctx = appctx_new(app); + appctx = appctx_new(app, si->cs->endp); if (!appctx) return NULL; cs_attach_applet(si->cs, appctx, appctx);