diff --git a/include/haproxy/conn_stream-t.h b/include/haproxy/conn_stream-t.h index eee431890..dff167c27 100644 --- a/include/haproxy/conn_stream-t.h +++ b/include/haproxy/conn_stream-t.h @@ -95,11 +95,11 @@ struct conn_stream { enum obj_type obj_type; /* differentiates connection from applet context */ /* 3 bytes hole here */ unsigned int flags; /* CS_FL_* */ - enum obj_type *end; /* points to the end point (connection or appctx) */ + void *end; /* points to the end point (MUX stream or appctx) */ enum obj_type *app; /* points to the applicative point (stream or check) */ struct stream_interface *si; const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */ - void *ctx; /* mux-specific context */ + void *ctx; /* endpoint-specific context */ }; diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h index 6304c1bf0..c03362d64 100644 --- a/include/haproxy/conn_stream.h +++ b/include/haproxy/conn_stream.h @@ -36,7 +36,8 @@ struct check; struct conn_stream *cs_new(); void cs_free(struct conn_stream *cs); -void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx); +void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx); +void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx); int cs_attach_app(struct conn_stream *cs, enum obj_type *app); void cs_detach_endp(struct conn_stream *cs); void cs_detach_app(struct conn_stream *cs); @@ -55,13 +56,13 @@ static inline void cs_init(struct conn_stream *cs) cs->data_cb = NULL; } -/* Returns the connection from a cs if the endpoint is a connection. Otherwise +/* Returns the connection from a cs if the endpoint is a mux stream. Otherwise * NULL is returned. __cs_conn() returns the connection without any control * while cs_conn() check the endpoint type. */ static inline struct connection *__cs_conn(const struct conn_stream *cs) { - return __objt_conn(cs->end); + return cs->ctx; } static inline struct connection *cs_conn(const struct conn_stream *cs) { @@ -70,8 +71,8 @@ static inline struct connection *cs_conn(const struct conn_stream *cs) return NULL; } -/* Returns the mux of the connection from a cs if the endpoint is a - * connection. Otherwise NULL is returned. +/* Returns the mux ops of the connection from a cs if the endpoint is a + * mux stream. Otherwise NULL is returned. */ static inline const struct mux_ops *cs_conn_mux(const struct conn_stream *cs) { @@ -86,7 +87,7 @@ static inline const struct mux_ops *cs_conn_mux(const struct conn_stream *cs) */ static inline struct appctx *__cs_appctx(const struct conn_stream *cs) { - return __objt_appctx(cs->end); + return cs->end; } static inline struct appctx *cs_appctx(const struct conn_stream *cs) { diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 44dfe1efd..ec3270d7b 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -110,10 +110,8 @@ static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *b struct conn_stream *cs = cs_new(); if (!cs) return NULL; - cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs); - + cs_attach_endp_mux(cs, qcs, qcs->qcc->conn); qcs->cs = cs; - cs->ctx = qcs; stream_new(qcs->qcc->conn->owner, cs, buf); ++qcs->qcc->nb_cs; diff --git a/src/backend.c b/src/backend.c index 83c4d52f5..3e1665395 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1495,7 +1495,6 @@ static int connect_server(struct stream *s) } if (avail >= 1) { - cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn); if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) { cs_detach_endp(s->csb); srv_conn = NULL; @@ -1571,7 +1570,7 @@ skip_reuse: return SF_ERR_INTERNAL; /* how did we get there ? */ } - cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn); + cs_attach_endp_mux(s->csb, NULL, srv_conn); #if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation) if (!srv || (srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) || diff --git a/src/conn_stream.c b/src/conn_stream.c index d2fd72908..01541a598 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -43,33 +43,37 @@ void cs_free(struct conn_stream *cs) } -/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */ -void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx) +/* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */ +void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx) { - struct connection *conn; - struct appctx *appctx; + struct connection *conn = ctx; cs->end = endp; cs->ctx = ctx; - if ((conn = objt_conn(endp)) != NULL) { - if (!conn->ctx) - conn->ctx = cs; - if (cs_strm(cs)) { - cs->si->ops = &si_conn_ops; - cs->data_cb = &si_conn_cb; - } - else if (cs_check(cs)) - cs->data_cb = &check_conn_cb; - cs->flags |= CS_FL_ENDP_MUX; + if (!conn->ctx) + conn->ctx = cs; + if (cs_strm(cs)) { + cs->si->ops = &si_conn_ops; + cs->data_cb = &si_conn_cb; } - else if ((appctx = objt_appctx(endp)) != NULL) { - appctx->owner = cs; - if (cs->si) { - cs->si->ops = &si_applet_ops; - cs->data_cb = NULL; - } - cs->flags |= CS_FL_ENDP_APP; + else if (cs_check(cs)) + cs->data_cb = &check_conn_cb; + cs->flags |= CS_FL_ENDP_MUX; +} + +/* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */ +void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx) +{ + struct appctx *appctx = endp; + + cs->end = endp; + cs->ctx = ctx; + appctx->owner = cs; + if (cs->si) { + cs->si->ops = &si_applet_ops; + cs->data_cb = NULL; } + cs->flags |= CS_FL_ENDP_APP; } /* Attaches a conn_stream to a app layer and sets the relevant callbacks */ diff --git a/src/dns.c b/src/dns.c index cd7780ce6..b13206deb 100644 --- a/src/dns.c +++ b/src/dns.c @@ -917,7 +917,7 @@ static struct appctx *dns_session_create(struct dns_session *ds) if (!sockaddr_alloc(&cs_si(s->csb)->dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr))) goto out_free_strm; - cs_attach_endp(cs, &appctx->obj_type, appctx); + cs_attach_endp_app(cs, appctx, appctx); s->flags = SF_ASSIGNED|SF_ADDR_SET; cs_si(s->csb)->flags |= SI_FL_NOLINGER; diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 36b1f9ac7..a3713cf63 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -2031,7 +2031,7 @@ spoe_create_appctx(struct spoe_config *conf) if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL) goto out_free_sess; - cs_attach_endp(cs, &appctx->obj_type, appctx); + cs_attach_endp_app(cs, appctx, appctx); stream_set_backend(strm, conf->agent->b.be); /* applet is waiting for data */ diff --git a/src/h3.c b/src/h3.c index 2d450d8b8..4906f295d 100644 --- a/src/h3.c +++ b/src/h3.c @@ -636,7 +636,7 @@ static int h3_resp_data_send(struct qcs *qcs, struct buffer *buf, size_t count) size_t h3_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { size_t total = 0; - struct qcs *qcs = cs->ctx; + struct qcs *qcs = cs->end; struct htx *htx; enum htx_blk_type btype; struct htx_blk *blk; diff --git a/src/hlua.c b/src/hlua.c index 270e3f6b2..179b8f232 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2975,7 +2975,7 @@ __LJMP static int hlua_socket_new(lua_State *L) goto out_fail_sess; } - cs_attach_endp(cs, &appctx->obj_type, appctx); + cs_attach_endp_app(cs, appctx, appctx); /* Initialise cross reference between stream and Lua socket object. */ xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref); diff --git a/src/hq_interop.c b/src/hq_interop.c index 71419d1ea..68022c6f9 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -75,6 +75,7 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx) if (!cs) return -1; + b_del(rxbuf, b_data(rxbuf)); b_free(&htx_buf); @@ -95,7 +96,7 @@ static struct buffer *mux_get_buf(struct qcs *qcs) static size_t hq_interop_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct qcs *qcs = cs->ctx; + struct qcs *qcs = cs->end; struct htx *htx; enum htx_blk_type btype; struct htx_blk *blk; diff --git a/src/http_client.c b/src/http_client.c index cf1165bb5..22a13b1cb 100644 --- a/src/http_client.c +++ b/src/http_client.c @@ -529,7 +529,7 @@ struct appctx *httpclient_start(struct httpclient *hc) break; } - cs_attach_endp(cs, &appctx->obj_type, appctx); + cs_attach_endp_app(cs, appctx, appctx); s->flags |= SF_ASSIGNED|SF_ADDR_SET; cs_si(s->csb)->flags |= SI_FL_NOLINGER; s->res.flags |= CF_READ_DONTWAIT; diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index e3192dacc..3228f8b91 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -1130,10 +1130,10 @@ static struct fcgi_strm *fcgi_conn_stream_new(struct fcgi_conn *fconn, struct co TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn); goto out; } - + cs_attach_endp_mux(cs, fstrm, fconn->conn); fstrm->cs = cs; fstrm->sess = sess; - cs->ctx = fstrm; + cs->end = fstrm; fconn->nb_cs++; TRACE_LEAVE(FCGI_EV_FSTRM_NEW, fconn->conn, fstrm); @@ -3579,12 +3579,13 @@ static void fcgi_destroy(void *ctx) */ static void fcgi_detach(struct conn_stream *cs) { - struct fcgi_strm *fstrm = cs->ctx; + struct fcgi_strm *fstrm = cs->end; struct fcgi_conn *fconn; struct session *sess; TRACE_ENTER(FCGI_EV_STRM_END, (fstrm ? fstrm->fconn->conn : NULL), fstrm); + cs->end = NULL; cs->ctx = NULL; if (!fstrm) { TRACE_LEAVE(FCGI_EV_STRM_END); @@ -3853,7 +3854,7 @@ struct task *fcgi_deferred_shut(struct task *t, void *ctx, unsigned int state) /* shutr() called by the conn_stream (mux_ops.shutr) */ static void fcgi_shutr(struct conn_stream *cs, enum cs_shr_mode mode) { - struct fcgi_strm *fstrm = cs->ctx; + struct fcgi_strm *fstrm = cs->end; TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm); if (cs->flags & CS_FL_KILL_CONN) @@ -3868,7 +3869,7 @@ static void fcgi_shutr(struct conn_stream *cs, enum cs_shr_mode mode) /* shutw() called by the conn_stream (mux_ops.shutw) */ static void fcgi_shutw(struct conn_stream *cs, enum cs_shw_mode mode) { - struct fcgi_strm *fstrm = cs->ctx; + struct fcgi_strm *fstrm = cs->end; TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm); if (cs->flags & CS_FL_KILL_CONN) @@ -3884,7 +3885,7 @@ static void fcgi_shutw(struct conn_stream *cs, enum cs_shw_mode mode) */ static int fcgi_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - struct fcgi_strm *fstrm = cs->ctx; + struct fcgi_strm *fstrm = cs->end; struct fcgi_conn *fconn = fstrm->fconn; BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV)); @@ -3910,7 +3911,7 @@ static int fcgi_subscribe(struct conn_stream *cs, int event_type, struct wait_ev */ static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - struct fcgi_strm *fstrm = cs->ctx; + struct fcgi_strm *fstrm = cs->end; struct fcgi_conn *fconn = fstrm->fconn; BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV)); @@ -3946,7 +3947,7 @@ static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ */ static size_t fcgi_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct fcgi_strm *fstrm = cs->ctx; + struct fcgi_strm *fstrm = cs->end; struct fcgi_conn *fconn = fstrm->fconn; size_t ret = 0; @@ -3990,7 +3991,7 @@ static size_t fcgi_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t co */ static size_t fcgi_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct fcgi_strm *fstrm = cs->ctx; + struct fcgi_strm *fstrm = cs->end; struct fcgi_conn *fconn = fstrm->fconn; size_t total = 0; size_t ret; diff --git a/src/mux_h1.c b/src/mux_h1.c index 29dee6ff9..4baaf43ce 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -724,7 +724,7 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); goto err; } - cs_attach_endp(cs, &h1c->conn->obj_type, h1s); + cs_attach_endp_mux(cs, h1s, h1c->conn); h1s->cs = cs; if (h1s->flags & H1S_F_NOT_FIRST) @@ -848,10 +848,10 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s if (!h1s) goto fail; + cs_attach_endp_mux(cs, h1s, h1c->conn); h1s->flags |= H1S_F_RX_BLK; h1s->cs = cs; h1s->sess = sess; - cs->ctx = h1s; h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY; @@ -995,9 +995,8 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session if (!h1c_frt_stream_new(h1c)) goto fail; - h1c->h1s->cs = cs; - cs->ctx = h1c->h1s; + cs_attach_endp_mux(cs, h1c->h1s, conn); /* Attach the CS but Not ready yet */ h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED; @@ -3338,13 +3337,14 @@ static void h1_destroy(void *ctx) */ static void h1_detach(struct conn_stream *cs) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; struct h1c *h1c; struct session *sess; int is_not_first; TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s); + cs->end = NULL; cs->ctx = NULL; if (!h1s) { TRACE_LEAVE(H1_EV_STRM_END); @@ -3447,7 +3447,7 @@ static void h1_detach(struct conn_stream *cs) static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; struct h1c *h1c; if (!h1s) @@ -3490,7 +3490,7 @@ static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode) static void h1_shutw(struct conn_stream *cs, enum cs_shw_mode mode) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; struct h1c *h1c; if (!h1s) @@ -3550,7 +3550,7 @@ static void h1_shutw_conn(struct connection *conn) */ static int h1_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; if (!h1s) return 0; @@ -3579,7 +3579,7 @@ static int h1_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev */ static int h1_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; struct h1c *h1c; if (!h1s) @@ -3627,7 +3627,7 @@ static int h1_subscribe(struct conn_stream *cs, int event_type, struct wait_even */ static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; struct h1c *h1c = h1s->h1c; struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res); size_t ret = 0; @@ -3663,7 +3663,7 @@ static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun /* Called from the upper layer, to send data */ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; struct h1c *h1c; size_t total = 0; @@ -3728,7 +3728,7 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun /* Send and get, using splicing */ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; struct h1c *h1c = h1s->h1c; struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res); int ret = 0; @@ -3798,7 +3798,7 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe) { - struct h1s *h1s = cs->ctx; + struct h1s *h1s = cs->end; struct h1c *h1c = h1s->h1c; struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req); int ret = 0; diff --git a/src/mux_h2.c b/src/mux_h2.c index 9949bbc5c..d06b093e2 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -1606,7 +1606,7 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in if (!cs) goto out_close; cs->flags |= CS_FL_NOT_FIRST; - cs_attach_endp(cs, &h2c->conn->obj_type, h2s); + cs_attach_endp_mux(cs, h2s, h2c->conn); h2s->cs = cs; h2c->nb_cs++; @@ -1676,9 +1676,9 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, s if (!h2s) goto out; + cs_attach_endp_mux(cs, h2s, h2c->conn); h2s->cs = cs; h2s->sess = sess; - cs->ctx = h2s; h2c->nb_cs++; out: @@ -4349,12 +4349,13 @@ static void h2_destroy(void *ctx) */ static void h2_detach(struct conn_stream *cs) { - struct h2s *h2s = cs->ctx; + struct h2s *h2s = cs->end; struct h2c *h2c; struct session *sess; TRACE_ENTER(H2_EV_STRM_END, h2s ? h2s->h2c->conn : NULL, h2s); + cs->end = NULL; cs->ctx = NULL; if (!h2s) { TRACE_LEAVE(H2_EV_STRM_END); @@ -4669,7 +4670,7 @@ struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned int state) /* shutr() called by the conn_stream (mux_ops.shutr) */ static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode) { - struct h2s *h2s = cs->ctx; + struct h2s *h2s = cs->end; TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s); if (cs->flags & CS_FL_KILL_CONN) @@ -4684,7 +4685,7 @@ static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode) /* shutw() called by the conn_stream (mux_ops.shutw) */ static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode) { - struct h2s *h2s = cs->ctx; + struct h2s *h2s = cs->end; TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s); if (cs->flags & CS_FL_KILL_CONN) @@ -6361,7 +6362,7 @@ static size_t h2s_make_trailers(struct h2s *h2s, struct htx *htx) */ static int h2_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - struct h2s *h2s = cs->ctx; + struct h2s *h2s = cs->end; struct h2c *h2c = h2s->h2c; TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s); @@ -6395,7 +6396,7 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, struct wait_even */ static int h2_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - struct h2s *h2s = cs->ctx; + struct h2s *h2s = cs->end; TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s); @@ -6435,7 +6436,7 @@ static int h2_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev */ static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct h2s *h2s = cs->ctx; + struct h2s *h2s = cs->end; struct h2c *h2c = h2s->h2c; struct htx *h2s_htx = NULL; struct htx *buf_htx = NULL; @@ -6518,7 +6519,7 @@ static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun */ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct h2s *h2s = cs->ctx; + struct h2s *h2s = cs->end; size_t total = 0; size_t ret; struct htx *htx; diff --git a/src/mux_pt.c b/src/mux_pt.c index 2fcd139fa..f22c3a1fd 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -296,7 +296,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn); goto fail_free_ctx; } - cs_attach_endp(cs, &conn->obj_type, NULL); + cs_attach_endp_mux(cs, ctx, conn); if (!stream_new(conn->owner, cs, &BUF_NULL)) { TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs); @@ -372,6 +372,7 @@ static int mux_pt_attach(struct connection *conn, struct conn_stream *cs, struct TRACE_ENTER(PT_EV_STRM_NEW, conn); if (ctx->wait_event.events) conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event); + cs_attach_endp_mux(cs, ctx, conn); ctx->cs = cs; cs->flags |= CS_FL_RCV_MORE; @@ -414,6 +415,9 @@ static void mux_pt_detach(struct conn_stream *cs) TRACE_ENTER(PT_EV_STRM_END, conn, cs); + cs->end = NULL; + cs->ctx = NULL; + /* Subscribe, to know if we got disconnected */ if (!conn_is_back(conn) && conn->owner != NULL && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) { diff --git a/src/mux_quic.c b/src/mux_quic.c index 275b92a9d..08848caae 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -1056,7 +1056,7 @@ static int qc_init(struct connection *conn, struct proxy *prx, static void qc_detach(struct conn_stream *cs) { - struct qcs *qcs = cs->ctx; + struct qcs *qcs = cs->end; struct qcc *qcc = qcs->qcc; TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs); @@ -1092,7 +1092,7 @@ static void qc_detach(struct conn_stream *cs) static size_t qc_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct qcs *qcs = cs->ctx; + struct qcs *qcs = cs->end; struct htx *qcs_htx = NULL; struct htx *cs_htx = NULL; size_t ret = 0; @@ -1160,7 +1160,7 @@ static size_t qc_rcv_buf(struct conn_stream *cs, struct buffer *buf, static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { - struct qcs *qcs = cs->ctx; + struct qcs *qcs = cs->end; size_t ret; TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); @@ -1180,7 +1180,7 @@ static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf, static int qc_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - return qcs_subscribe(cs->ctx, event_type, es); + return qcs_subscribe(cs->end, event_type, es); } /* Called from the upper layer, to unsubscribe from events . @@ -1189,7 +1189,7 @@ static int qc_subscribe(struct conn_stream *cs, int event_type, */ static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - struct qcs *qcs = cs->ctx; + struct qcs *qcs = cs->end; BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV)); BUG_ON(qcs->subs && qcs->subs != es); diff --git a/src/peers.c b/src/peers.c index e2bcee7c9..d82e562af 100644 --- a/src/peers.c +++ b/src/peers.c @@ -3224,7 +3224,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer if (!sockaddr_alloc(&(cs_si(s->csb)->dst), &peer->addr, sizeof(peer->addr))) goto out_free_strm; - cs_attach_endp(cs, &appctx->obj_type, appctx); + cs_attach_endp_app(cs, appctx, appctx); s->flags = SF_ASSIGNED|SF_ADDR_SET; cs_si(s->csb)->flags |= SI_FL_NOLINGER; diff --git a/src/sink.c b/src/sink.c index 16d353359..87f076bd4 100644 --- a/src/sink.c +++ b/src/sink.c @@ -671,7 +671,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink if (!sockaddr_alloc(&cs_si(s->csb)->dst, &sft->srv->addr, sizeof(sft->srv->addr))) goto out_free_strm; - cs_attach_endp(cs, &appctx->obj_type, appctx); + cs_attach_endp_app(cs, appctx, appctx); s->flags = SF_ASSIGNED|SF_ADDR_SET; cs_si(s->csb)->flags |= SI_FL_NOLINGER; diff --git a/src/stream.c b/src/stream.c index 3aeba1844..778b4ae75 100644 --- a/src/stream.c +++ b/src/stream.c @@ -988,7 +988,7 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px, if (unlikely(!appctx)) return ACT_RET_ERR; - /* Initialise the context. */ + /* Finish initialisation of the context. */ memset(&appctx->ctx, 0, sizeof(appctx->ctx)); appctx->rule = rule; if (appctx->applet->init && !appctx->applet->init(appctx)) @@ -3275,7 +3275,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st strm->csb->si->err_type, strm->csb->si->wait_event.events); cs = strm->csf; - chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx); + chunk_appendf(&trash, " cs=%p csf=0x%08x endp=%p\n", cs, cs->flags, cs->end); if ((conn = cs_conn(cs)) != NULL) { chunk_appendf(&trash, @@ -3311,7 +3311,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st } cs = strm->csb; - chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx); + chunk_appendf(&trash, " cs=%p csf=0x%08x end=%p\n", cs, cs->flags, cs->end); if ((conn = cs_conn(cs)) != NULL) { chunk_appendf(&trash, " co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", diff --git a/src/stream_interface.c b/src/stream_interface.c index f8f7794ce..9396c90fc 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -342,7 +342,7 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a appctx = appctx_new(app, si->cs); if (!appctx) return NULL; - cs_attach_endp(si->cs, &appctx->obj_type, appctx); + cs_attach_endp_app(si->cs, appctx, appctx); appctx->t->nice = si_strm(si)->task->nice; si_cant_get(si); appctx_wakeup(appctx); diff --git a/src/tcpcheck.c b/src/tcpcheck.c index 6e995a0ea..ee439daeb 100644 --- a/src/tcpcheck.c +++ b/src/tcpcheck.c @@ -1101,7 +1101,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check); goto out; } - cs_attach_endp(check->cs, &conn->obj_type, conn); + cs_attach_endp_mux(check->cs, NULL, conn); tasklet_set_tid(check->wait_list.tasklet, tid); conn_set_owner(conn, check->sess, NULL);