MAJOR: stream/conn_stream: Move the stream-interface into the conn-stream

Thanks to all previous changes, it is now possible to move the
stream-interface into the conn-stream. To do so, some SI functions are
removed and their conn-stream counterparts are added. In addition, the
conn-stream is now responsible to create and release the
stream-interface. While the stream-interfaces were inlined in the stream
structure, there is now a pointer in the conn-stream. stream-interfaces are
now dynamically allocated. Thus a dedicated pool is added. It is a temporary
change because, at the end, the stream-interface structure will most
probably disappear.
This commit is contained in:
Christopher Faulet 2021-12-23 17:28:17 +01:00
parent 9a86f6399f
commit cda94accb1
27 changed files with 261 additions and 242 deletions

View File

@ -68,18 +68,18 @@ static inline struct stream *chn_strm(const struct channel *chn)
static inline struct stream_interface *chn_prod(const struct channel *chn)
{
if (chn->flags & CF_ISRESP)
return &LIST_ELEM(chn, struct stream *, res)->si[1];
return LIST_ELEM(chn, struct stream *, res)->csb->si;
else
return &LIST_ELEM(chn, struct stream *, req)->si[0];
return LIST_ELEM(chn, struct stream *, req)->csf->si;
}
/* returns a pointer to the stream interface consuming the channel (producer) */
static inline struct stream_interface *chn_cons(const struct channel *chn)
{
if (chn->flags & CF_ISRESP)
return &LIST_ELEM(chn, struct stream *, res)->si[0];
return LIST_ELEM(chn, struct stream *, res)->csf->si;
else
return &LIST_ELEM(chn, struct stream *, req)->si[1];
return LIST_ELEM(chn, struct stream *, req)->csb->si;
}
/* c_orig() : returns the pointer to the channel buffer's origin */

View File

@ -25,6 +25,8 @@
#include <haproxy/obj_type-t.h>
struct stream_interface;
/* conn_stream flags */
enum {
CS_FL_NONE = 0x00000000, /* Just for initialization purposes */
@ -92,7 +94,7 @@ struct conn_stream {
unsigned int flags; /* CS_FL_* */
enum obj_type *end; /* points to the end point (connection or appctx) */
enum obj_type *app; /* points to the applicative point (stream or check) */
void *data; /* pointer to upper layer's entity (eg: stream interface) */
struct stream_interface *si;
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
void *ctx; /* mux-specific context */
};

View File

@ -29,13 +29,16 @@
#include <haproxy/obj_type.h>
struct stream;
struct stream_interface;
struct check;
#define IS_HTX_CS(cs) (cs_conn(cs) && IS_HTX_CONN(cs_conn(cs)))
struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, void *data, const struct data_cb *data_cb);
struct conn_stream *cs_new();
void cs_free(struct conn_stream *cs);
void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx);
int cs_attach_app(struct conn_stream *cs, enum obj_type *app);
void cs_detach_endp(struct conn_stream *cs);
/*
* Initializes all required fields for a new conn_strema.
@ -47,7 +50,7 @@ static inline void cs_init(struct conn_stream *cs)
cs->end = NULL;
cs->app = NULL;
cs->ctx = NULL;
cs->data = NULL;
cs->si = NULL;
cs->data_cb = NULL;
}
@ -77,11 +80,6 @@ static inline struct appctx *cs_appctx(const struct conn_stream *cs)
return (cs ? objt_appctx(cs->end) : NULL);
}
static inline struct stream_interface *cs_si(const struct conn_stream *cs)
{
return (cs ? cs->data : NULL);
}
static inline struct stream *cs_strm(const struct conn_stream *cs)
{
return (cs ? objt_stream(cs->app) : NULL);
@ -92,57 +90,9 @@ static inline struct check *cs_check(const struct conn_stream *cs)
return (cs ? objt_check(cs->app) : NULL);
}
/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */
static inline void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx)
static inline struct stream_interface *cs_si(const struct conn_stream *cs)
{
cs->end = endp;
cs->ctx = ctx;
}
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
static inline void cs_attach_app(struct conn_stream *cs, enum obj_type *app, void *data, const struct data_cb *data_cb)
{
cs->app = app;
cs->data = data;
cs->data_cb = data_cb;
}
/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux
* owns the connection ->detach() callback is called. Otherwise, it means the
* conn-stream owns the connection. In this case the connection is closed and
* released. For an applet, the appctx is released. At the end, the conn-stream
* is not released but some fields a reset.
*/
static inline void cs_detach_endp(struct conn_stream *cs)
{
struct connection *conn;
struct appctx *appctx;
if ((conn = cs_conn(cs))) {
if (conn->mux)
conn->mux->detach(cs);
else {
/* It's too early to have a mux, let's just destroy
* the connection
*/
conn_stop_tracking(conn);
conn_full_close(conn);
if (conn->destroy_cb)
conn->destroy_cb(conn);
conn_free(conn);
}
}
else if ((appctx = cs_appctx(cs))) {
if (appctx->applet->release)
appctx->applet->release(appctx);
appctx_free(appctx);
}
/* Rest CS */
cs->flags = CS_FL_NONE;
cs->end = NULL;
cs->ctx = NULL;
cs->data_cb = NULL;
return (cs_strm(cs) ? cs->si : NULL);
}
/* Release a conn_stream */

View File

@ -167,7 +167,6 @@ struct stream {
struct conn_stream *csf; /* frontend conn-stream */
struct conn_stream *csb; /* backend conn-stream */
struct stream_interface si[2]; /* client and server stream interfaces */
struct strm_logs logs; /* logs for this stream */
void (*do_log)(struct stream *s); /* the function to call in order to log (or NULL) */

View File

@ -33,6 +33,10 @@ extern struct si_ops si_embedded_ops;
extern struct si_ops si_conn_ops;
extern struct si_ops si_applet_ops;
extern struct data_cb si_conn_cb;
extern struct data_cb check_conn_cb;
struct stream_interface *si_new(struct conn_stream *cs);
void si_free(struct stream_interface *si);
/* main event functions used to move data between sockets and buffers */
int si_check_timeouts(struct stream_interface *si);
@ -87,7 +91,7 @@ static inline struct task *si_task(struct stream_interface *si)
/* returns the stream interface on the other side. Used during forwarding. */
static inline struct stream_interface *si_opposite(struct stream_interface *si)
{
return ((si->flags & SI_FL_ISBACK) ? &(cs_strm(si->cs)->si[0]) : &(cs_strm(si->cs)->si[1]));
return ((si->flags & SI_FL_ISBACK) ? cs_strm(si->cs)->csf->si : cs_strm(si->cs)->csb->si);
}
/* initializes a stream interface in the SI_ST_INI state. It's detached from
@ -105,6 +109,7 @@ static inline int si_reset(struct stream_interface *si)
si->cs = NULL;
si->state = si->prev_state = SI_ST_INI;
si->ops = &si_embedded_ops;
si->l7_buffer = BUF_NULL;
si->wait_event.tasklet = tasklet_new();
if (!si->wait_event.tasklet)
return -1;
@ -137,81 +142,6 @@ static inline int si_state_in(enum si_state state, enum si_state_bit mask)
return !!(si_state_bit(state) & mask);
}
/* Reset the endpoint detaching it from the conn-stream. For a connection
* attached to a mux, it is unsubscribe from any event.
*/
static inline void si_reset_endpoint(struct stream_interface *si)
{
if (!si->cs)
return;
if (cs_conn_mux(si->cs) && si->wait_event.events != 0)
(cs_conn_mux(si->cs))->unsubscribe(si->cs, si->wait_event.events, &si->wait_event);
cs_detach_endp(si->cs);
si->ops = &si_embedded_ops;
}
/* Release the endpoint if it's a connection or an applet, then nullify it.
* Note: released connections are closed then freed.
*/
static inline void si_release_endpoint(struct stream_interface *si)
{
if (!si->cs)
return;
si_reset_endpoint(si);
cs_free(si->cs);
si->cs = NULL;
si->ops = &si_embedded_ops;
}
/* Attach conn_stream <cs> to the stream interface <si>. */
static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs)
{
si->cs = cs;
if (cs_conn(cs)) {
si->ops = &si_conn_ops;
cs_attach_app(cs, &si_strm(si)->obj_type, si, &si_conn_cb);
}
else if (cs_appctx(cs)) {
struct appctx *appctx = cs_appctx(cs);
si->ops = &si_applet_ops;
appctx->owner = cs;
cs_attach_app(cs, &si_strm(si)->obj_type, si, NULL);
}
else {
si->ops = &si_embedded_ops;
cs_attach_app(cs, &si_strm(si)->obj_type, si, NULL);
}
}
/* Attach connection <conn> to the stream interface <si>. The stream interface
* is configured to work with a connection context.
*/
static inline void si_attach_conn(struct stream_interface *si, struct connection *conn)
{
si_reset_endpoint(si);
if (!conn->ctx)
conn->ctx = si->cs;
si->ops = &si_conn_ops;
cs_attach_endp(si->cs, &conn->obj_type, conn);
cs_attach_app(si->cs, &si_strm(si)->obj_type, si, &si_conn_cb);
}
/* Attach appctx <appctx> to the stream interface <si>. The stream interface
* is configured to work with an applet context.
*/
static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx)
{
si_reset_endpoint(si);
appctx->owner = si->cs;
si->ops = &si_applet_ops;
cs_attach_endp(si->cs, &appctx->obj_type, appctx);
cs_attach_app(si->cs, &si_strm(si)->obj_type, si, NULL);
}
/* call the applet's release function if any. Needs to be called upon close() */
static inline void si_applet_release(struct stream_interface *si)
{

View File

@ -1495,9 +1495,9 @@ int connect_server(struct stream *s)
}
if (avail >= 1) {
si_attach_conn(cs_si(s->csb), srv_conn);
cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
si_reset_endpoint(cs_si(s->csb));
cs_detach_endp(s->csb);
srv_conn = NULL;
}
}
@ -1571,7 +1571,7 @@ int connect_server(struct stream *s)
return SF_ERR_INTERNAL; /* how did we get there ? */
}
si_attach_conn(cs_si(s->csb), srv_conn);
cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
#if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
if (!srv ||
(srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||
@ -2289,7 +2289,7 @@ void back_handle_st_cer(struct stream *s)
* Note: the stream-interface will be switched to ST_REQ, ST_ASS or
* ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset.
*/
si_reset_endpoint(cs_si(s->csb));
cs_detach_endp(s->csb);
stream_choose_redispatch(s);

View File

@ -2715,7 +2715,7 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
* connection.
*/
if (!si_conn_ready(cs_si(s->csb))) {
si_reset_endpoint(cs_si(s->csb));
cs_detach_endp(s->csb);
s->srv_conn = NULL;
}

View File

@ -14,7 +14,7 @@
#include <haproxy/connection.h>
#include <haproxy/conn_stream.h>
#include <haproxy/pool.h>
//#include <haproxy/stream_interface.h>
#include <haproxy/stream_interface.h>
DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
@ -22,7 +22,7 @@ DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
/* Tries to allocate a new conn_stream and initialize its main fields. On
* failure, nothing is allocated and NULL is returned.
*/
struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, void *data, const struct data_cb *data_cb)
struct conn_stream *cs_new()
{
struct conn_stream *cs;
@ -30,8 +30,6 @@ struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, v
if (unlikely(!cs))
return NULL;
cs_init(cs);
cs_attach_endp(cs, endp, ctx);
cs_attach_app(cs, app, data, data_cb);
return cs;
}
@ -40,5 +38,106 @@ struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, v
*/
void cs_free(struct conn_stream *cs)
{
si_free(cs->si);
pool_free(pool_head_connstream, cs);
}
/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */
void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx)
{
struct connection *conn;
struct appctx *appctx;
cs->end = endp;
cs->ctx = ctx;
if ((conn = objt_conn(endp)) != NULL) {
if (!conn->ctx)
conn->ctx = cs;
if (cs_strm(cs)) {
cs->si->ops = &si_conn_ops;
cs->data_cb = &si_conn_cb;
}
else if (cs_check(cs))
cs->data_cb = &check_conn_cb;
}
else if ((appctx = objt_appctx(endp)) != NULL) {
appctx->owner = cs;
if (cs->si) {
cs->si->ops = &si_applet_ops;
cs->data_cb = NULL;
}
}
}
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
int cs_attach_app(struct conn_stream *cs, enum obj_type *app)
{
cs->app = app;
if (objt_stream(app)) {
if (!cs->si)
cs->si = si_new(cs);
if (unlikely(!cs->si))
return -1;
if (cs_conn(cs)) {
cs->si->ops = &si_conn_ops;
cs->data_cb = &si_conn_cb;
}
else if (cs_appctx(cs)) {
cs->si->ops = &si_applet_ops;
cs->data_cb = NULL;
}
else {
cs->si->ops = &si_embedded_ops;
cs->data_cb = NULL;
}
}
else if (objt_check(app))
cs->data_cb = &check_conn_cb;
return 0;
}
/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux
* owns the connection ->detach() callback is called. Otherwise, it means the
* conn-stream owns the connection. In this case the connection is closed and
* released. For an applet, the appctx is released. At the end, the conn-stream
* is not released but some fields a reset.
*/
void cs_detach_endp(struct conn_stream *cs)
{
struct connection *conn;
struct appctx *appctx;
if ((conn = cs_conn(cs))) {
if (conn->mux) {
if (cs->si && cs->si->wait_event.events != 0)
conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
conn->mux->detach(cs);
}
else {
/* It's too early to have a mux, let's just destroy
* the connection
*/
conn_stop_tracking(conn);
conn_full_close(conn);
if (conn->destroy_cb)
conn->destroy_cb(conn);
conn_free(conn);
}
}
else if ((appctx = cs_appctx(cs))) {
if (appctx->applet->release)
appctx->applet->release(appctx);
appctx_free(appctx);
}
/* Rest CS */
cs->flags = CS_FL_NONE;
cs->end = NULL;
cs->ctx = NULL;
if (cs->si)
cs->si->ops = &si_embedded_ops;
cs->data_cb = NULL;
}

View File

@ -1738,8 +1738,8 @@ static int make_proxy_line_v2(char *buf, int buf_len, struct server *srv, struct
memcpy(hdr->sig, pp2_signature, PP2_SIGNATURE_LEN);
if (strm) {
src = si_src(&strm->si[0]);
dst = si_dst(&strm->si[0]);
src = si_src(strm->csf->si);
dst = si_dst(strm->csf->si);
}
else if (remote && conn_get_src(remote) && conn_get_dst(remote)) {
src = conn_src(remote);
@ -1937,8 +1937,8 @@ int make_proxy_line(char *buf, int buf_len, struct server *srv, struct connectio
const struct sockaddr_storage *dst = NULL;
if (strm) {
src = si_src(&strm->si[0]);
dst = si_dst(&strm->si[0]);
src = si_src(strm->csf->si);
dst = si_dst(strm->csf->si);
}
else if (remote && conn_get_src(remote) && conn_get_dst(remote)) {
src = conn_src(remote);

View File

@ -903,11 +903,12 @@ static struct appctx *dns_session_create(struct dns_session *ds)
goto out_free_appctx;
}
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
cs = cs_new();
if (!cs) {
ha_alert("out of memory in dns_session_create().\n");
goto out_free_sess;
}
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in dns_session_create().\n");

View File

@ -2024,9 +2024,10 @@ spoe_create_appctx(struct spoe_config *conf)
if (!sess)
goto out_free_spoe;
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
cs = cs_new();
if (!cs)
goto out_free_sess;
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL)
goto out_free_cs;
@ -2034,7 +2035,7 @@ spoe_create_appctx(struct spoe_config *conf)
stream_set_backend(strm, conf->agent->b.be);
/* applet is waiting for data */
si_cant_get(&strm->si[0]);
si_cant_get(strm->csf->si);
appctx_wakeup(appctx);
strm->do_log = NULL;

View File

@ -176,9 +176,10 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
if (fin)
htx->flags |= HTX_FL_EOM;
cs = cs_new(qcs->qcc->conn->obj_type);
cs = cs_new();
if (!cs)
return 1;
cs_attach_endp(&qcs->qcc->conn->obj_type, qcs);
cs->flags |= CS_FL_NOT_FIRST;
cs->ctx = qcs;

View File

@ -2961,11 +2961,12 @@ __LJMP static int hlua_socket_new(lua_State *L)
goto out_fail_appctx;
}
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
cs = cs_new();
if (!cs) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_sess;
}
cs_attach_endp(cs, &appctx->obj_type, appctx);
strm = stream_new(sess, cs, &BUF_NULL);
if (!strm) {
@ -2980,7 +2981,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
* and retrieve data from the server. The connection is initialized
* with the "struct server".
*/
si_set_state(&strm->si[1], SI_ST_ASS);
si_set_state(strm->csb->si, SI_ST_ASS);
/* Force destination server. */
strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;

View File

@ -72,10 +72,10 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
htx_add_endof(htx, HTX_BLK_EOH);
htx_to_buf(htx, &htx_buf);
cs = cs_new(&qcs->qcc->conn->obj_type);
cs = cs_new();
if (!cs)
return -1;
cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs);
cs->ctx = qcs;
stream_create_from_cs(cs, &htx_buf);

View File

@ -1257,7 +1257,7 @@ static __inline int do_l7_retry(struct stream *s, struct stream_interface *si)
res->to_forward = 0;
res->analyse_exp = TICK_ETERNITY;
res->total = 0;
si_reset_endpoint(cs_si(s->csb));
cs_detach_endp(s->csb);
b_free(&req->buf);
/* Swap the L7 buffer with the channel buffer */

View File

@ -486,11 +486,12 @@ struct appctx *httpclient_start(struct httpclient *hc)
ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_appctx;
}
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
cs = cs_new();
if (!cs) {
ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_sess;
}
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) {
ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_cs;

View File

@ -1186,7 +1186,7 @@ static int smp_fetch_base32(const struct arg *args, struct sample *smp, const ch
*/
static int smp_fetch_base32_src(const struct arg *args, struct sample *smp, const char *kw, void *private)
{
const struct sockaddr_storage *src = (smp->strm ? si_src(&smp->strm->si[0]) : NULL);
const struct sockaddr_storage *src = (smp->strm ? si_src(smp->strm->csf->si) : NULL);
struct buffer *temp;
if (!src)
@ -2053,7 +2053,7 @@ static int smp_fetch_url32(const struct arg *args, struct sample *smp, const cha
*/
static int smp_fetch_url32_src(const struct arg *args, struct sample *smp, const char *kw, void *private)
{
const struct sockaddr_storage *src = (smp->strm ? si_src(&smp->strm->si[0]) : NULL);
const struct sockaddr_storage *src = (smp->strm ? si_src(smp->strm->csf->si) : NULL);
struct buffer *temp;
if (!src)

View File

@ -682,11 +682,12 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
struct conn_stream *cs;
TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
cs = cs_new(&h1c->conn->obj_type, h1s, NULL, NULL, NULL);
cs = cs_new();
if (!cs) {
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
goto err;
}
cs_attach_endp(cs, &h1c->conn->obj_type, h1s);
h1s->cs = cs;
if (h1s->flags & H1S_F_NOT_FIRST)

View File

@ -1529,11 +1529,11 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
if (!h2s)
goto out;
cs = cs_new(&h2c->conn->obj_type, h2s, NULL, NULL, NULL);
cs = cs_new();
if (!cs)
goto out_close;
cs->flags |= CS_FL_NOT_FIRST;
cs_attach_endp(cs, &h2c->conn->obj_type, h2s);
h2s->cs = cs;
h2c->nb_cs++;

View File

@ -291,11 +291,12 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
ctx->conn = conn;
if (!cs) {
cs = cs_new(&conn->obj_type, NULL, NULL, NULL, NULL);
cs = cs_new();
if (!cs) {
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
goto fail_free_ctx;
}
cs_attach_endp(cs, &conn->obj_type, NULL);
if (!stream_new(conn->owner, cs, &BUF_NULL)) {
TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);

View File

@ -3204,11 +3204,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
goto out_free_appctx;
}
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
cs = cs_new();
if (!cs) {
ha_alert("out of memory in peer_session_create().\n");
goto out_free_sess;
}
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in peer_session_create().\n");

View File

@ -655,11 +655,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
goto out_free_appctx;
}
cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
cs = cs_new();
if (!cs) {
ha_alert("out of memory in sink_forward_session_create");
goto out_free_sess;
}
cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in sink_forward_session_create().\n");

View File

@ -438,30 +438,26 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
if (sess->fe->mode == PR_MODE_HTTP)
s->flags |= SF_HTX;
cs->app = &s->obj_type;
s->csf = cs;
s->csb = cs_new(NULL, NULL, &s->obj_type, &s->si[1], NULL);
s->csb = cs_new();
if (!s->csb)
goto out_fail_alloc_cs;
goto out_fail_alloc_csb;
s->si[0].flags = SI_FL_NONE;
if (si_reset(&s->si[0]) < 0)
goto out_fail_reset_si0;
si_attach_cs(&s->si[0], s->csf);
si_set_state(&s->si[0], SI_ST_EST);
s->si[0].hcto = sess->fe->timeout.clientfin;
if (cs_attach_app(s->csf, &s->obj_type) < 0)
goto out_fail_attach_csf;
if (cs_attach_app(s->csb, &s->obj_type) < 0)
goto out_fail_attach_csb;
si_set_state(cs_si(s->csf), SI_ST_EST);
cs_si(s->csf)->hcto = sess->fe->timeout.clientfin;
if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
s->si[0].flags |= SI_FL_INDEP_STR;
s->si[1].flags = SI_FL_ISBACK;
if (si_reset(&s->si[1]) < 0)
goto out_fail_reset_si1;
si_attach_cs(&s->si[1], s->csb);
s->si[1].hcto = TICK_ETERNITY;
cs_si(s->csf)->flags |= SI_FL_INDEP_STR;
cs_si(s->csb)->flags = SI_FL_ISBACK;
cs_si(s->csb)->hcto = TICK_ETERNITY;
if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
s->si[1].flags |= SI_FL_INDEP_STR;
cs_si(s->csb)->flags |= SI_FL_INDEP_STR;
if (cs->flags & CS_FL_WEBSOCKET)
s->flags |= SF_WEBSOCKET;
@ -470,7 +466,7 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
if (mux) {
if (mux->flags & MX_FL_CLEAN_ABRT)
s->si[0].flags |= SI_FL_CLEAN_ABRT;
cs_si(s->csf)->flags |= SI_FL_CLEAN_ABRT;
if (mux->flags & MX_FL_HTX)
s->flags |= SF_HTX;
}
@ -539,10 +535,9 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
goto out_fail_accept;
s->si[1].l7_buffer = BUF_NULL;
/* finish initialization of the accepted file descriptor */
if (cs_appctx(cs))
si_want_get(&s->si[0]);
si_want_get(cs_si(s->csf));
if (sess->fe->accept && sess->fe->accept(s) < 0)
goto out_fail_accept;
@ -571,13 +566,12 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
/* Error unrolling */
out_fail_accept:
flt_stream_release(s, 0);
tasklet_free(s->si[1].wait_event.tasklet);
LIST_DELETE(&s->list);
out_fail_reset_si1:
tasklet_free(s->si[0].wait_event.tasklet);
out_fail_reset_si0:
si_release_endpoint(&s->si[1]);
out_fail_alloc_cs:
out_fail_attach_csb:
si_free(cs_si(s->csf));
out_fail_attach_csf:
cs_free(s->csb);
out_fail_alloc_csb:
task_destroy(t);
out_fail_alloc:
pool_free(pool_head_stream, s);
@ -722,23 +716,15 @@ static void stream_free(struct stream *s)
/* FIXME: Handle it in appctx_free ??? */
must_free_sess = objt_appctx(sess->origin) && sess->origin == s->csf->end;
/* FIXME: ATTENTION, si CSF est librérer avant, ça plante !!!! */
cs_destroy(s->csb);
cs_destroy(s->csf);
si_release_endpoint(cs_si(s->csb));
si_release_endpoint(cs_si(s->csf));
tasklet_free(s->si[0].wait_event.tasklet);
tasklet_free(s->si[1].wait_event.tasklet);
b_free(&s->si[1].l7_buffer);
if (must_free_sess) {
sess->origin = NULL;
session_free(sess);
}
sockaddr_free(&s->si[0].src);
sockaddr_free(&s->si[0].dst);
sockaddr_free(&s->si[1].src);
sockaddr_free(&s->si[1].dst);
pool_free(pool_head_stream, s);
/* We may want to free the maximum amount of pools if the proxy is stopping */
@ -2187,7 +2173,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
}
}
else {
si_reset_endpoint(si_b);
cs_detach_endp(s->csb);
si_b->state = SI_ST_CLO; /* shutw+ini = abort */
channel_shutw_now(req); /* fix buffer flags upon abort */
channel_shutr_now(res);
@ -3157,7 +3143,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
chunk_appendf(&trash,
" flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p waiting=%d epoch=%#x\n",
strm->flags, strm->si[1].conn_retries, strm->srv_conn, strm->pend_pos,
strm->flags, strm->csb->si->conn_retries, strm->srv_conn, strm->pend_pos,
LIST_INLIST(&strm->buffer_wait.list), strm->stream_epoch);
chunk_appendf(&trash,
@ -3253,29 +3239,29 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
chunk_appendf(&trash,
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s et=0x%03x sub=%d)\n",
&strm->si[0],
si_state_str(strm->si[0].state),
strm->si[0].flags,
strm->csf->si,
si_state_str(strm->csf->si->state),
strm->csf->si->flags,
obj_type_name(strm->csf->end),
obj_base_ptr(strm->csf->end),
strm->si[0].exp ?
tick_is_expired(strm->si[0].exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(strm->si[0].exp - now_ms),
strm->csf->si->exp ?
tick_is_expired(strm->csf->si->exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(strm->csf->si->exp - now_ms),
TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[0].err_type, strm->si[0].wait_event.events);
strm->csf->si->err_type, strm->csf->si->wait_event.events);
chunk_appendf(&trash,
" si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s et=0x%03x sub=%d)\n",
&strm->si[1],
si_state_str(strm->si[1].state),
strm->si[1].flags,
strm->csb->si,
si_state_str(strm->csb->si->state),
strm->csb->si->flags,
obj_type_name(strm->csb->end),
obj_base_ptr(strm->csb->end),
strm->si[1].exp ?
tick_is_expired(strm->si[1].exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(strm->si[1].exp - now_ms),
strm->csb->si->exp ?
tick_is_expired(strm->csb->si->exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(strm->csb->si->exp - now_ms),
TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[1].err_type, strm->si[1].wait_event.events);
strm->csb->si->err_type, strm->csb->si->wait_event.events);
cs = strm->csf;
chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx);
@ -3650,21 +3636,21 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
conn = cs_conn(curr_strm->csf);
chunk_appendf(&trash,
" s0=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[0].state,
curr_strm->si[0].flags,
curr_strm->csf->si->state,
curr_strm->csf->si->flags,
conn ? conn->handle.fd : -1,
curr_strm->si[0].exp ?
human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms),
curr_strm->csf->si->exp ?
human_time(TICKS_TO_MS(curr_strm->csf->si->exp - now_ms),
TICKS_TO_MS(1000)) : "");
conn = cs_conn(curr_strm->csb);
chunk_appendf(&trash,
" s1=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[1].state,
curr_strm->si[1].flags,
curr_strm->csb->si->state,
curr_strm->csb->si->flags,
conn ? conn->handle.fd : -1,
curr_strm->si[1].exp ?
human_time(TICKS_TO_MS(curr_strm->si[1].exp - now_ms),
curr_strm->csb->si->exp ?
human_time(TICKS_TO_MS(curr_strm->csb->si->exp - now_ms),
TICKS_TO_MS(1000)) : "");
chunk_appendf(&trash,

View File

@ -28,6 +28,7 @@
#include <haproxy/http_htx.h>
#include <haproxy/pipe-t.h>
#include <haproxy/pipe.h>
#include <haproxy/pool.h>
#include <haproxy/proxy.h>
#include <haproxy/stream-t.h>
#include <haproxy/stream_interface.h>
@ -36,6 +37,9 @@
#include <haproxy/tools.h>
DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface));
/* functions used by default on a detached stream-interface */
static void stream_int_shutr(struct stream_interface *si);
static void stream_int_shutw(struct stream_interface *si);
@ -98,6 +102,35 @@ struct data_cb si_conn_cb = {
.name = "STRM",
};
struct stream_interface *si_new(struct conn_stream *cs)
{
struct stream_interface *si;
si = pool_alloc(pool_head_streaminterface);
if (unlikely(!si))
return NULL;
si->flags = SI_FL_NONE;
if (si_reset(si) < 0) {
pool_free(pool_head_streaminterface, si);
return NULL;
}
si->cs = cs;
return si;
}
void si_free(struct stream_interface *si)
{
if (!si)
return;
b_free(&si->l7_buffer);
tasklet_free(si->wait_event.tasklet);
sockaddr_free(&si->src);
sockaddr_free(&si->dst);
pool_free(pool_head_streaminterface, si);
}
/*
* This function only has to be called once after a wakeup event in case of
* suspected timeout. It controls the stream interface timeouts and sets
@ -309,7 +342,7 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a
appctx = appctx_new(app);
if (!appctx)
return NULL;
si_attach_appctx(si, appctx);
cs_attach_endp(si->cs, &appctx->obj_type, appctx);
appctx->t->nice = si_strm(si)->task->nice;
si_cant_get(si);
appctx_wakeup(appctx);

View File

@ -288,7 +288,7 @@ static enum act_return tcp_exec_action_silent_drop(struct act_rule *rule, struct
* is present, returning with ERR will cause lingering to be disabled.
*/
if (strm)
strm->si[0].flags |= SI_FL_NOLINGER;
strm->csf->si->flags |= SI_FL_NOLINGER;
/* We're on the client-facing side, we must force to disable lingering to
* ensure we will use an RST exclusively and kill any pending data.

View File

@ -65,7 +65,7 @@ smp_fetch_src(const struct arg *args, struct sample *smp, const char *kw, void *
src = conn_src(conn);
}
else /* src */
src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess));
src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src)
return 0;
@ -109,7 +109,7 @@ smp_fetch_sport(const struct arg *args, struct sample *smp, const char *kw, void
src = conn_src(conn);
}
else /* src_port */
src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess));
src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src)
return 0;
@ -144,7 +144,7 @@ smp_fetch_dst(const struct arg *args, struct sample *smp, const char *kw, void *
dst = conn_dst(conn);
}
else /* dst */
dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess));
dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst)
return 0;
@ -181,7 +181,7 @@ int smp_fetch_dst_is_local(const struct arg *args, struct sample *smp, const cha
dst = conn_dst(conn);
}
else /* dst_is_local */
dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess));
dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst)
return 0;
@ -207,7 +207,7 @@ int smp_fetch_src_is_local(const struct arg *args, struct sample *smp, const cha
src = conn_src(conn);
}
else /* src_is_local */
src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess));
src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src)
return 0;
@ -240,7 +240,7 @@ smp_fetch_dport(const struct arg *args, struct sample *smp, const char *kw, void
dst = conn_dst(conn);
}
else /* dst_port */
dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess));
dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst)
return 0;

View File

@ -1093,7 +1093,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
/* No connection, prepare a new one */
conn = conn_new((s ? &s->obj_type : &proxy->obj_type));
if (conn)
cs = cs_new(&conn->obj_type, conn, &check->obj_type, NULL, &check_conn_cb);
cs = cs_new();
if (!conn || !cs) {
chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
tcpcheck_get_step_id(check, rule));
@ -1106,7 +1106,18 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
conn_free(conn);
goto out;
}
cs_attach_endp(cs, &conn->obj_type, conn);
if (cs_attach_app(cs, &check->obj_type) < 0) {
chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
tcpcheck_get_step_id(check, rule));
if (rule->comment)
chunk_appendf(&trash, " comment: '%s'", rule->comment);
set_server_check_status(check, HCHK_STATUS_SOCKERR, trash.area);
ret = TCPCHK_EVAL_STOP;
TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
cs_destroy(cs);
goto out;
}
tasklet_set_tid(check->wait_list.tasklet, tid);
check->cs = cs;