MEDIUM: conn-stream: Pre-allocate endpoint to create CS from muxes and applets

It is a transient commit to prepare next changes. Now, when a conn-stream is
created from an applet or a multiplexer, an endpoint is always provided. In
addition, the API to create a conn-stream was specialized to have one
function per type.

The next step will be to share the endpoint structure.
This commit is contained in:
Christopher Faulet 2022-03-23 11:01:09 +01:00
parent b669d684c0
commit a9e8b3979d
19 changed files with 269 additions and 212 deletions

View File

@ -59,7 +59,7 @@ static inline void appctx_init(struct appctx *appctx)
* appctx_free(). <applet> is assigned as the applet, but it can be NULL. The * appctx_free(). <applet> is assigned as the applet, but it can be NULL. The
* applet's task is always created on the current thread. * applet's task is always created on the current thread.
*/ */
static inline struct appctx *appctx_new(struct applet *applet, void *owner) static inline struct appctx *appctx_new(struct applet *applet)
{ {
struct appctx *appctx; struct appctx *appctx;
@ -67,7 +67,6 @@ static inline struct appctx *appctx_new(struct applet *applet, void *owner)
if (likely(appctx != NULL)) { if (likely(appctx != NULL)) {
appctx->obj_type = OBJ_TYPE_APPCTX; appctx->obj_type = OBJ_TYPE_APPCTX;
appctx->applet = applet; appctx->applet = applet;
appctx->owner = owner;
appctx_init(appctx); appctx_init(appctx);
appctx->t = task_new_here(); appctx->t = task_new_here();
if (unlikely(appctx->t == NULL)) { if (unlikely(appctx->t == NULL)) {

View File

@ -23,11 +23,13 @@
#define _HAPROXY_CONN_STREAM_H #define _HAPROXY_CONN_STREAM_H
#include <haproxy/api.h> #include <haproxy/api.h>
#include <haproxy/applet.h>
#include <haproxy/connection.h> #include <haproxy/connection.h>
#include <haproxy/conn_stream-t.h> #include <haproxy/conn_stream-t.h>
#include <haproxy/obj_type.h> #include <haproxy/obj_type.h>
struct buffer;
struct session;
struct appctx;
struct stream; struct stream;
struct stream_interface; struct stream_interface;
struct check; struct check;
@ -38,10 +40,16 @@ struct cs_endpoint *cs_endpoint_new();
void cs_endpoint_free(struct cs_endpoint *endp); void cs_endpoint_free(struct cs_endpoint *endp);
struct conn_stream *cs_new(struct cs_endpoint *endp); struct conn_stream *cs_new(struct cs_endpoint *endp);
struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input);
struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input);
struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags);
struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags);
void cs_free(struct conn_stream *cs); void cs_free(struct conn_stream *cs);
void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx);
void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx); void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx);
int cs_attach_app(struct conn_stream *cs, enum obj_type *app); void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx);
int cs_attach_strm(struct conn_stream *cs, struct stream *strm);
void cs_detach_endp(struct conn_stream *cs); void cs_detach_endp(struct conn_stream *cs);
void cs_detach_app(struct conn_stream *cs); void cs_detach_app(struct conn_stream *cs);

View File

@ -115,15 +115,13 @@ static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *b
return NULL; return NULL;
endp->target = qcs; endp->target = qcs;
endp->ctx = qcs->qcc->conn; endp->ctx = qcs->qcc->conn;
cs = cs_new(endp); endp->flags |= CS_EP_T_MUX;
cs = cs_new_from_mux(endp, qcs->qcc->conn->owner, buf);
if (!cs) { if (!cs) {
cs_endpoint_free(endp); cs_endpoint_free(endp);
return NULL; return NULL;
} }
cs_attach_endp_mux(cs, qcs, qcs->qcc->conn);
qcs->cs = cs; qcs->cs = cs;
stream_new(qcs->qcc->conn->owner, cs, buf);
++qcs->qcc->nb_cs; ++qcs->qcc->nb_cs;
return cs; return cs;

View File

@ -1570,7 +1570,9 @@ skip_reuse:
return SF_ERR_INTERNAL; /* how did we get there ? */ return SF_ERR_INTERNAL; /* how did we get there ? */
} }
cs_attach_endp_mux(s->csb, NULL, srv_conn); cs_attach_mux(s->csb, NULL, srv_conn);
srv_conn->ctx = s->csb;
#if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation) #if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
if (!srv || if (!srv ||
(srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) || (srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||

View File

@ -1391,11 +1391,9 @@ int start_check_task(struct check *check, int mininter,
if (check->type == PR_O2_EXT_CHK) if (check->type == PR_O2_EXT_CHK)
t = task_new_on(0); t = task_new_on(0);
else { else {
check->cs = cs_new(NULL); check->cs = cs_new_from_check(check, CS_FL_NONE);
if (!check->cs) if (!check->cs)
goto fail_alloc_cs; goto fail_alloc_cs;
if (cs_attach_app(check->cs, &check->obj_type) < 0)
goto fail_attach_cs;
t = task_new_anywhere(); t = task_new_anywhere();
} }
@ -1420,7 +1418,6 @@ int start_check_task(struct check *check, int mininter,
return 1; return 1;
fail_alloc_task: fail_alloc_task:
fail_attach_cs:
cs_free(check->cs); cs_free(check->cs);
fail_alloc_cs: fail_alloc_cs:
ha_alert("Starting [%s:%s] check: out of memory.\n", ha_alert("Starting [%s:%s] check: out of memory.\n",

View File

@ -75,6 +75,68 @@ struct conn_stream *cs_new(struct cs_endpoint *endp)
return NULL; return NULL;
} }
struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
{
struct conn_stream *cs;
cs = cs_new(endp);
if (unlikely(!cs))
return NULL;
if (unlikely(!stream_new(sess, cs, input))) {
pool_free(pool_head_connstream, cs);
cs = NULL;
}
return cs;
}
struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
{
struct conn_stream *cs;
struct appctx *appctx = endp->ctx;
cs = cs_new(endp);
if (unlikely(!cs))
return NULL;
appctx->owner = cs;
if (unlikely(!stream_new(sess, cs, input))) {
pool_free(pool_head_connstream, cs);
cs = NULL;
}
return cs;
}
struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags)
{
struct conn_stream *cs;
cs = cs_new(NULL);
if (unlikely(!cs))
return NULL;
cs->flags |= flags;
cs->si = si_new(cs);
if (unlikely(!cs->si)) {
cs_free(cs);
return NULL;
}
cs->app = &strm->obj_type;
cs->si->ops = &si_embedded_ops;
cs->data_cb = NULL;
return cs;
}
struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags)
{
struct conn_stream *cs;
cs = cs_new(NULL);
if (unlikely(!cs))
return NULL;
cs->flags |= flags;
cs->app = &check->obj_type;
cs->data_cb = &check_conn_cb;
return cs;
}
/* Releases a conn_stream previously allocated by cs_new(), as well as any /* Releases a conn_stream previously allocated by cs_new(), as well as any
* buffer it would still hold. * buffer it would still hold.
*/ */
@ -89,11 +151,11 @@ void cs_free(struct conn_stream *cs)
/* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */ /* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */
void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx) void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
{ {
struct connection *conn = ctx; struct connection *conn = ctx;
cs->endp->target = endp; cs->endp->target = target;
cs->endp->ctx = ctx; cs->endp->ctx = ctx;
cs->endp->flags |= CS_EP_T_MUX; cs->endp->flags |= CS_EP_T_MUX;
if (!conn->ctx) if (!conn->ctx)
@ -107,11 +169,11 @@ void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx)
} }
/* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */ /* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */
void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx) void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
{ {
struct appctx *appctx = endp; struct appctx *appctx = target;
cs->endp->target = endp; cs->endp->target = target;
cs->endp->ctx = ctx; cs->endp->ctx = ctx;
cs->endp->flags |= CS_EP_T_APPLET; cs->endp->flags |= CS_EP_T_APPLET;
appctx->owner = cs; appctx->owner = cs;
@ -122,12 +184,10 @@ void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx)
} }
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */ /* Attaches a conn_stream to a app layer and sets the relevant callbacks */
int cs_attach_app(struct conn_stream *cs, enum obj_type *app) int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
{ {
cs->app = app; cs->app = &strm->obj_type;
if (objt_stream(app)) {
if (!cs->si)
cs->si = si_new(cs); cs->si = si_new(cs);
if (unlikely(!cs->si)) if (unlikely(!cs->si))
return -1; return -1;
@ -144,9 +204,6 @@ int cs_attach_app(struct conn_stream *cs, enum obj_type *app)
cs->si->ops = &si_embedded_ops; cs->si->ops = &si_embedded_ops;
cs->data_cb = NULL; cs->data_cb = NULL;
} }
}
else if (objt_check(app))
cs->data_cb = &check_conn_cb;
return 0; return 0;
} }

View File

@ -887,19 +887,15 @@ static struct appctx *dns_session_create(struct dns_session *ds)
{ {
struct appctx *appctx; struct appctx *appctx;
struct session *sess; struct session *sess;
struct cs_endpoint *endp;
struct conn_stream *cs; struct conn_stream *cs;
struct stream *s; struct stream *s;
struct applet *applet = &dns_session_applet; struct applet *applet = &dns_session_applet;
struct sockaddr_storage *addr = NULL;
cs = cs_new(NULL); appctx = appctx_new(applet);
if (!cs) {
ha_alert("out of memory in dns_session_create().\n");
goto out_close;
}
appctx = appctx_new(applet, cs);
if (!appctx) if (!appctx)
goto out_free_cs; goto out_close;
appctx->ctx.sft.ptr = (void *)ds; appctx->ctx.sft.ptr = (void *)ds;
sess = session_new(ds->dss->srv->proxy, NULL, &appctx->obj_type); sess = session_new(ds->dss->srv->proxy, NULL, &appctx->obj_type);
@ -908,18 +904,28 @@ static struct appctx *dns_session_create(struct dns_session *ds)
goto out_free_appctx; goto out_free_appctx;
} }
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
ha_alert("Failed to initialize stream in dns_session_create().\n");
goto out_free_sess; goto out_free_sess;
endp = cs_endpoint_new();
if (!endp)
goto out_free_addr;
endp->target = appctx;
endp->ctx = appctx;
endp->flags |= CS_EP_T_APPLET;
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
if (!cs) {
ha_alert("Failed to initialize stream in dns_session_create().\n");
cs_endpoint_free(endp);
goto out_free_addr;
} }
s->target = &ds->dss->srv->obj_type; s = DISGUISE(cs_strm(cs));
if (!sockaddr_alloc(&cs_si(s->csb)->dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr))) cs_si(s->csb)->dst = addr;
goto out_free_strm;
cs_attach_endp_app(cs, appctx, appctx);
s->flags = SF_ASSIGNED|SF_ADDR_SET;
cs_si(s->csb)->flags |= SI_FL_NOLINGER; cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->target = &ds->dss->srv->obj_type;
s->flags = SF_ASSIGNED|SF_ADDR_SET;
s->do_log = NULL; s->do_log = NULL;
s->uniq_id = 0; s->uniq_id = 0;
@ -934,15 +940,12 @@ static struct appctx *dns_session_create(struct dns_session *ds)
return appctx; return appctx;
/* Error unrolling */ /* Error unrolling */
out_free_strm: out_free_addr:
LIST_DELETE(&s->list); sockaddr_free(&addr);
pool_free(pool_head_stream, s);
out_free_sess: out_free_sess:
session_free(sess); session_free(sess);
out_free_appctx: out_free_appctx:
appctx_free(appctx); appctx_free(appctx);
out_free_cs:
cs_free(cs);
out_close: out_close:
return NULL; return NULL;
} }

View File

@ -1988,16 +1988,13 @@ spoe_create_appctx(struct spoe_config *conf)
{ {
struct appctx *appctx; struct appctx *appctx;
struct session *sess; struct session *sess;
struct cs_endpoint *endp;
struct conn_stream *cs; struct conn_stream *cs;
struct stream *strm; struct stream *strm;
cs = cs_new(NULL); if ((appctx = appctx_new(&spoe_applet)) == NULL)
if (!cs)
goto out_error; goto out_error;
if ((appctx = appctx_new(&spoe_applet, cs)) == NULL)
goto out_free_cs;
appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx); appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx);
if (SPOE_APPCTX(appctx) == NULL) if (SPOE_APPCTX(appctx) == NULL)
goto out_free_appctx; goto out_free_appctx;
@ -2028,14 +2025,24 @@ spoe_create_appctx(struct spoe_config *conf)
if (!sess) if (!sess)
goto out_free_spoe; goto out_free_spoe;
if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL) endp = cs_endpoint_new();
if (!endp)
goto out_free_sess; goto out_free_sess;
endp->target = appctx;
endp->ctx = appctx;
endp->flags |= CS_EP_T_APPLET;
cs_attach_endp_app(cs, appctx, appctx); cs = cs_new_from_applet(endp, sess, &BUF_NULL);
if (!cs) {
cs_endpoint_free(endp);
goto out_free_sess;
}
strm = DISGUISE(cs_strm(cs));
stream_set_backend(strm, conf->agent->b.be); stream_set_backend(strm, conf->agent->b.be);
/* applet is waiting for data */ /* applet is waiting for data */
si_cant_get(strm->csf->si); si_cant_get(cs_si(strm->csf));
appctx_wakeup(appctx); appctx_wakeup(appctx);
strm->do_log = NULL; strm->do_log = NULL;
@ -2058,8 +2065,6 @@ spoe_create_appctx(struct spoe_config *conf)
pool_free(pool_head_spoe_appctx, SPOE_APPCTX(appctx)); pool_free(pool_head_spoe_appctx, SPOE_APPCTX(appctx));
out_free_appctx: out_free_appctx:
appctx_free(appctx); appctx_free(appctx);
out_free_cs:
cs_free(cs);
out_error: out_error:
return NULL; return NULL;
} }

View File

@ -2918,8 +2918,9 @@ __LJMP static int hlua_socket_new(lua_State *L)
struct hlua_socket *socket; struct hlua_socket *socket;
struct appctx *appctx; struct appctx *appctx;
struct session *sess; struct session *sess;
struct cs_endpoint *endp;
struct conn_stream *cs; struct conn_stream *cs;
struct stream *strm; struct stream *s;
/* Check stack size. */ /* Check stack size. */
if (!lua_checkstack(L, 3)) { if (!lua_checkstack(L, 3)) {
@ -2944,17 +2945,11 @@ __LJMP static int hlua_socket_new(lua_State *L)
lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref); lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref);
lua_setmetatable(L, -2); lua_setmetatable(L, -2);
cs = cs_new(NULL);
if (!cs) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_conf;
}
/* Create the applet context */ /* Create the applet context */
appctx = appctx_new(&update_applet, cs); appctx = appctx_new(&update_applet);
if (!appctx) { if (!appctx) {
hlua_pusherror(L, "socket: out of memory"); hlua_pusherror(L, "socket: out of memory");
goto out_fail_cs; goto out_fail_conf;
} }
appctx->ctx.hlua_cosocket.connected = 0; appctx->ctx.hlua_cosocket.connected = 0;
@ -2969,13 +2964,21 @@ __LJMP static int hlua_socket_new(lua_State *L)
goto out_fail_appctx; goto out_fail_appctx;
} }
strm = stream_new(sess, cs, &BUF_NULL); endp = cs_endpoint_new();
if (!strm) { if (!endp)
goto out_fail_sess;
endp->target = appctx;
endp->ctx = appctx;
endp->flags |= CS_EP_T_APPLET;
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
if (!cs) {
hlua_pusherror(L, "socket: out of memory"); hlua_pusherror(L, "socket: out of memory");
cs_endpoint_free(endp);
goto out_fail_sess; goto out_fail_sess;
} }
cs_attach_endp_app(cs, appctx, appctx); s = DISGUISE(cs_strm(cs));
/* Initialise cross reference between stream and Lua socket object. */ /* Initialise cross reference between stream and Lua socket object. */
xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref); xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref);
@ -2984,11 +2987,11 @@ __LJMP static int hlua_socket_new(lua_State *L)
* and retrieve data from the server. The connection is initialized * and retrieve data from the server. The connection is initialized
* with the "struct server". * with the "struct server".
*/ */
si_set_state(strm->csb->si, SI_ST_ASS); si_set_state(cs_si(s->csb), SI_ST_ASS);
/* Force destination server. */ /* Force destination server. */
strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED; s->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
strm->target = &socket_tcp->obj_type; s->target = &socket_tcp->obj_type;
return 1; return 1;
@ -2996,8 +2999,6 @@ __LJMP static int hlua_socket_new(lua_State *L)
session_free(sess); session_free(sess);
out_fail_appctx: out_fail_appctx:
appctx_free(appctx); appctx_free(appctx);
out_fail_cs:
cs_free(cs);
out_fail_conf: out_fail_conf:
WILL_LJMP(lua_error(L)); WILL_LJMP(lua_error(L));
return 0; return 0;

View File

@ -455,8 +455,10 @@ struct appctx *httpclient_start(struct httpclient *hc)
struct applet *applet = &httpclient_applet; struct applet *applet = &httpclient_applet;
struct appctx *appctx; struct appctx *appctx;
struct session *sess; struct session *sess;
struct cs_endpoint *endp;
struct conn_stream *cs; struct conn_stream *cs;
struct stream *s; struct stream *s;
struct sockaddr_storage *addr = NULL;
int len; int len;
struct sockaddr_storage ss_url; struct sockaddr_storage ss_url;
struct sockaddr_storage* ss_dst; struct sockaddr_storage* ss_dst;
@ -476,17 +478,11 @@ struct appctx *httpclient_start(struct httpclient *hc)
goto out; goto out;
} }
cs = cs_new(NULL);
if (!cs) {
ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
goto out;
}
/* The HTTP client will be created in the same thread as the caller, /* The HTTP client will be created in the same thread as the caller,
* avoiding threading issues */ * avoiding threading issues */
appctx = appctx_new(applet, cs); appctx = appctx_new(applet);
if (!appctx) if (!appctx)
goto out_free_cs; goto out;
sess = session_new(httpclient_proxy, NULL, &appctx->obj_type); sess = session_new(httpclient_proxy, NULL, &appctx->obj_type);
if (!sess) { if (!sess) {
@ -494,25 +490,33 @@ struct appctx *httpclient_start(struct httpclient *hc)
goto out_free_appctx; goto out_free_appctx;
} }
if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) {
ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_sess;
}
/* set the "timeout server" */
s->req.wto = hc->timeout_server;
s->res.rto = hc->timeout_server;
/* if httpclient_set_dst() was used, sets the alternative address */ /* if httpclient_set_dst() was used, sets the alternative address */
if (hc->dst) if (hc->dst)
ss_dst = hc->dst; ss_dst = hc->dst;
else else
ss_dst = &ss_url; ss_dst = &ss_url;
if (!sockaddr_alloc(&cs_si(s->csb)->dst, ss_dst, sizeof(*hc->dst))) { if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst)))
ha_alert("httpclient: Failed to initialize stream in %s:%d.\n", __FUNCTION__, __LINE__); goto out_free_sess;
goto out_free_stream;
endp = cs_endpoint_new();
if (!endp)
goto out_free_addr;
endp->target = appctx;
endp->ctx = appctx;
endp->flags |= CS_EP_T_APPLET;
cs = cs_new_from_applet(endp, sess, &hc->req.buf);
if (!cs) {
ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
cs_endpoint_free(endp);
goto out_free_addr;
} }
s = DISGUISE(cs_strm(cs));
/* set the "timeout server" */
s->req.wto = hc->timeout_server;
s->res.rto = hc->timeout_server;
/* choose the SSL server or not */ /* choose the SSL server or not */
switch (out.scheme) { switch (out.scheme) {
@ -529,9 +533,9 @@ struct appctx *httpclient_start(struct httpclient *hc)
break; break;
} }
cs_attach_endp_app(cs, appctx, appctx); cs_si(s->csb)->dst = addr;
s->flags |= SF_ASSIGNED|SF_ADDR_SET;
cs_si(s->csb)->flags |= SI_FL_NOLINGER; cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->flags |= SF_ASSIGNED|SF_ADDR_SET;
s->res.flags |= CF_READ_DONTWAIT; s->res.flags |= CF_READ_DONTWAIT;
/* applet is waiting for data */ /* applet is waiting for data */
@ -550,14 +554,16 @@ struct appctx *httpclient_start(struct httpclient *hc)
return appctx; return appctx;
out_free_stream: out_free_stream:
cs_detach_app(cs);
LIST_DELETE(&s->list); LIST_DELETE(&s->list);
pool_free(pool_head_stream, s); pool_free(pool_head_stream, s);
cs_free(cs);
out_free_addr:
sockaddr_free(&addr);
out_free_sess: out_free_sess:
session_free(sess); session_free(sess);
out_free_appctx: out_free_appctx:
appctx_free(appctx); appctx_free(appctx);
out_free_cs:
cs_free(cs);
out: out:
return NULL; return NULL;

View File

@ -1130,7 +1130,7 @@ static struct fcgi_strm *fcgi_conn_stream_new(struct fcgi_conn *fconn, struct co
TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn); TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn);
goto out; goto out;
} }
cs_attach_endp_mux(cs, fstrm, fconn->conn); cs_attach_mux(cs, fstrm, fconn->conn);
fstrm->cs = cs; fstrm->cs = cs;
fstrm->sess = sess; fstrm->sess = sess;
fconn->nb_cs++; fconn->nb_cs++;

View File

@ -717,9 +717,9 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
{ {
struct h1c *h1c = h1s->h1c; struct h1c *h1c = h1s->h1c;
struct cs_endpoint *endp; struct cs_endpoint *endp;
struct conn_stream *cs;
TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s); TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
endp = cs_endpoint_new(); endp = cs_endpoint_new();
if (!endp) { if (!endp) {
TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
@ -727,36 +727,27 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
} }
endp->target = h1s; endp->target = h1s;
endp->ctx = h1c->conn; endp->ctx = h1c->conn;
endp->flags |= CS_EP_T_MUX;
if (h1s->flags & H1S_F_NOT_FIRST) if (h1s->flags & H1S_F_NOT_FIRST)
endp->flags |= CS_EP_NOT_FIRST; endp->flags |= CS_EP_NOT_FIRST;
if (h1s->req.flags & H1_MF_UPG_WEBSOCKET) if (h1s->req.flags & H1_MF_UPG_WEBSOCKET)
endp->flags |= CS_EP_WEBSOCKET; endp->flags |= CS_EP_WEBSOCKET;
cs = cs_new(endp); h1s->cs = cs_new_from_mux(endp, h1c->conn->owner, input);
if (!cs) { if (!h1s->cs) {
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
cs_endpoint_free(endp); cs_endpoint_free(endp);
goto err; goto err;
} }
cs_attach_endp_mux(cs, h1s, h1c->conn);
h1s->cs = cs;
if (!stream_new(h1c->conn->owner, cs, input)) {
TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
goto err_cs;
}
HA_ATOMIC_INC(&h1c->px_counters->open_streams); HA_ATOMIC_INC(&h1c->px_counters->open_streams);
HA_ATOMIC_INC(&h1c->px_counters->total_streams); HA_ATOMIC_INC(&h1c->px_counters->total_streams);
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY; h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
TRACE_LEAVE(H1_EV_STRM_NEW, h1c->conn, h1s); TRACE_LEAVE(H1_EV_STRM_NEW, h1c->conn, h1s);
return cs; return h1s->cs;
err_cs:
cs_free(cs);
err: err:
h1s->cs = NULL;
TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn, h1s); TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn, h1s);
return NULL; return NULL;
} }
@ -856,7 +847,7 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s
if (!h1s) if (!h1s)
goto fail; goto fail;
cs_attach_endp_mux(cs, h1s, h1c->conn); cs_attach_mux(cs, h1s, h1c->conn);
h1s->flags |= H1S_F_RX_BLK; h1s->flags |= H1S_F_RX_BLK;
h1s->cs = cs; h1s->cs = cs;
h1s->sess = sess; h1s->sess = sess;
@ -1004,7 +995,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
if (!h1c_frt_stream_new(h1c)) if (!h1c_frt_stream_new(h1c))
goto fail; goto fail;
h1c->h1s->cs = cs; h1c->h1s->cs = cs;
cs_attach_endp_mux(cs, h1c->h1s, conn); cs_attach_mux(cs, h1c->h1s, conn);
/* Attach the CS but Not ready yet */ /* Attach the CS but Not ready yet */
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED; h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;

View File

@ -1591,7 +1591,6 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
{ {
struct session *sess = h2c->conn->owner; struct session *sess = h2c->conn->owner;
struct cs_endpoint *endp; struct cs_endpoint *endp;
struct conn_stream *cs;
struct h2s *h2s; struct h2s *h2s;
TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn); TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn);
@ -1608,30 +1607,26 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
goto out_close; goto out_close;
endp->target = h2s; endp->target = h2s;
endp->ctx = h2c->conn; endp->ctx = h2c->conn;
endp->flags |= CS_EP_NOT_FIRST; endp->flags |= (CS_EP_T_MUX|CS_EP_NOT_FIRST);
/* FIXME wrong analogy between ext-connect and websocket, this need to /* FIXME wrong analogy between ext-connect and websocket, this need to
* be refine. * be refine.
*/ */
if (flags & H2_SF_EXT_CONNECT_RCVD) if (flags & H2_SF_EXT_CONNECT_RCVD)
endp->flags |= CS_EP_WEBSOCKET; endp->flags |= CS_EP_WEBSOCKET;
cs = cs_new(endp);
if (!cs) {
cs_endpoint_free(endp);
goto out_close;
}
cs_attach_endp_mux(cs, h2s, h2c->conn);
h2s->cs = cs;
h2c->nb_cs++;
/* The stream will record the request's accept date (which is either the /* The stream will record the request's accept date (which is either the
* end of the connection's or the date immediately after the previous * end of the connection's or the date immediately after the previous
* request) and the idle time, which is the delay since the previous * request) and the idle time, which is the delay since the previous
* request. We can set the value now, it will be copied by stream_new(). * request. We can set the value now, it will be copied by stream_new().
*/ */
sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake; sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake;
if (!stream_new(h2c->conn->owner, cs, input))
goto out_free_cs; h2s->cs = cs_new_from_mux(endp, sess, input);
if (!h2s->cs) {
cs_endpoint_free(endp);
goto out_close;
}
h2c->nb_cs++;
/* We want the accept date presented to the next stream to be the one /* We want the accept date presented to the next stream to be the one
* we have now, the handshake time to be null (since the next stream * we have now, the handshake time to be null (since the next stream
@ -1649,12 +1644,6 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
TRACE_LEAVE(H2_EV_H2S_NEW, h2c->conn); TRACE_LEAVE(H2_EV_H2S_NEW, h2c->conn);
return h2s; return h2s;
out_free_cs:
h2c->nb_cs--;
if (!h2c->nb_cs)
h2c->idle_start = now_ms;
cs_free(cs);
h2s->cs = NULL;
out_close: out_close:
h2s_destroy(h2s); h2s_destroy(h2s);
out: out:
@ -1684,7 +1673,7 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, s
if (!h2s) if (!h2s)
goto out; goto out;
cs_attach_endp_mux(cs, h2s, h2c->conn); cs_attach_mux(cs, h2s, h2c->conn);
h2s->cs = cs; h2s->cs = cs;
h2s->sess = sess; h2s->sess = sess;
h2c->nb_cs++; h2c->nb_cs++;

View File

@ -297,18 +297,14 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
goto fail_free_ctx; goto fail_free_ctx;
endp->target = ctx; endp->target = ctx;
endp->ctx = conn; endp->ctx = conn;
cs = cs_new(endp); endp->flags |= CS_EP_T_MUX;
cs = cs_new_from_mux(endp, sess, input);
if (!cs) { if (!cs) {
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn); TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
cs_endpoint_free(endp); cs_endpoint_free(endp);
goto fail_free_ctx; goto fail_free_ctx;
} }
cs_attach_endp_mux(cs, ctx, conn);
if (!stream_new(conn->owner, cs, &BUF_NULL)) {
TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
goto fail_free;
}
TRACE_POINT(PT_EV_STRM_NEW, conn, cs); TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
} }
conn->ctx = ctx; conn->ctx = ctx;
@ -320,9 +316,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs); TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
return 0; return 0;
fail_free: fail_free_ctx:
cs_free(cs);
fail_free_ctx:
if (ctx->wait_event.tasklet) if (ctx->wait_event.tasklet)
tasklet_free(ctx->wait_event.tasklet); tasklet_free(ctx->wait_event.tasklet);
pool_free(pool_head_pt_ctx, ctx); pool_free(pool_head_pt_ctx, ctx);
@ -379,7 +373,7 @@ static int mux_pt_attach(struct connection *conn, struct conn_stream *cs, struct
TRACE_ENTER(PT_EV_STRM_NEW, conn); TRACE_ENTER(PT_EV_STRM_NEW, conn);
if (ctx->wait_event.events) if (ctx->wait_event.events)
conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event); conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
cs_attach_endp_mux(cs, ctx, conn); cs_attach_mux(cs, ctx, conn);
ctx->cs = cs; ctx->cs = cs;
cs->flags |= CS_FL_RCV_MORE; cs->flags |= CS_FL_RCV_MORE;

View File

@ -3181,8 +3181,10 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
struct proxy *p = peers->peers_fe; /* attached frontend */ struct proxy *p = peers->peers_fe; /* attached frontend */
struct appctx *appctx; struct appctx *appctx;
struct session *sess; struct session *sess;
struct cs_endpoint *endp;
struct conn_stream *cs; struct conn_stream *cs;
struct stream *s; struct stream *s;
struct sockaddr_storage *addr = NULL;
peer->new_conn++; peer->new_conn++;
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
@ -3191,15 +3193,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
peer->last_hdshk = now_ms; peer->last_hdshk = now_ms;
s = NULL; s = NULL;
cs = cs_new(NULL); appctx = appctx_new(&peer_applet);
if (!cs) {
ha_alert("out of memory in peer_session_create().\n");
goto out_close;
}
appctx = appctx_new(&peer_applet, cs);
if (!appctx) if (!appctx)
goto out_free_cs; goto out_close;
appctx->st0 = PEER_SESS_ST_CONNECT; appctx->st0 = PEER_SESS_ST_CONNECT;
appctx->ctx.peers.ptr = (void *)peer; appctx->ctx.peers.ptr = (void *)peer;
@ -3210,23 +3206,34 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
goto out_free_appctx; goto out_free_appctx;
} }
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr)))
ha_alert("Failed to initialize stream in peer_session_create().\n");
goto out_free_sess; goto out_free_sess;
endp = cs_endpoint_new();
if (!endp)
goto out_free_addr;
endp->target = appctx;
endp->ctx = appctx;
endp->flags |= CS_EP_T_APPLET;
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
if (!cs) {
ha_alert("Failed to initialize stream in peer_session_create().\n");
cs_endpoint_free(endp);
goto out_free_addr;
} }
s = DISGUISE(cs_strm(cs));
/* applet is waiting for data */ /* applet is waiting for data */
si_cant_get(cs_si(s->csf)); si_cant_get(cs_si(s->csf));
appctx_wakeup(appctx); appctx_wakeup(appctx);
/* initiate an outgoing connection */ /* initiate an outgoing connection */
s->target = peer_session_target(peer, s); cs_si(s->csb)->dst = addr;
if (!sockaddr_alloc(&(cs_si(s->csb)->dst), &peer->addr, sizeof(peer->addr)))
goto out_free_strm;
cs_attach_endp_app(cs, appctx, appctx);
s->flags = SF_ASSIGNED|SF_ADDR_SET;
cs_si(s->csb)->flags |= SI_FL_NOLINGER; cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->flags = SF_ASSIGNED|SF_ADDR_SET;
s->target = peer_session_target(peer, s);
s->do_log = NULL; s->do_log = NULL;
s->uniq_id = 0; s->uniq_id = 0;
@ -3238,15 +3245,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
return appctx; return appctx;
/* Error unrolling */ /* Error unrolling */
out_free_strm: out_free_addr:
LIST_DELETE(&s->list); sockaddr_free(&addr);
pool_free(pool_head_stream, s);
out_free_sess: out_free_sess:
session_free(sess); session_free(sess);
out_free_appctx: out_free_appctx:
appctx_free(appctx); appctx_free(appctx);
out_free_cs:
cs_free(cs);
out_close: out_close:
return NULL; return NULL;
} }

View File

@ -636,22 +636,18 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
struct proxy *p = sink->forward_px; struct proxy *p = sink->forward_px;
struct appctx *appctx; struct appctx *appctx;
struct session *sess; struct session *sess;
struct cs_endpoint *endp;
struct conn_stream *cs; struct conn_stream *cs;
struct stream *s; struct stream *s;
struct applet *applet = &sink_forward_applet; struct applet *applet = &sink_forward_applet;
struct sockaddr_storage *addr = NULL;
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
applet = &sink_forward_oc_applet; applet = &sink_forward_oc_applet;
cs = cs_new(NULL); appctx = appctx_new(applet);
if (!cs) {
ha_alert("out of memory in sink_forward_session_create");
goto out_close;
}
appctx = appctx_new(applet, cs);
if (!appctx) if (!appctx)
goto out_free_cs; goto out_close;
appctx->ctx.sft.ptr = (void *)sft; appctx->ctx.sft.ptr = (void *)sft;
@ -661,19 +657,29 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
goto out_free_appctx; goto out_free_appctx;
} }
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
goto out_free_sess; goto out_free_sess;
}
endp = cs_endpoint_new();
if (!endp)
goto out_free_addr;
endp->target = appctx;
endp->ctx = appctx;
endp->flags |= CS_EP_T_APPLET;
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
if (!cs) {
ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
cs_endpoint_free(endp);
goto out_free_addr;
}
s = DISGUISE(cs_strm(cs));
cs_si(s->csb)->dst = addr;
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->target = &sft->srv->obj_type; s->target = &sft->srv->obj_type;
if (!sockaddr_alloc(&cs_si(s->csb)->dst, &sft->srv->addr, sizeof(sft->srv->addr)))
goto out_free_strm;
cs_attach_endp_app(cs, appctx, appctx);
s->flags = SF_ASSIGNED|SF_ADDR_SET; s->flags = SF_ASSIGNED|SF_ADDR_SET;
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->do_log = NULL; s->do_log = NULL;
s->uniq_id = 0; s->uniq_id = 0;
@ -688,15 +694,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
return appctx; return appctx;
/* Error unrolling */ /* Error unrolling */
out_free_strm: out_free_addr:
LIST_DELETE(&s->list); sockaddr_free(&addr);
pool_free(pool_head_stream, s);
out_free_sess: out_free_sess:
session_free(sess); session_free(sess);
out_free_appctx: out_free_appctx:
appctx_free(appctx); appctx_free(appctx);
out_free_cs:
cs_free(cs);
out_close: out_close:
return NULL; return NULL;
} }

View File

@ -443,15 +443,13 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
s->flags |= SF_HTX; s->flags |= SF_HTX;
s->csf = cs; s->csf = cs;
s->csb = cs_new(NULL); if (cs_attach_strm(s->csf, s) < 0)
goto out_fail_attach_csf;
s->csb = cs_new_from_strm(s, CS_FL_NONE);
if (!s->csb) if (!s->csb)
goto out_fail_alloc_csb; goto out_fail_alloc_csb;
if (cs_attach_app(s->csf, &s->obj_type) < 0)
goto out_fail_attach_csf;
if (cs_attach_app(s->csb, &s->obj_type) < 0)
goto out_fail_attach_csb;
si_set_state(cs_si(s->csf), SI_ST_EST); si_set_state(cs_si(s->csf), SI_ST_EST);
cs_si(s->csf)->hcto = sess->fe->timeout.clientfin; cs_si(s->csf)->hcto = sess->fe->timeout.clientfin;

View File

@ -339,10 +339,11 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si)); DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si));
appctx = appctx_new(app, si->cs); appctx = appctx_new(app);
if (!appctx) if (!appctx)
return NULL; return NULL;
cs_attach_endp_app(si->cs, appctx, appctx); cs_attach_applet(si->cs, appctx, appctx);
appctx->owner = si->cs;
appctx->t->nice = si_strm(si)->task->nice; appctx->t->nice = si_strm(si)->task->nice;
si_cant_get(si); si_cant_get(si);
appctx_wakeup(appctx); appctx_wakeup(appctx);

View File

@ -1101,7 +1101,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check); TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
goto out; goto out;
} }
cs_attach_endp_mux(check->cs, NULL, conn); cs_attach_mux(check->cs, NULL, conn);
conn->ctx = check->cs;
tasklet_set_tid(check->wait_list.tasklet, tid); tasklet_set_tid(check->wait_list.tasklet, tid);
conn_set_owner(conn, check->sess, NULL); conn_set_owner(conn, check->sess, NULL);