MAJOR: stream/conn_stream: Move the stream-interface into the conn-stream

Thanks to all previous changes, it is now possible to move the
stream-interface into the conn-stream. To do so, some SI functions are
removed and their conn-stream counterparts are added. In addition, the
conn-stream is now responsible to create and release the
stream-interface. While the stream-interfaces were inlined in the stream
structure, there is now a pointer in the conn-stream. stream-interfaces are
now dynamically allocated. Thus a dedicated pool is added. It is a temporary
change because, at the end, the stream-interface structure will most
probably disappear.
This commit is contained in:
Christopher Faulet 2021-12-23 17:28:17 +01:00
parent 9a86f6399f
commit cda94accb1
27 changed files with 261 additions and 242 deletions

View File

@ -68,18 +68,18 @@ static inline struct stream *chn_strm(const struct channel *chn)
static inline struct stream_interface *chn_prod(const struct channel *chn) static inline struct stream_interface *chn_prod(const struct channel *chn)
{ {
if (chn->flags & CF_ISRESP) if (chn->flags & CF_ISRESP)
return &LIST_ELEM(chn, struct stream *, res)->si[1]; return LIST_ELEM(chn, struct stream *, res)->csb->si;
else else
return &LIST_ELEM(chn, struct stream *, req)->si[0]; return LIST_ELEM(chn, struct stream *, req)->csf->si;
} }
/* returns a pointer to the stream interface consuming the channel (producer) */ /* returns a pointer to the stream interface consuming the channel (producer) */
static inline struct stream_interface *chn_cons(const struct channel *chn) static inline struct stream_interface *chn_cons(const struct channel *chn)
{ {
if (chn->flags & CF_ISRESP) if (chn->flags & CF_ISRESP)
return &LIST_ELEM(chn, struct stream *, res)->si[0]; return LIST_ELEM(chn, struct stream *, res)->csf->si;
else else
return &LIST_ELEM(chn, struct stream *, req)->si[1]; return LIST_ELEM(chn, struct stream *, req)->csb->si;
} }
/* c_orig() : returns the pointer to the channel buffer's origin */ /* c_orig() : returns the pointer to the channel buffer's origin */

View File

@ -25,6 +25,8 @@
#include <haproxy/obj_type-t.h> #include <haproxy/obj_type-t.h>
struct stream_interface;
/* conn_stream flags */ /* conn_stream flags */
enum { enum {
CS_FL_NONE = 0x00000000, /* Just for initialization purposes */ CS_FL_NONE = 0x00000000, /* Just for initialization purposes */
@ -92,7 +94,7 @@ struct conn_stream {
unsigned int flags; /* CS_FL_* */ unsigned int flags; /* CS_FL_* */
enum obj_type *end; /* points to the end point (connection or appctx) */ enum obj_type *end; /* points to the end point (connection or appctx) */
enum obj_type *app; /* points to the applicative point (stream or check) */ enum obj_type *app; /* points to the applicative point (stream or check) */
void *data; /* pointer to upper layer's entity (eg: stream interface) */ struct stream_interface *si;
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */ const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
void *ctx; /* mux-specific context */ void *ctx; /* mux-specific context */
}; };

View File

@ -29,13 +29,16 @@
#include <haproxy/obj_type.h> #include <haproxy/obj_type.h>
struct stream; struct stream;
struct stream_interface;
struct check; struct check;
#define IS_HTX_CS(cs) (cs_conn(cs) && IS_HTX_CONN(cs_conn(cs))) #define IS_HTX_CS(cs) (cs_conn(cs) && IS_HTX_CONN(cs_conn(cs)))
struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, void *data, const struct data_cb *data_cb); struct conn_stream *cs_new();
void cs_free(struct conn_stream *cs); void cs_free(struct conn_stream *cs);
void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx);
int cs_attach_app(struct conn_stream *cs, enum obj_type *app);
void cs_detach_endp(struct conn_stream *cs);
/* /*
* Initializes all required fields for a new conn_strema. * Initializes all required fields for a new conn_strema.
@ -47,7 +50,7 @@ static inline void cs_init(struct conn_stream *cs)
cs->end = NULL; cs->end = NULL;
cs->app = NULL; cs->app = NULL;
cs->ctx = NULL; cs->ctx = NULL;
cs->data = NULL; cs->si = NULL;
cs->data_cb = NULL; cs->data_cb = NULL;
} }
@ -77,11 +80,6 @@ static inline struct appctx *cs_appctx(const struct conn_stream *cs)
return (cs ? objt_appctx(cs->end) : NULL); return (cs ? objt_appctx(cs->end) : NULL);
} }
static inline struct stream_interface *cs_si(const struct conn_stream *cs)
{
return (cs ? cs->data : NULL);
}
static inline struct stream *cs_strm(const struct conn_stream *cs) static inline struct stream *cs_strm(const struct conn_stream *cs)
{ {
return (cs ? objt_stream(cs->app) : NULL); return (cs ? objt_stream(cs->app) : NULL);
@ -92,57 +90,9 @@ static inline struct check *cs_check(const struct conn_stream *cs)
return (cs ? objt_check(cs->app) : NULL); return (cs ? objt_check(cs->app) : NULL);
} }
/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */ static inline struct stream_interface *cs_si(const struct conn_stream *cs)
static inline void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx)
{ {
cs->end = endp; return (cs_strm(cs) ? cs->si : NULL);
cs->ctx = ctx;
}
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
static inline void cs_attach_app(struct conn_stream *cs, enum obj_type *app, void *data, const struct data_cb *data_cb)
{
cs->app = app;
cs->data = data;
cs->data_cb = data_cb;
}
/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux
* owns the connection ->detach() callback is called. Otherwise, it means the
* conn-stream owns the connection. In this case the connection is closed and
* released. For an applet, the appctx is released. At the end, the conn-stream
* is not released but some fields a reset.
*/
static inline void cs_detach_endp(struct conn_stream *cs)
{
struct connection *conn;
struct appctx *appctx;
if ((conn = cs_conn(cs))) {
if (conn->mux)
conn->mux->detach(cs);
else {
/* It's too early to have a mux, let's just destroy
* the connection
*/
conn_stop_tracking(conn);
conn_full_close(conn);
if (conn->destroy_cb)
conn->destroy_cb(conn);
conn_free(conn);
}
}
else if ((appctx = cs_appctx(cs))) {
if (appctx->applet->release)
appctx->applet->release(appctx);
appctx_free(appctx);
}
/* Rest CS */
cs->flags = CS_FL_NONE;
cs->end = NULL;
cs->ctx = NULL;
cs->data_cb = NULL;
} }
/* Release a conn_stream */ /* Release a conn_stream */

View File

@ -167,7 +167,6 @@ struct stream {
struct conn_stream *csf; /* frontend conn-stream */ struct conn_stream *csf; /* frontend conn-stream */
struct conn_stream *csb; /* backend conn-stream */ struct conn_stream *csb; /* backend conn-stream */
struct stream_interface si[2]; /* client and server stream interfaces */
struct strm_logs logs; /* logs for this stream */ struct strm_logs logs; /* logs for this stream */
void (*do_log)(struct stream *s); /* the function to call in order to log (or NULL) */ void (*do_log)(struct stream *s); /* the function to call in order to log (or NULL) */

View File

@ -33,6 +33,10 @@ extern struct si_ops si_embedded_ops;
extern struct si_ops si_conn_ops; extern struct si_ops si_conn_ops;
extern struct si_ops si_applet_ops; extern struct si_ops si_applet_ops;
extern struct data_cb si_conn_cb; extern struct data_cb si_conn_cb;
extern struct data_cb check_conn_cb;
struct stream_interface *si_new(struct conn_stream *cs);
void si_free(struct stream_interface *si);
/* main event functions used to move data between sockets and buffers */ /* main event functions used to move data between sockets and buffers */
int si_check_timeouts(struct stream_interface *si); int si_check_timeouts(struct stream_interface *si);
@ -87,7 +91,7 @@ static inline struct task *si_task(struct stream_interface *si)
/* returns the stream interface on the other side. Used during forwarding. */ /* returns the stream interface on the other side. Used during forwarding. */
static inline struct stream_interface *si_opposite(struct stream_interface *si) static inline struct stream_interface *si_opposite(struct stream_interface *si)
{ {
return ((si->flags & SI_FL_ISBACK) ? &(cs_strm(si->cs)->si[0]) : &(cs_strm(si->cs)->si[1])); return ((si->flags & SI_FL_ISBACK) ? cs_strm(si->cs)->csf->si : cs_strm(si->cs)->csb->si);
} }
/* initializes a stream interface in the SI_ST_INI state. It's detached from /* initializes a stream interface in the SI_ST_INI state. It's detached from
@ -105,6 +109,7 @@ static inline int si_reset(struct stream_interface *si)
si->cs = NULL; si->cs = NULL;
si->state = si->prev_state = SI_ST_INI; si->state = si->prev_state = SI_ST_INI;
si->ops = &si_embedded_ops; si->ops = &si_embedded_ops;
si->l7_buffer = BUF_NULL;
si->wait_event.tasklet = tasklet_new(); si->wait_event.tasklet = tasklet_new();
if (!si->wait_event.tasklet) if (!si->wait_event.tasklet)
return -1; return -1;
@ -137,81 +142,6 @@ static inline int si_state_in(enum si_state state, enum si_state_bit mask)
return !!(si_state_bit(state) & mask); return !!(si_state_bit(state) & mask);
} }
/* Reset the endpoint detaching it from the conn-stream. For a connection
* attached to a mux, it is unsubscribe from any event.
*/
static inline void si_reset_endpoint(struct stream_interface *si)
{
if (!si->cs)
return;
if (cs_conn_mux(si->cs) && si->wait_event.events != 0)
(cs_conn_mux(si->cs))->unsubscribe(si->cs, si->wait_event.events, &si->wait_event);
cs_detach_endp(si->cs);
si->ops = &si_embedded_ops;
}
/* Release the endpoint if it's a connection or an applet, then nullify it.
* Note: released connections are closed then freed.
*/
static inline void si_release_endpoint(struct stream_interface *si)
{
if (!si->cs)
return;
si_reset_endpoint(si);
cs_free(si->cs);
si->cs = NULL;
si->ops = &si_embedded_ops;
}
/* Attach conn_stream <cs> to the stream interface <si>. */
static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs)
{
si->cs = cs;
if (cs_conn(cs)) {
si->ops = &si_conn_ops;
cs_attach_app(cs, &si_strm(si)->obj_type, si, &si_conn_cb);
}
else if (cs_appctx(cs)) {
struct appctx *appctx = cs_appctx(cs);
si->ops = &si_applet_ops;
appctx->owner = cs;
cs_attach_app(cs, &si_strm(si)->obj_type, si, NULL);
}
else {
si->ops = &si_embedded_ops;
cs_attach_app(cs, &si_strm(si)->obj_type, si, NULL);
}
}
/* Attach connection <conn> to the stream interface <si>. The stream interface
* is configured to work with a connection context.
*/
static inline void si_attach_conn(struct stream_interface *si, struct connection *conn)
{
si_reset_endpoint(si);
if (!conn->ctx)
conn->ctx = si->cs;
si->ops = &si_conn_ops;
cs_attach_endp(si->cs, &conn->obj_type, conn);
cs_attach_app(si->cs, &si_strm(si)->obj_type, si, &si_conn_cb);
}
/* Attach appctx <appctx> to the stream interface <si>. The stream interface
* is configured to work with an applet context.
*/
static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx)
{
si_reset_endpoint(si);
appctx->owner = si->cs;
si->ops = &si_applet_ops;
cs_attach_endp(si->cs, &appctx->obj_type, appctx);
cs_attach_app(si->cs, &si_strm(si)->obj_type, si, NULL);
}
/* call the applet's release function if any. Needs to be called upon close() */ /* call the applet's release function if any. Needs to be called upon close() */
static inline void si_applet_release(struct stream_interface *si) static inline void si_applet_release(struct stream_interface *si)
{ {

View File

@ -1495,9 +1495,9 @@ int connect_server(struct stream *s)
} }
if (avail >= 1) { if (avail >= 1) {
si_attach_conn(cs_si(s->csb), srv_conn); cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) { if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
si_reset_endpoint(cs_si(s->csb)); cs_detach_endp(s->csb);
srv_conn = NULL; srv_conn = NULL;
} }
} }
@ -1571,7 +1571,7 @@ int connect_server(struct stream *s)
return SF_ERR_INTERNAL; /* how did we get there ? */ return SF_ERR_INTERNAL; /* how did we get there ? */
} }
si_attach_conn(cs_si(s->csb), srv_conn); cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
#if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation) #if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
if (!srv || if (!srv ||
(srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) || (srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||
@ -2289,7 +2289,7 @@ void back_handle_st_cer(struct stream *s)
* Note: the stream-interface will be switched to ST_REQ, ST_ASS or * Note: the stream-interface will be switched to ST_REQ, ST_ASS or
* ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset. * ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset.
*/ */
si_reset_endpoint(cs_si(s->csb)); cs_detach_endp(s->csb);
stream_choose_redispatch(s); stream_choose_redispatch(s);

View File

@ -2715,7 +2715,7 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
* connection. * connection.
*/ */
if (!si_conn_ready(cs_si(s->csb))) { if (!si_conn_ready(cs_si(s->csb))) {
si_reset_endpoint(cs_si(s->csb)); cs_detach_endp(s->csb);
s->srv_conn = NULL; s->srv_conn = NULL;
} }

View File

@ -14,7 +14,7 @@
#include <haproxy/connection.h> #include <haproxy/connection.h>
#include <haproxy/conn_stream.h> #include <haproxy/conn_stream.h>
#include <haproxy/pool.h> #include <haproxy/pool.h>
//#include <haproxy/stream_interface.h> #include <haproxy/stream_interface.h>
DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream)); DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
@ -22,7 +22,7 @@ DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
/* Tries to allocate a new conn_stream and initialize its main fields. On /* Tries to allocate a new conn_stream and initialize its main fields. On
* failure, nothing is allocated and NULL is returned. * failure, nothing is allocated and NULL is returned.
*/ */
struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, void *data, const struct data_cb *data_cb) struct conn_stream *cs_new()
{ {
struct conn_stream *cs; struct conn_stream *cs;
@ -30,8 +30,6 @@ struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, v
if (unlikely(!cs)) if (unlikely(!cs))
return NULL; return NULL;
cs_init(cs); cs_init(cs);
cs_attach_endp(cs, endp, ctx);
cs_attach_app(cs, app, data, data_cb);
return cs; return cs;
} }
@ -40,5 +38,106 @@ struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, v
*/ */
void cs_free(struct conn_stream *cs) void cs_free(struct conn_stream *cs)
{ {
si_free(cs->si);
pool_free(pool_head_connstream, cs); pool_free(pool_head_connstream, cs);
} }
/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */
void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx)
{
struct connection *conn;
struct appctx *appctx;
cs->end = endp;
cs->ctx = ctx;
if ((conn = objt_conn(endp)) != NULL) {
if (!conn->ctx)
conn->ctx = cs;
if (cs_strm(cs)) {
cs->si->ops = &si_conn_ops;
cs->data_cb = &si_conn_cb;
}
else if (cs_check(cs))
cs->data_cb = &check_conn_cb;
}
else if ((appctx = objt_appctx(endp)) != NULL) {
appctx->owner = cs;
if (cs->si) {
cs->si->ops = &si_applet_ops;
cs->data_cb = NULL;
}
}
}
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
int cs_attach_app(struct conn_stream *cs, enum obj_type *app)
{
cs->app = app;
if (objt_stream(app)) {
if (!cs->si)
cs->si = si_new(cs);
if (unlikely(!cs->si))
return -1;
if (cs_conn(cs)) {
cs->si->ops = &si_conn_ops;
cs->data_cb = &si_conn_cb;
}
else if (cs_appctx(cs)) {
cs->si->ops = &si_applet_ops;
cs->data_cb = NULL;
}
else {
cs->si->ops = &si_embedded_ops;
cs->data_cb = NULL;
}
}
else if (objt_check(app))
cs->data_cb = &check_conn_cb;
return 0;
}
/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux
* owns the connection ->detach() callback is called. Otherwise, it means the
* conn-stream owns the connection. In this case the connection is closed and
* released. For an applet, the appctx is released. At the end, the conn-stream
* is not released but some fields a reset.
*/
void cs_detach_endp(struct conn_stream *cs)
{
struct connection *conn;
struct appctx *appctx;
if ((conn = cs_conn(cs))) {
if (conn->mux) {
if (cs->si && cs->si->wait_event.events != 0)
conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
conn->mux->detach(cs);
}
else {
/* It's too early to have a mux, let's just destroy
* the connection
*/
conn_stop_tracking(conn);
conn_full_close(conn);
if (conn->destroy_cb)
conn->destroy_cb(conn);
conn_free(conn);
}
}
else if ((appctx = cs_appctx(cs))) {
if (appctx->applet->release)
appctx->applet->release(appctx);
appctx_free(appctx);
}
/* Rest CS */
cs->flags = CS_FL_NONE;
cs->end = NULL;
cs->ctx = NULL;
if (cs->si)
cs->si->ops = &si_embedded_ops;
cs->data_cb = NULL;
}

View File

@ -1738,8 +1738,8 @@ static int make_proxy_line_v2(char *buf, int buf_len, struct server *srv, struct
memcpy(hdr->sig, pp2_signature, PP2_SIGNATURE_LEN); memcpy(hdr->sig, pp2_signature, PP2_SIGNATURE_LEN);
if (strm) { if (strm) {
src = si_src(&strm->si[0]); src = si_src(strm->csf->si);
dst = si_dst(&strm->si[0]); dst = si_dst(strm->csf->si);
} }
else if (remote && conn_get_src(remote) && conn_get_dst(remote)) { else if (remote && conn_get_src(remote) && conn_get_dst(remote)) {
src = conn_src(remote); src = conn_src(remote);
@ -1937,8 +1937,8 @@ int make_proxy_line(char *buf, int buf_len, struct server *srv, struct connectio
const struct sockaddr_storage *dst = NULL; const struct sockaddr_storage *dst = NULL;
if (strm) { if (strm) {
src = si_src(&strm->si[0]); src = si_src(strm->csf->si);
dst = si_dst(&strm->si[0]); dst = si_dst(strm->csf->si);
} }
else if (remote && conn_get_src(remote) && conn_get_dst(remote)) { else if (remote && conn_get_src(remote) && conn_get_dst(remote)) {
src = conn_src(remote); src = conn_src(remote);

View File

@ -903,11 +903,12 @@ static struct appctx *dns_session_create(struct dns_session *ds)
goto out_free_appctx; goto out_free_appctx;
} }
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); cs = cs_new();
if (!cs) { if (!cs) {
ha_alert("out of memory in dns_session_create().\n"); ha_alert("out of memory in dns_session_create().\n");
goto out_free_sess; goto out_free_sess;
} }
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in dns_session_create().\n"); ha_alert("Failed to initialize stream in dns_session_create().\n");

View File

@ -2024,9 +2024,10 @@ spoe_create_appctx(struct spoe_config *conf)
if (!sess) if (!sess)
goto out_free_spoe; goto out_free_spoe;
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); cs = cs_new();
if (!cs) if (!cs)
goto out_free_sess; goto out_free_sess;
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL) if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL)
goto out_free_cs; goto out_free_cs;
@ -2034,7 +2035,7 @@ spoe_create_appctx(struct spoe_config *conf)
stream_set_backend(strm, conf->agent->b.be); stream_set_backend(strm, conf->agent->b.be);
/* applet is waiting for data */ /* applet is waiting for data */
si_cant_get(&strm->si[0]); si_cant_get(strm->csf->si);
appctx_wakeup(appctx); appctx_wakeup(appctx);
strm->do_log = NULL; strm->do_log = NULL;

View File

@ -176,9 +176,10 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
if (fin) if (fin)
htx->flags |= HTX_FL_EOM; htx->flags |= HTX_FL_EOM;
cs = cs_new(qcs->qcc->conn->obj_type); cs = cs_new();
if (!cs) if (!cs)
return 1; return 1;
cs_attach_endp(&qcs->qcc->conn->obj_type, qcs);
cs->flags |= CS_FL_NOT_FIRST; cs->flags |= CS_FL_NOT_FIRST;
cs->ctx = qcs; cs->ctx = qcs;

View File

@ -2961,11 +2961,12 @@ __LJMP static int hlua_socket_new(lua_State *L)
goto out_fail_appctx; goto out_fail_appctx;
} }
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); cs = cs_new();
if (!cs) { if (!cs) {
hlua_pusherror(L, "socket: out of memory"); hlua_pusherror(L, "socket: out of memory");
goto out_fail_sess; goto out_fail_sess;
} }
cs_attach_endp(cs, &appctx->obj_type, appctx);
strm = stream_new(sess, cs, &BUF_NULL); strm = stream_new(sess, cs, &BUF_NULL);
if (!strm) { if (!strm) {
@ -2980,7 +2981,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
* and retrieve data from the server. The connection is initialized * and retrieve data from the server. The connection is initialized
* with the "struct server". * with the "struct server".
*/ */
si_set_state(&strm->si[1], SI_ST_ASS); si_set_state(strm->csb->si, SI_ST_ASS);
/* Force destination server. */ /* Force destination server. */
strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED; strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;

View File

@ -72,10 +72,10 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
htx_add_endof(htx, HTX_BLK_EOH); htx_add_endof(htx, HTX_BLK_EOH);
htx_to_buf(htx, &htx_buf); htx_to_buf(htx, &htx_buf);
cs = cs_new(&qcs->qcc->conn->obj_type); cs = cs_new();
if (!cs) if (!cs)
return -1; return -1;
cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs);
cs->ctx = qcs; cs->ctx = qcs;
stream_create_from_cs(cs, &htx_buf); stream_create_from_cs(cs, &htx_buf);

View File

@ -1257,7 +1257,7 @@ static __inline int do_l7_retry(struct stream *s, struct stream_interface *si)
res->to_forward = 0; res->to_forward = 0;
res->analyse_exp = TICK_ETERNITY; res->analyse_exp = TICK_ETERNITY;
res->total = 0; res->total = 0;
si_reset_endpoint(cs_si(s->csb)); cs_detach_endp(s->csb);
b_free(&req->buf); b_free(&req->buf);
/* Swap the L7 buffer with the channel buffer */ /* Swap the L7 buffer with the channel buffer */

View File

@ -486,11 +486,12 @@ struct appctx *httpclient_start(struct httpclient *hc)
ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__); ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_appctx; goto out_free_appctx;
} }
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); cs = cs_new();
if (!cs) { if (!cs) {
ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__); ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_sess; goto out_free_sess;
} }
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) { if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) {
ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__); ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_cs; goto out_free_cs;

View File

@ -1186,7 +1186,7 @@ static int smp_fetch_base32(const struct arg *args, struct sample *smp, const ch
*/ */
static int smp_fetch_base32_src(const struct arg *args, struct sample *smp, const char *kw, void *private) static int smp_fetch_base32_src(const struct arg *args, struct sample *smp, const char *kw, void *private)
{ {
const struct sockaddr_storage *src = (smp->strm ? si_src(&smp->strm->si[0]) : NULL); const struct sockaddr_storage *src = (smp->strm ? si_src(smp->strm->csf->si) : NULL);
struct buffer *temp; struct buffer *temp;
if (!src) if (!src)
@ -2053,7 +2053,7 @@ static int smp_fetch_url32(const struct arg *args, struct sample *smp, const cha
*/ */
static int smp_fetch_url32_src(const struct arg *args, struct sample *smp, const char *kw, void *private) static int smp_fetch_url32_src(const struct arg *args, struct sample *smp, const char *kw, void *private)
{ {
const struct sockaddr_storage *src = (smp->strm ? si_src(&smp->strm->si[0]) : NULL); const struct sockaddr_storage *src = (smp->strm ? si_src(smp->strm->csf->si) : NULL);
struct buffer *temp; struct buffer *temp;
if (!src) if (!src)

View File

@ -682,11 +682,12 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
struct conn_stream *cs; struct conn_stream *cs;
TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s); TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
cs = cs_new(&h1c->conn->obj_type, h1s, NULL, NULL, NULL); cs = cs_new();
if (!cs) { if (!cs) {
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
goto err; goto err;
} }
cs_attach_endp(cs, &h1c->conn->obj_type, h1s);
h1s->cs = cs; h1s->cs = cs;
if (h1s->flags & H1S_F_NOT_FIRST) if (h1s->flags & H1S_F_NOT_FIRST)

View File

@ -1529,11 +1529,11 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
if (!h2s) if (!h2s)
goto out; goto out;
cs = cs_new(&h2c->conn->obj_type, h2s, NULL, NULL, NULL); cs = cs_new();
if (!cs) if (!cs)
goto out_close; goto out_close;
cs->flags |= CS_FL_NOT_FIRST; cs->flags |= CS_FL_NOT_FIRST;
cs_attach_endp(cs, &h2c->conn->obj_type, h2s);
h2s->cs = cs; h2s->cs = cs;
h2c->nb_cs++; h2c->nb_cs++;

View File

@ -291,11 +291,12 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
ctx->conn = conn; ctx->conn = conn;
if (!cs) { if (!cs) {
cs = cs_new(&conn->obj_type, NULL, NULL, NULL, NULL); cs = cs_new();
if (!cs) { if (!cs) {
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn); TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
goto fail_free_ctx; goto fail_free_ctx;
} }
cs_attach_endp(cs, &conn->obj_type, NULL);
if (!stream_new(conn->owner, cs, &BUF_NULL)) { if (!stream_new(conn->owner, cs, &BUF_NULL)) {
TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs); TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);

View File

@ -3204,11 +3204,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
goto out_free_appctx; goto out_free_appctx;
} }
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); cs = cs_new();
if (!cs) { if (!cs) {
ha_alert("out of memory in peer_session_create().\n"); ha_alert("out of memory in peer_session_create().\n");
goto out_free_sess; goto out_free_sess;
} }
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in peer_session_create().\n"); ha_alert("Failed to initialize stream in peer_session_create().\n");

View File

@ -655,11 +655,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
goto out_free_appctx; goto out_free_appctx;
} }
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); cs = cs_new();
if (!cs) { if (!cs) {
ha_alert("out of memory in sink_forward_session_create"); ha_alert("out of memory in sink_forward_session_create");
goto out_free_sess; goto out_free_sess;
} }
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in sink_forward_session_create().\n"); ha_alert("Failed to initialize stream in sink_forward_session_create().\n");

View File

@ -438,30 +438,26 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
if (sess->fe->mode == PR_MODE_HTTP) if (sess->fe->mode == PR_MODE_HTTP)
s->flags |= SF_HTX; s->flags |= SF_HTX;
cs->app = &s->obj_type;
s->csf = cs; s->csf = cs;
s->csb = cs_new(NULL, NULL, &s->obj_type, &s->si[1], NULL); s->csb = cs_new();
if (!s->csb) if (!s->csb)
goto out_fail_alloc_cs; goto out_fail_alloc_csb;
s->si[0].flags = SI_FL_NONE; if (cs_attach_app(s->csf, &s->obj_type) < 0)
if (si_reset(&s->si[0]) < 0) goto out_fail_attach_csf;
goto out_fail_reset_si0; if (cs_attach_app(s->csb, &s->obj_type) < 0)
si_attach_cs(&s->si[0], s->csf); goto out_fail_attach_csb;
si_set_state(&s->si[0], SI_ST_EST);
s->si[0].hcto = sess->fe->timeout.clientfin; si_set_state(cs_si(s->csf), SI_ST_EST);
cs_si(s->csf)->hcto = sess->fe->timeout.clientfin;
if (likely(sess->fe->options2 & PR_O2_INDEPSTR)) if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
s->si[0].flags |= SI_FL_INDEP_STR; cs_si(s->csf)->flags |= SI_FL_INDEP_STR;
s->si[1].flags = SI_FL_ISBACK;
if (si_reset(&s->si[1]) < 0)
goto out_fail_reset_si1;
si_attach_cs(&s->si[1], s->csb);
s->si[1].hcto = TICK_ETERNITY;
cs_si(s->csb)->flags = SI_FL_ISBACK;
cs_si(s->csb)->hcto = TICK_ETERNITY;
if (likely(sess->fe->options2 & PR_O2_INDEPSTR)) if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
s->si[1].flags |= SI_FL_INDEP_STR; cs_si(s->csb)->flags |= SI_FL_INDEP_STR;
if (cs->flags & CS_FL_WEBSOCKET) if (cs->flags & CS_FL_WEBSOCKET)
s->flags |= SF_WEBSOCKET; s->flags |= SF_WEBSOCKET;
@ -470,7 +466,7 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
if (mux) { if (mux) {
if (mux->flags & MX_FL_CLEAN_ABRT) if (mux->flags & MX_FL_CLEAN_ABRT)
s->si[0].flags |= SI_FL_CLEAN_ABRT; cs_si(s->csf)->flags |= SI_FL_CLEAN_ABRT;
if (mux->flags & MX_FL_HTX) if (mux->flags & MX_FL_HTX)
s->flags |= SF_HTX; s->flags |= SF_HTX;
} }
@ -539,10 +535,9 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0) if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
goto out_fail_accept; goto out_fail_accept;
s->si[1].l7_buffer = BUF_NULL;
/* finish initialization of the accepted file descriptor */ /* finish initialization of the accepted file descriptor */
if (cs_appctx(cs)) if (cs_appctx(cs))
si_want_get(&s->si[0]); si_want_get(cs_si(s->csf));
if (sess->fe->accept && sess->fe->accept(s) < 0) if (sess->fe->accept && sess->fe->accept(s) < 0)
goto out_fail_accept; goto out_fail_accept;
@ -571,13 +566,12 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
/* Error unrolling */ /* Error unrolling */
out_fail_accept: out_fail_accept:
flt_stream_release(s, 0); flt_stream_release(s, 0);
tasklet_free(s->si[1].wait_event.tasklet);
LIST_DELETE(&s->list); LIST_DELETE(&s->list);
out_fail_reset_si1: out_fail_attach_csb:
tasklet_free(s->si[0].wait_event.tasklet); si_free(cs_si(s->csf));
out_fail_reset_si0: out_fail_attach_csf:
si_release_endpoint(&s->si[1]); cs_free(s->csb);
out_fail_alloc_cs: out_fail_alloc_csb:
task_destroy(t); task_destroy(t);
out_fail_alloc: out_fail_alloc:
pool_free(pool_head_stream, s); pool_free(pool_head_stream, s);
@ -722,23 +716,15 @@ static void stream_free(struct stream *s)
/* FIXME: Handle it in appctx_free ??? */ /* FIXME: Handle it in appctx_free ??? */
must_free_sess = objt_appctx(sess->origin) && sess->origin == s->csf->end; must_free_sess = objt_appctx(sess->origin) && sess->origin == s->csf->end;
/* FIXME: ATTENTION, si CSF est librérer avant, ça plante !!!! */
cs_destroy(s->csb);
cs_destroy(s->csf);
si_release_endpoint(cs_si(s->csb));
si_release_endpoint(cs_si(s->csf));
tasklet_free(s->si[0].wait_event.tasklet);
tasklet_free(s->si[1].wait_event.tasklet);
b_free(&s->si[1].l7_buffer);
if (must_free_sess) { if (must_free_sess) {
sess->origin = NULL; sess->origin = NULL;
session_free(sess); session_free(sess);
} }
sockaddr_free(&s->si[0].src);
sockaddr_free(&s->si[0].dst);
sockaddr_free(&s->si[1].src);
sockaddr_free(&s->si[1].dst);
pool_free(pool_head_stream, s); pool_free(pool_head_stream, s);
/* We may want to free the maximum amount of pools if the proxy is stopping */ /* We may want to free the maximum amount of pools if the proxy is stopping */
@ -2187,7 +2173,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
} }
} }
else { else {
si_reset_endpoint(si_b); cs_detach_endp(s->csb);
si_b->state = SI_ST_CLO; /* shutw+ini = abort */ si_b->state = SI_ST_CLO; /* shutw+ini = abort */
channel_shutw_now(req); /* fix buffer flags upon abort */ channel_shutw_now(req); /* fix buffer flags upon abort */
channel_shutr_now(res); channel_shutr_now(res);
@ -3157,7 +3143,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
chunk_appendf(&trash, chunk_appendf(&trash,
" flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p waiting=%d epoch=%#x\n", " flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p waiting=%d epoch=%#x\n",
strm->flags, strm->si[1].conn_retries, strm->srv_conn, strm->pend_pos, strm->flags, strm->csb->si->conn_retries, strm->srv_conn, strm->pend_pos,
LIST_INLIST(&strm->buffer_wait.list), strm->stream_epoch); LIST_INLIST(&strm->buffer_wait.list), strm->stream_epoch);
chunk_appendf(&trash, chunk_appendf(&trash,
@ -3253,29 +3239,29 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
chunk_appendf(&trash, chunk_appendf(&trash,
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s et=0x%03x sub=%d)\n", " si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s et=0x%03x sub=%d)\n",
&strm->si[0], strm->csf->si,
si_state_str(strm->si[0].state), si_state_str(strm->csf->si->state),
strm->si[0].flags, strm->csf->si->flags,
obj_type_name(strm->csf->end), obj_type_name(strm->csf->end),
obj_base_ptr(strm->csf->end), obj_base_ptr(strm->csf->end),
strm->si[0].exp ? strm->csf->si->exp ?
tick_is_expired(strm->si[0].exp, now_ms) ? "<PAST>" : tick_is_expired(strm->csf->si->exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(strm->si[0].exp - now_ms), human_time(TICKS_TO_MS(strm->csf->si->exp - now_ms),
TICKS_TO_MS(1000)) : "<NEVER>", TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[0].err_type, strm->si[0].wait_event.events); strm->csf->si->err_type, strm->csf->si->wait_event.events);
chunk_appendf(&trash, chunk_appendf(&trash,
" si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s et=0x%03x sub=%d)\n", " si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s et=0x%03x sub=%d)\n",
&strm->si[1], strm->csb->si,
si_state_str(strm->si[1].state), si_state_str(strm->csb->si->state),
strm->si[1].flags, strm->csb->si->flags,
obj_type_name(strm->csb->end), obj_type_name(strm->csb->end),
obj_base_ptr(strm->csb->end), obj_base_ptr(strm->csb->end),
strm->si[1].exp ? strm->csb->si->exp ?
tick_is_expired(strm->si[1].exp, now_ms) ? "<PAST>" : tick_is_expired(strm->csb->si->exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(strm->si[1].exp - now_ms), human_time(TICKS_TO_MS(strm->csb->si->exp - now_ms),
TICKS_TO_MS(1000)) : "<NEVER>", TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[1].err_type, strm->si[1].wait_event.events); strm->csb->si->err_type, strm->csb->si->wait_event.events);
cs = strm->csf; cs = strm->csf;
chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx); chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx);
@ -3650,21 +3636,21 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
conn = cs_conn(curr_strm->csf); conn = cs_conn(curr_strm->csf);
chunk_appendf(&trash, chunk_appendf(&trash,
" s0=[%d,%1xh,fd=%d,ex=%s]", " s0=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[0].state, curr_strm->csf->si->state,
curr_strm->si[0].flags, curr_strm->csf->si->flags,
conn ? conn->handle.fd : -1, conn ? conn->handle.fd : -1,
curr_strm->si[0].exp ? curr_strm->csf->si->exp ?
human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms), human_time(TICKS_TO_MS(curr_strm->csf->si->exp - now_ms),
TICKS_TO_MS(1000)) : ""); TICKS_TO_MS(1000)) : "");
conn = cs_conn(curr_strm->csb); conn = cs_conn(curr_strm->csb);
chunk_appendf(&trash, chunk_appendf(&trash,
" s1=[%d,%1xh,fd=%d,ex=%s]", " s1=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[1].state, curr_strm->csb->si->state,
curr_strm->si[1].flags, curr_strm->csb->si->flags,
conn ? conn->handle.fd : -1, conn ? conn->handle.fd : -1,
curr_strm->si[1].exp ? curr_strm->csb->si->exp ?
human_time(TICKS_TO_MS(curr_strm->si[1].exp - now_ms), human_time(TICKS_TO_MS(curr_strm->csb->si->exp - now_ms),
TICKS_TO_MS(1000)) : ""); TICKS_TO_MS(1000)) : "");
chunk_appendf(&trash, chunk_appendf(&trash,

View File

@ -28,6 +28,7 @@
#include <haproxy/http_htx.h> #include <haproxy/http_htx.h>
#include <haproxy/pipe-t.h> #include <haproxy/pipe-t.h>
#include <haproxy/pipe.h> #include <haproxy/pipe.h>
#include <haproxy/pool.h>
#include <haproxy/proxy.h> #include <haproxy/proxy.h>
#include <haproxy/stream-t.h> #include <haproxy/stream-t.h>
#include <haproxy/stream_interface.h> #include <haproxy/stream_interface.h>
@ -36,6 +37,9 @@
#include <haproxy/tools.h> #include <haproxy/tools.h>
DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface));
/* functions used by default on a detached stream-interface */ /* functions used by default on a detached stream-interface */
static void stream_int_shutr(struct stream_interface *si); static void stream_int_shutr(struct stream_interface *si);
static void stream_int_shutw(struct stream_interface *si); static void stream_int_shutw(struct stream_interface *si);
@ -98,6 +102,35 @@ struct data_cb si_conn_cb = {
.name = "STRM", .name = "STRM",
}; };
struct stream_interface *si_new(struct conn_stream *cs)
{
struct stream_interface *si;
si = pool_alloc(pool_head_streaminterface);
if (unlikely(!si))
return NULL;
si->flags = SI_FL_NONE;
if (si_reset(si) < 0) {
pool_free(pool_head_streaminterface, si);
return NULL;
}
si->cs = cs;
return si;
}
void si_free(struct stream_interface *si)
{
if (!si)
return;
b_free(&si->l7_buffer);
tasklet_free(si->wait_event.tasklet);
sockaddr_free(&si->src);
sockaddr_free(&si->dst);
pool_free(pool_head_streaminterface, si);
}
/* /*
* This function only has to be called once after a wakeup event in case of * This function only has to be called once after a wakeup event in case of
* suspected timeout. It controls the stream interface timeouts and sets * suspected timeout. It controls the stream interface timeouts and sets
@ -309,7 +342,7 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a
appctx = appctx_new(app); appctx = appctx_new(app);
if (!appctx) if (!appctx)
return NULL; return NULL;
si_attach_appctx(si, appctx); cs_attach_endp(si->cs, &appctx->obj_type, appctx);
appctx->t->nice = si_strm(si)->task->nice; appctx->t->nice = si_strm(si)->task->nice;
si_cant_get(si); si_cant_get(si);
appctx_wakeup(appctx); appctx_wakeup(appctx);

View File

@ -288,7 +288,7 @@ static enum act_return tcp_exec_action_silent_drop(struct act_rule *rule, struct
* is present, returning with ERR will cause lingering to be disabled. * is present, returning with ERR will cause lingering to be disabled.
*/ */
if (strm) if (strm)
strm->si[0].flags |= SI_FL_NOLINGER; strm->csf->si->flags |= SI_FL_NOLINGER;
/* We're on the client-facing side, we must force to disable lingering to /* We're on the client-facing side, we must force to disable lingering to
* ensure we will use an RST exclusively and kill any pending data. * ensure we will use an RST exclusively and kill any pending data.

View File

@ -65,7 +65,7 @@ smp_fetch_src(const struct arg *args, struct sample *smp, const char *kw, void *
src = conn_src(conn); src = conn_src(conn);
} }
else /* src */ else /* src */
src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess)); src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src) if (!src)
return 0; return 0;
@ -109,7 +109,7 @@ smp_fetch_sport(const struct arg *args, struct sample *smp, const char *kw, void
src = conn_src(conn); src = conn_src(conn);
} }
else /* src_port */ else /* src_port */
src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess)); src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src) if (!src)
return 0; return 0;
@ -144,7 +144,7 @@ smp_fetch_dst(const struct arg *args, struct sample *smp, const char *kw, void *
dst = conn_dst(conn); dst = conn_dst(conn);
} }
else /* dst */ else /* dst */
dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess)); dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst) if (!dst)
return 0; return 0;
@ -181,7 +181,7 @@ int smp_fetch_dst_is_local(const struct arg *args, struct sample *smp, const cha
dst = conn_dst(conn); dst = conn_dst(conn);
} }
else /* dst_is_local */ else /* dst_is_local */
dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess)); dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst) if (!dst)
return 0; return 0;
@ -207,7 +207,7 @@ int smp_fetch_src_is_local(const struct arg *args, struct sample *smp, const cha
src = conn_src(conn); src = conn_src(conn);
} }
else /* src_is_local */ else /* src_is_local */
src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess)); src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src) if (!src)
return 0; return 0;
@ -240,7 +240,7 @@ smp_fetch_dport(const struct arg *args, struct sample *smp, const char *kw, void
dst = conn_dst(conn); dst = conn_dst(conn);
} }
else /* dst_port */ else /* dst_port */
dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess)); dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst) if (!dst)
return 0; return 0;

View File

@ -1093,7 +1093,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
/* No connection, prepare a new one */ /* No connection, prepare a new one */
conn = conn_new((s ? &s->obj_type : &proxy->obj_type)); conn = conn_new((s ? &s->obj_type : &proxy->obj_type));
if (conn) if (conn)
cs = cs_new(&conn->obj_type, conn, &check->obj_type, NULL, &check_conn_cb); cs = cs_new();
if (!conn || !cs) { if (!conn || !cs) {
chunk_printf(&trash, "TCPCHK error allocating connection at step %d", chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
tcpcheck_get_step_id(check, rule)); tcpcheck_get_step_id(check, rule));
@ -1106,7 +1106,18 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
conn_free(conn); conn_free(conn);
goto out; goto out;
} }
cs_attach_endp(cs, &conn->obj_type, conn);
if (cs_attach_app(cs, &check->obj_type) < 0) {
chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
tcpcheck_get_step_id(check, rule));
if (rule->comment)
chunk_appendf(&trash, " comment: '%s'", rule->comment);
set_server_check_status(check, HCHK_STATUS_SOCKERR, trash.area);
ret = TCPCHK_EVAL_STOP;
TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
cs_destroy(cs);
goto out;
}
tasklet_set_tid(check->wait_list.tasklet, tid); tasklet_set_tid(check->wait_list.tasklet, tid);
check->cs = cs; check->cs = cs;