MEDIUM: conn-stream: No longer access connection field directly

To be able to handle applets as a conn-stream endpoint, we must be prepared
to handle different types of endpoints. First of all, the conn-strream's
connection must no longer be used directly.
This commit is contained in:
Christopher Faulet 2021-12-17 17:28:35 +01:00
parent 1329f2a12a
commit 897d612d68
9 changed files with 171 additions and 122 deletions

View File

@ -30,7 +30,7 @@
extern struct pool_head *pool_head_connstream; extern struct pool_head *pool_head_connstream;
#define IS_HTX_CS(cs) (IS_HTX_CONN((cs)->conn)) #define IS_HTX_CS(cs) (cs_conn(cs) && IS_HTX_CONN(cs_conn(cs)))
struct conn_stream *cs_new(struct connection *conn, void *target); struct conn_stream *cs_new(struct connection *conn, void *target);
void cs_free(struct conn_stream *cs); void cs_free(struct conn_stream *cs);
@ -52,6 +52,16 @@ static inline struct connection *cs_conn(const struct conn_stream *cs)
return cs ? cs->conn : NULL; return cs ? cs->conn : NULL;
} }
/* Returns the mux of the connection from a cs if the endpoint is a
* connection. Otherwise NULL is returned.
*/
static inline const struct mux_ops *cs_conn_mux(const struct conn_stream *cs)
{
const struct connection *conn = cs_conn(cs);
return (conn ? conn->mux : NULL);
}
/* Attaches a conn_stream to a data layer and sets the relevant callbacks */ /* Attaches a conn_stream to a data layer and sets the relevant callbacks */
static inline void cs_attach(struct conn_stream *cs, void *data, const struct data_cb *data_cb) static inline void cs_attach(struct conn_stream *cs, void *data, const struct data_cb *data_cb)
{ {
@ -66,15 +76,15 @@ static inline void cs_attach(struct conn_stream *cs, void *data, const struct da
*/ */
static inline void cs_detach(struct conn_stream *cs) static inline void cs_detach(struct conn_stream *cs)
{ {
if (cs_conn(cs)) { struct connection *conn;
if (cs->conn->mux)
cs->conn->mux->detach(cs); if ((conn = cs_conn(cs))) {
if (conn->mux)
conn->mux->detach(cs);
else { else {
/* It's too early to have a mux, let's just destroy /* It's too early to have a mux, let's just destroy
* the connection * the connection
*/ */
struct connection *conn = cs->conn;
conn_stop_tracking(conn); conn_stop_tracking(conn);
conn_full_close(conn); conn_full_close(conn);
if (conn->destroy_cb) if (conn->destroy_cb)
@ -102,24 +112,30 @@ static inline const char *cs_get_data_name(const struct conn_stream *cs)
/* shut read */ /* shut read */
static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode) static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{ {
const struct mux_ops *mux;
if (!cs_conn(cs) || cs->flags & CS_FL_SHR) if (!cs_conn(cs) || cs->flags & CS_FL_SHR)
return; return;
/* clean data-layer shutdown */ /* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutr) mux = cs_conn_mux(cs);
cs->conn->mux->shutr(cs, mode); if (mux && mux->shutr)
mux->shutr(cs, mode);
cs->flags |= (mode == CS_SHR_DRAIN) ? CS_FL_SHRD : CS_FL_SHRR; cs->flags |= (mode == CS_SHR_DRAIN) ? CS_FL_SHRD : CS_FL_SHRR;
} }
/* shut write */ /* shut write */
static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode) static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{ {
const struct mux_ops *mux;
if (!cs_conn(cs) || cs->flags & CS_FL_SHW) if (!cs_conn(cs) || cs->flags & CS_FL_SHW)
return; return;
/* clean data-layer shutdown */ /* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutw) mux = cs_conn_mux(cs);
cs->conn->mux->shutw(cs, mode); if (mux && mux->shutw)
mux->shutw(cs, mode);
cs->flags |= (mode == CS_SHW_NORMAL) ? CS_FL_SHWN : CS_FL_SHWS; cs->flags |= (mode == CS_SHW_NORMAL) ? CS_FL_SHWN : CS_FL_SHWS;
} }

View File

@ -175,6 +175,7 @@ static inline enum obj_type *si_detach_endpoint(struct stream_interface *si)
static inline void si_reset_endpoint(struct stream_interface *si) static inline void si_reset_endpoint(struct stream_interface *si)
{ {
struct conn_stream *cs; struct conn_stream *cs;
struct connection *conn;
struct appctx *appctx; struct appctx *appctx;
if (!si->end) if (!si->end)
@ -187,9 +188,9 @@ static inline void si_reset_endpoint(struct stream_interface *si)
si_detach_endpoint(si); si_detach_endpoint(si);
} }
else if ((cs = objt_cs(si->end))) { else if ((cs = objt_cs(si->end))) {
if (cs_conn(cs) && si->wait_event.events != 0) if ((conn = cs_conn(cs)) && si->wait_event.events != 0)
cs->conn->mux->unsubscribe(cs, si->wait_event.events, conn->mux->unsubscribe(cs, si->wait_event.events,
&si->wait_event); &si->wait_event);
cs_detach(cs); cs_detach(cs);
si->ops = &si_embedded_ops; si->ops = &si_embedded_ops;
} }
@ -201,6 +202,7 @@ static inline void si_reset_endpoint(struct stream_interface *si)
static inline void si_release_endpoint(struct stream_interface *si) static inline void si_release_endpoint(struct stream_interface *si)
{ {
struct conn_stream *cs; struct conn_stream *cs;
struct connection *conn;
struct appctx *appctx; struct appctx *appctx;
if (!si->end) if (!si->end)
@ -212,9 +214,9 @@ static inline void si_release_endpoint(struct stream_interface *si)
appctx_free(appctx); appctx_free(appctx);
} }
else if ((cs = objt_cs(si->end))) { else if ((cs = objt_cs(si->end))) {
if (cs_conn(cs) && si->wait_event.events != 0) if ((conn = cs_conn(cs)) && si->wait_event.events != 0)
cs->conn->mux->unsubscribe(cs, si->wait_event.events, conn->mux->unsubscribe(cs, si->wait_event.events,
&si->wait_event); &si->wait_event);
cs_destroy(cs); cs_destroy(cs);
} }
si_detach_endpoint(si); si_detach_endpoint(si);
@ -527,7 +529,7 @@ static inline int si_sync_recv(struct stream_interface *si)
return 0; return 0;
cs = objt_cs(si->end); cs = objt_cs(si->end);
if (!cs_conn(cs) || !cs->conn->mux) if (!cs_conn_mux(cs))
return 0; // only conn_streams are supported return 0; // only conn_streams are supported
if (si->wait_event.events & SUB_RETRY_RECV) if (si->wait_event.events & SUB_RETRY_RECV)
@ -622,10 +624,10 @@ static inline const struct sockaddr_storage *si_src(struct stream_interface *si)
if (!(si->flags & SI_FL_ISBACK)) if (!(si->flags & SI_FL_ISBACK))
return sess_src(strm_sess(si_strm(si))); return sess_src(strm_sess(si_strm(si)));
else { else {
struct conn_stream *cs = objt_cs(si->end); struct connection *conn = cs_conn(objt_cs(si->end));
if (cs_conn(cs)) if (conn)
return conn_src(cs->conn); return conn_src(conn);
} }
return NULL; return NULL;
} }
@ -642,10 +644,10 @@ static inline const struct sockaddr_storage *si_dst(struct stream_interface *si)
if (!(si->flags & SI_FL_ISBACK)) if (!(si->flags & SI_FL_ISBACK))
return sess_dst(strm_sess(si_strm(si))); return sess_dst(strm_sess(si_strm(si)));
else { else {
struct conn_stream *cs = objt_cs(si->end); struct connection *conn = cs_conn(objt_cs(si->end));
if (cs_conn(cs)) if (conn)
return conn_dst(cs->conn); return conn_dst(conn);
} }
return NULL; return NULL;
} }
@ -666,10 +668,10 @@ static inline int si_get_src(struct stream_interface *si)
if (!(si->flags & SI_FL_ISBACK)) if (!(si->flags & SI_FL_ISBACK))
src = sess_src(strm_sess(si_strm(si))); src = sess_src(strm_sess(si_strm(si)));
else { else {
struct conn_stream *cs = objt_cs(si->end); struct connection *conn = cs_conn(objt_cs(si->end));
if (cs_conn(cs)) if (conn)
src = conn_src(cs->conn); src = conn_src(conn);
} }
if (!src) if (!src)
return 0; return 0;
@ -697,10 +699,10 @@ static inline int si_get_dst(struct stream_interface *si)
if (!(si->flags & SI_FL_ISBACK)) if (!(si->flags & SI_FL_ISBACK))
dst = sess_dst(strm_sess(si_strm(si))); dst = sess_dst(strm_sess(si_strm(si)));
else { else {
struct conn_stream *cs = objt_cs(si->end); struct connection *conn = cs_conn(objt_cs(si->end));
if (cs_conn(cs)) if (conn)
dst = conn_dst(cs->conn); dst = conn_dst(conn);
} }
if (!dst) if (!dst)
return 0; return 0;

View File

@ -3609,7 +3609,7 @@ static void fcgi_detach(struct conn_stream *cs)
/* this stream may be blocked waiting for some data to leave, so orphan /* this stream may be blocked waiting for some data to leave, so orphan
* it in this case. * it in this case.
*/ */
if (!(cs->conn->flags & CO_FL_ERROR) && if (!(fconn->conn->flags & CO_FL_ERROR) &&
(fconn->state != FCGI_CS_CLOSED) && (fconn->state != FCGI_CS_CLOSED) &&
(fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) && (fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) &&
(fstrm->subs || (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))) { (fstrm->subs || (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))) {

View File

@ -3419,9 +3419,9 @@ static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
/* NOTE: Be sure to handle abort (cf. h2_shutr) */ /* NOTE: Be sure to handle abort (cf. h2_shutr) */
if (cs->flags & CS_FL_SHR) if (cs->flags & CS_FL_SHR)
goto end; goto end;
if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutr)
cs->conn->xprt->shutr(cs->conn, cs->conn->xprt_ctx, if (conn_xprt_ready(h1c->conn) && h1c->conn->xprt->shutr)
(mode == CS_SHR_DRAIN)); h1c->conn->xprt->shutr(h1c->conn, h1c->conn->xprt_ctx, (mode == CS_SHR_DRAIN));
end: end:
TRACE_LEAVE(H1_EV_STRM_SHUT, h1c->conn, h1s); TRACE_LEAVE(H1_EV_STRM_SHUT, h1c->conn, h1s);
} }
@ -3464,7 +3464,7 @@ static void h1_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
h1c->flags |= H1C_F_ST_SILENT_SHUT; h1c->flags |= H1C_F_ST_SILENT_SHUT;
if (!b_data(&h1c->obuf)) if (!b_data(&h1c->obuf))
h1_shutw_conn(cs->conn); h1_shutw_conn(h1c->conn);
end: end:
TRACE_LEAVE(H1_EV_STRM_SHUT, h1c->conn, h1s); TRACE_LEAVE(H1_EV_STRM_SHUT, h1c->conn, h1s);
} }
@ -3671,28 +3671,28 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res); struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
int ret = 0; int ret = 0;
TRACE_ENTER(H1_EV_STRM_RECV, cs->conn, h1s, 0, (size_t[]){count}); TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count});
if ((h1m->flags & H1_MF_CHNK) || (h1m->state != H1_MSG_DATA && h1m->state != H1_MSG_TUNNEL)) { if ((h1m->flags & H1_MF_CHNK) || (h1m->state != H1_MSG_DATA && h1m->state != H1_MSG_TUNNEL)) {
h1c->flags &= ~H1C_F_WANT_SPLICE; h1c->flags &= ~H1C_F_WANT_SPLICE;
TRACE_STATE("Allow xprt rcv_buf on !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, cs->conn, h1s); TRACE_STATE("Allow xprt rcv_buf on !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, h1c->conn, h1s);
goto end; goto end;
} }
h1c->flags |= H1C_F_WANT_SPLICE; h1c->flags |= H1C_F_WANT_SPLICE;
if (h1s_data_pending(h1s)) { if (h1s_data_pending(h1s)) {
TRACE_STATE("flush input buffer before splicing", H1_EV_STRM_RECV, cs->conn, h1s); TRACE_STATE("flush input buffer before splicing", H1_EV_STRM_RECV, h1c->conn, h1s);
goto end; goto end;
} }
if (!h1_recv_allowed(h1c)) { if (!h1_recv_allowed(h1c)) {
TRACE_DEVEL("leaving on !recv_allowed", H1_EV_STRM_RECV, cs->conn, h1s); TRACE_DEVEL("leaving on !recv_allowed", H1_EV_STRM_RECV, h1c->conn, h1s);
goto end; goto end;
} }
if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN) && count > h1m->curr_len) if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN) && count > h1m->curr_len)
count = h1m->curr_len; count = h1m->curr_len;
ret = cs->conn->xprt->rcv_pipe(cs->conn, cs->conn->xprt_ctx, pipe, count); ret = h1c->conn->xprt->rcv_pipe(h1c->conn, h1c->conn->xprt_ctx, pipe, count);
if (ret >= 0) { if (ret >= 0) {
if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN)) { if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN)) {
if (ret > h1m->curr_len) { if (ret > h1m->curr_len) {
@ -3700,14 +3700,14 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c
h1c->flags |= H1C_F_ST_ERROR; h1c->flags |= H1C_F_ST_ERROR;
cs->flags |= CS_FL_ERROR; cs->flags |= CS_FL_ERROR;
TRACE_ERROR("too much payload, more than announced", TRACE_ERROR("too much payload, more than announced",
H1_EV_RX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, cs->conn, h1s); H1_EV_RX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
goto end; goto end;
} }
h1m->curr_len -= ret; h1m->curr_len -= ret;
if (!h1m->curr_len) { if (!h1m->curr_len) {
h1m->state = H1_MSG_DONE; h1m->state = H1_MSG_DONE;
h1c->flags &= ~H1C_F_WANT_SPLICE; h1c->flags &= ~H1C_F_WANT_SPLICE;
TRACE_STATE("payload fully received", H1_EV_STRM_RECV, cs->conn, h1s); TRACE_STATE("payload fully received", H1_EV_STRM_RECV, h1c->conn, h1s);
} }
} }
HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, ret); HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, ret);
@ -3715,22 +3715,22 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c
} }
end: end:
if (conn_xprt_read0_pending(cs->conn)) { if (conn_xprt_read0_pending(h1c->conn)) {
h1s->flags |= H1S_F_REOS; h1s->flags |= H1S_F_REOS;
h1c->flags &= ~H1C_F_WANT_SPLICE; h1c->flags &= ~H1C_F_WANT_SPLICE;
TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, cs->conn, h1s); TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, h1c->conn, h1s);
} }
if (!(h1c->flags & H1C_F_WANT_SPLICE)) { if (!(h1c->flags & H1C_F_WANT_SPLICE)) {
TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_STRM_RECV, h1c->conn, h1s); TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_STRM_RECV, h1c->conn, h1s);
cs->flags &= ~CS_FL_MAY_SPLICE; cs->flags &= ~CS_FL_MAY_SPLICE;
if (!(h1c->wait_event.events & SUB_RETRY_RECV)) { if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, cs->conn, h1s); TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, h1c->conn, h1s);
cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
} }
} }
TRACE_LEAVE(H1_EV_STRM_RECV, cs->conn, h1s, 0, (size_t[]){ret}); TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
return ret; return ret;
} }
@ -3741,37 +3741,37 @@ static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req); struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
int ret = 0; int ret = 0;
TRACE_ENTER(H1_EV_STRM_SEND, cs->conn, h1s, 0, (size_t[]){pipe->data}); TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){pipe->data});
if (b_data(&h1c->obuf)) { if (b_data(&h1c->obuf)) {
if (!(h1c->wait_event.events & SUB_RETRY_SEND)) { if (!(h1c->wait_event.events & SUB_RETRY_SEND)) {
TRACE_STATE("more data to send, subscribing", H1_EV_STRM_SEND, cs->conn, h1s); TRACE_STATE("more data to send, subscribing", H1_EV_STRM_SEND, h1c->conn, h1s);
cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, SUB_RETRY_SEND, &h1c->wait_event); h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_SEND, &h1c->wait_event);
} }
goto end; goto end;
} }
ret = cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe); ret = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, pipe);
if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN)) { if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN)) {
if (ret > h1m->curr_len) { if (ret > h1m->curr_len) {
h1s->flags |= H1S_F_PROCESSING_ERROR; h1s->flags |= H1S_F_PROCESSING_ERROR;
h1c->flags |= H1C_F_ST_ERROR; h1c->flags |= H1C_F_ST_ERROR;
cs->flags |= CS_FL_ERROR; cs->flags |= CS_FL_ERROR;
TRACE_ERROR("too much payload, more than announced", TRACE_ERROR("too much payload, more than announced",
H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, cs->conn, h1s); H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
goto end; goto end;
} }
h1m->curr_len -= ret; h1m->curr_len -= ret;
if (!h1m->curr_len) { if (!h1m->curr_len) {
h1m->state = H1_MSG_DONE; h1m->state = H1_MSG_DONE;
TRACE_STATE("payload fully xferred", H1_EV_TX_DATA|H1_EV_TX_BODY, cs->conn, h1s); TRACE_STATE("payload fully xferred", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s);
} }
} }
HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, ret); HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, ret);
HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, ret); HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, ret);
end: end:
TRACE_LEAVE(H1_EV_STRM_SEND, cs->conn, h1s, 0, (size_t[]){ret}); TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){ret});
return ret; return ret;
} }
#endif #endif

View File

@ -4289,7 +4289,7 @@ static void h2_detach(struct conn_stream *cs)
/* this stream may be blocked waiting for some data to leave (possibly /* this stream may be blocked waiting for some data to leave (possibly
* an ES or RST frame), so orphan it in this case. * an ES or RST frame), so orphan it in this case.
*/ */
if (!(cs->conn->flags & CO_FL_ERROR) && if (!(h2c->conn->flags & CO_FL_ERROR) &&
(h2c->st0 < H2_CS_ERROR) && (h2c->st0 < H2_CS_ERROR) &&
(h2s->flags & (H2_SF_BLK_MBUSY | H2_SF_BLK_MROOM | H2_SF_BLK_MFCTL)) && (h2s->flags & (H2_SF_BLK_MBUSY | H2_SF_BLK_MROOM | H2_SF_BLK_MFCTL)) &&
((h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) || h2s->subs)) { ((h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) || h2s->subs)) {

View File

@ -405,8 +405,8 @@ static void mux_pt_destroy_meth(void *ctx)
*/ */
static void mux_pt_detach(struct conn_stream *cs) static void mux_pt_detach(struct conn_stream *cs)
{ {
struct connection *conn = cs->conn; struct connection *conn = cs_conn(cs);
struct mux_pt_ctx *ctx = cs->conn->ctx; struct mux_pt_ctx *ctx = conn->ctx;
TRACE_ENTER(PT_EV_STRM_END, conn, cs); TRACE_ENTER(PT_EV_STRM_END, conn, cs);
@ -440,37 +440,41 @@ static int mux_pt_avail_streams(struct connection *conn)
static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode) static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{ {
TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs); struct connection *conn = cs_conn(cs);
TRACE_ENTER(PT_EV_STRM_SHUT, conn, cs);
if (cs->flags & CS_FL_SHR) if (cs->flags & CS_FL_SHR)
return; return;
cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM); cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutr) if (conn_xprt_ready(conn) && conn->xprt->shutr)
cs->conn->xprt->shutr(cs->conn, cs->conn->xprt_ctx, conn->xprt->shutr(conn, conn->xprt_ctx,
(mode == CS_SHR_DRAIN)); (mode == CS_SHR_DRAIN));
else if (mode == CS_SHR_DRAIN) else if (mode == CS_SHR_DRAIN)
conn_ctrl_drain(cs->conn); conn_ctrl_drain(conn);
if (cs->flags & CS_FL_SHW) if (cs->flags & CS_FL_SHW)
conn_full_close(cs->conn); conn_full_close(conn);
TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs); TRACE_LEAVE(PT_EV_STRM_SHUT, conn, cs);
} }
static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode) static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{ {
TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs); struct connection *conn = cs_conn(cs);
TRACE_ENTER(PT_EV_STRM_SHUT, conn, cs);
if (cs->flags & CS_FL_SHW) if (cs->flags & CS_FL_SHW)
return; return;
if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutw) if (conn_xprt_ready(conn) && conn->xprt->shutw)
cs->conn->xprt->shutw(cs->conn, cs->conn->xprt_ctx, conn->xprt->shutw(conn, conn->xprt_ctx,
(mode == CS_SHW_NORMAL)); (mode == CS_SHW_NORMAL));
if (!(cs->flags & CS_FL_SHR)) if (!(cs->flags & CS_FL_SHR))
conn_sock_shutw(cs->conn, (mode == CS_SHW_NORMAL)); conn_sock_shutw(conn, (mode == CS_SHW_NORMAL));
else else
conn_full_close(cs->conn); conn_full_close(conn);
TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs); TRACE_LEAVE(PT_EV_STRM_SHUT, conn, cs);
} }
/* /*
@ -488,44 +492,46 @@ static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
*/ */
static size_t mux_pt_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) static size_t mux_pt_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{ {
struct connection *conn = cs_conn(cs);
size_t ret = 0; size_t ret = 0;
TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){count}); TRACE_ENTER(PT_EV_RX_DATA, conn, cs, buf, (size_t[]){count});
if (!count) { if (!count) {
cs->flags |= (CS_FL_RCV_MORE | CS_FL_WANT_ROOM); cs->flags |= (CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
goto end; goto end;
} }
b_realign_if_empty(buf); b_realign_if_empty(buf);
ret = cs->conn->xprt->rcv_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags); ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, count, flags);
if (conn_xprt_read0_pending(cs->conn)) { if (conn_xprt_read0_pending(conn)) {
cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM); cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
cs->flags |= CS_FL_EOS; cs->flags |= CS_FL_EOS;
TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs); TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, cs);
} }
if (cs->conn->flags & CO_FL_ERROR) { if (conn->flags & CO_FL_ERROR) {
cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM); cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
cs->flags |= CS_FL_ERROR; cs->flags |= CS_FL_ERROR;
TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs); TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, cs);
} }
end: end:
TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){ret}); TRACE_LEAVE(PT_EV_RX_DATA, conn, cs, buf, (size_t[]){ret});
return ret; return ret;
} }
/* Called from the upper layer, to send data */ /* Called from the upper layer, to send data */
static size_t mux_pt_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) static size_t mux_pt_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{ {
struct connection *conn = cs_conn(cs);
size_t ret; size_t ret;
TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){count}); TRACE_ENTER(PT_EV_TX_DATA, conn, cs, buf, (size_t[]){count});
ret = cs->conn->xprt->snd_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags); ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, count, flags);
if (ret > 0) if (ret > 0)
b_del(buf, ret); b_del(buf, ret);
TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){ret}); TRACE_LEAVE(PT_EV_TX_DATA, conn, cs, buf, (size_t[]){ret});
return ret; return ret;
} }
@ -536,8 +542,10 @@ static size_t mux_pt_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t
*/ */
static int mux_pt_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es) static int mux_pt_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{ {
TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type}); struct connection *conn = cs_conn(cs);
return cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, event_type, es);
TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, conn, cs, 0, (size_t[]){event_type});
return conn->xprt->subscribe(conn, conn->xprt_ctx, event_type, es);
} }
/* Called from the upper layer, to unsubscribe <es> from events <event_type>. /* Called from the upper layer, to unsubscribe <es> from events <event_type>.
@ -546,41 +554,45 @@ static int mux_pt_subscribe(struct conn_stream *cs, int event_type, struct wait_
*/ */
static int mux_pt_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es) static int mux_pt_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{ {
TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type}); struct connection *conn = cs_conn(cs);
return cs->conn->xprt->unsubscribe(cs->conn, cs->conn->xprt_ctx, event_type, es);
TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, conn, cs, 0, (size_t[]){event_type});
return conn->xprt->unsubscribe(conn, conn->xprt_ctx, event_type, es);
} }
#if defined(USE_LINUX_SPLICE) #if defined(USE_LINUX_SPLICE)
/* Send and get, using splicing */ /* Send and get, using splicing */
static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count) static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
{ {
struct connection *conn = cs_conn(cs);
int ret; int ret;
TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){count}); TRACE_ENTER(PT_EV_RX_DATA, conn, cs, 0, (size_t[]){count});
ret = cs->conn->xprt->rcv_pipe(cs->conn, cs->conn->xprt_ctx, pipe, count); ret = conn->xprt->rcv_pipe(conn, conn->xprt_ctx, pipe, count);
if (conn_xprt_read0_pending(cs->conn)) { if (conn_xprt_read0_pending(conn)) {
cs->flags |= CS_FL_EOS; cs->flags |= CS_FL_EOS;
TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs); TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, cs);
} }
if (cs->conn->flags & CO_FL_ERROR) { if (conn->flags & CO_FL_ERROR) {
cs->flags |= CS_FL_ERROR; cs->flags |= CS_FL_ERROR;
TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs); TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, cs);
} }
TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){ret}); TRACE_LEAVE(PT_EV_RX_DATA, conn, cs, 0, (size_t[]){ret});
return (ret); return (ret);
} }
static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe) static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
{ {
struct connection *conn = cs_conn(cs);
int ret; int ret;
TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){pipe->data}); TRACE_ENTER(PT_EV_TX_DATA, conn, cs, 0, (size_t[]){pipe->data});
ret = cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe); ret = conn->xprt->snd_pipe(conn, conn->xprt_ctx, pipe);
TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){ret}); TRACE_LEAVE(PT_EV_TX_DATA, conn, cs, 0, (size_t[]){ret});
return ret; return ret;
} }
#endif #endif

View File

@ -275,9 +275,13 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace
*/ */
int stream_create_from_cs(struct conn_stream *cs, struct buffer *input) int stream_create_from_cs(struct conn_stream *cs, struct buffer *input)
{ {
struct connection *conn = cs_conn(cs);
struct stream *strm; struct stream *strm;
strm = stream_new(cs->conn->owner, &cs->obj_type, input); if (!conn)
return -1;
strm = stream_new(conn->owner, &cs->obj_type, input);
if (strm == NULL) if (strm == NULL)
return -1; return -1;
@ -294,10 +298,14 @@ int stream_create_from_cs(struct conn_stream *cs, struct buffer *input)
*/ */
int stream_upgrade_from_cs(struct conn_stream *cs, struct buffer *input) int stream_upgrade_from_cs(struct conn_stream *cs, struct buffer *input)
{ {
struct connection *conn = cs_conn(cs);
struct stream_interface *si = cs->data; struct stream_interface *si = cs->data;
struct stream *s = si_strm(si); struct stream *s = si_strm(si);
if (cs->conn->mux->flags & MX_FL_HTX) if (!conn)
return -1;
if (conn->mux->flags & MX_FL_HTX)
s->flags |= SF_HTX; s->flags |= SF_HTX;
if (!b_is_null(input)) { if (!b_is_null(input)) {
@ -467,10 +475,12 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu
if (appctx) if (appctx)
si_attach_appctx(&s->si[0], appctx); si_attach_appctx(&s->si[0], appctx);
else if (cs) { else if (cs) {
if (cs_conn(cs) && cs->conn->mux) { const struct mux_ops *mux = cs_conn_mux(cs);
if (cs->conn->mux->flags & MX_FL_CLEAN_ABRT)
if (mux) {
if (mux->flags & MX_FL_CLEAN_ABRT)
s->si[0].flags |= SI_FL_CLEAN_ABRT; s->si[0].flags |= SI_FL_CLEAN_ABRT;
if (cs->conn->mux->flags & MX_FL_HTX) if (mux->flags & MX_FL_HTX)
s->flags |= SF_HTX; s->flags |= SF_HTX;
if (cs->flags & CS_FL_WEBSOCKET) if (cs->flags & CS_FL_WEBSOCKET)
@ -2170,10 +2180,10 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) && if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
req->to_forward && req->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) && (global.tune.options & GTUNE_USE_SPLICE) &&
(cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe && (cs_conn(objt_cs(si_f->end)) && cs_conn(__objt_cs(si_f->end))->xprt && cs_conn(__objt_cs(si_f->end))->xprt->rcv_pipe &&
__objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->rcv_pipe) && cs_conn(__objt_cs(si_f->end))->mux && cs_conn(__objt_cs(si_f->end))->mux->rcv_pipe) &&
(cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe && (cs_conn(objt_cs(si_b->end)) && cs_conn(__objt_cs(si_b->end))->xprt && cs_conn(__objt_cs(si_b->end))->xprt->snd_pipe &&
__objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->snd_pipe) && cs_conn(__objt_cs(si_b->end))->mux && cs_conn(__objt_cs(si_b->end))->mux->snd_pipe) &&
(pipes_used < global.maxpipes) && (pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
@ -2363,10 +2373,10 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) && if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
res->to_forward && res->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) && (global.tune.options & GTUNE_USE_SPLICE) &&
(cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe && (cs_conn(objt_cs(si_f->end)) && cs_conn(__objt_cs(si_f->end))->xprt && cs_conn(__objt_cs(si_f->end))->xprt->snd_pipe &&
__objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->snd_pipe) && cs_conn(__objt_cs(si_f->end))->mux && cs_conn(__objt_cs(si_f->end))->mux->snd_pipe) &&
(cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe && (cs_conn(objt_cs(si_b->end)) && cs_conn(__objt_cs(si_b->end))->xprt && cs_conn(__objt_cs(si_b->end))->xprt->rcv_pipe &&
__objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->rcv_pipe) && cs_conn(__objt_cs(si_b->end))->mux && cs_conn(__objt_cs(si_b->end))->mux->rcv_pipe) &&
(pipes_used < global.maxpipes) && (pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
@ -2443,8 +2453,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
si_b->prev_state == SI_ST_EST) { si_b->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n", chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
s->uniq_id, s->be->id, s->uniq_id, s->be->id,
cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1,
cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data)); DISGUISE(write(1, trash.area, trash.data));
} }
@ -2452,8 +2462,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
si_f->prev_state == SI_ST_EST) { si_f->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n", chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
s->uniq_id, s->be->id, s->uniq_id, s->be->id,
cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1,
cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data)); DISGUISE(write(1, trash.area, trash.data));
} }
} }
@ -2520,8 +2530,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n", chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
s->uniq_id, s->be->id, s->uniq_id, s->be->id,
cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1,
cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data)); DISGUISE(write(1, trash.area, trash.data));
} }
@ -3299,7 +3309,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
if (cs_conn(objt_cs(strm->si[0].end)) != NULL) { if (cs_conn(objt_cs(strm->si[0].end)) != NULL) {
cs = __objt_cs(strm->si[0].end); cs = __objt_cs(strm->si[0].end);
conn = cs->conn; conn = cs_conn(cs);
chunk_appendf(&trash, chunk_appendf(&trash,
" co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", " co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
@ -3336,7 +3346,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
if (cs_conn(objt_cs(strm->si[1].end)) != NULL) { if (cs_conn(objt_cs(strm->si[1].end)) != NULL) {
cs = __objt_cs(strm->si[1].end); cs = __objt_cs(strm->si[1].end);
conn = cs->conn; conn = cs_conn(cs);
chunk_appendf(&trash, chunk_appendf(&trash,
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", " co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",

View File

@ -562,11 +562,13 @@ static void stream_int_notify(struct stream_interface *si)
*/ */
static int si_cs_process(struct conn_stream *cs) static int si_cs_process(struct conn_stream *cs)
{ {
struct connection *conn = cs->conn; struct connection *conn = cs_conn(cs);
struct stream_interface *si = cs->data; struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
BUG_ON(!conn);
/* If we have data to send, try it now */ /* If we have data to send, try it now */
if (!channel_is_empty(oc) && !(si->wait_event.events & SUB_RETRY_SEND)) if (!channel_is_empty(oc) && !(si->wait_event.events & SUB_RETRY_SEND))
si_cs_send(cs); si_cs_send(cs);
@ -648,12 +650,14 @@ static int si_cs_process(struct conn_stream *cs)
*/ */
int si_cs_send(struct conn_stream *cs) int si_cs_send(struct conn_stream *cs)
{ {
struct connection *conn = cs->conn; struct connection *conn = cs_conn(cs);
struct stream_interface *si = cs->data; struct stream_interface *si = cs->data;
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
int ret; int ret;
int did_send = 0; int did_send = 0;
BUG_ON(!conn);
if (conn->flags & CO_FL_ERROR || cs->flags & (CS_FL_ERROR|CS_FL_ERR_PENDING)) { if (conn->flags & CO_FL_ERROR || cs->flags & (CS_FL_ERROR|CS_FL_ERR_PENDING)) {
/* We're probably there because the tasklet was woken up, /* We're probably there because the tasklet was woken up,
* but process_stream() ran before, detected there were an * but process_stream() ran before, detected there were an
@ -752,7 +756,7 @@ int si_cs_send(struct conn_stream *cs)
} }
} }
ret = cs->conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag); ret = conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag);
if (ret > 0) { if (ret > 0) {
did_send = 1; did_send = 1;
co_set_data(oc, co_data(oc) - ret); co_set_data(oc, co_data(oc) - ret);
@ -926,7 +930,7 @@ void si_sync_send(struct stream_interface *si)
return; return;
cs = objt_cs(si->end); cs = objt_cs(si->end);
if (!cs_conn(cs) || !cs->conn->mux) if (!cs_conn_mux(cs))
return; return;
si_cs_send(cs); si_cs_send(cs);
@ -1117,6 +1121,9 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
{ {
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
struct conn_stream *cs = __objt_cs(si->end); struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs_conn(cs);
BUG_ON(!conn);
if (unlikely(!si_state_in(si->state, SI_SB_CON|SI_SB_RDY|SI_SB_EST) || if (unlikely(!si_state_in(si->state, SI_SB_CON|SI_SB_RDY|SI_SB_EST) ||
(oc->flags & CF_SHUTW))) (oc->flags & CF_SHUTW)))
@ -1132,7 +1139,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
si_cs_send(cs); si_cs_send(cs);
if (cs->flags & (CS_FL_ERROR|CS_FL_ERR_PENDING) || cs->conn->flags & CO_FL_ERROR) { if (cs->flags & (CS_FL_ERROR|CS_FL_ERR_PENDING) || conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */ /* Write error on the file descriptor */
if (si->state >= SI_ST_CON) if (si->state >= SI_ST_CON)
si->flags |= SI_FL_ERR; si->flags |= SI_FL_ERR;
@ -1209,13 +1216,15 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
*/ */
int si_cs_recv(struct conn_stream *cs) int si_cs_recv(struct conn_stream *cs)
{ {
struct connection *conn = cs->conn; struct connection *conn = cs_conn(cs);
struct stream_interface *si = cs->data; struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
int ret, max, cur_read = 0; int ret, max, cur_read = 0;
int read_poll = MAX_READ_POLL_LOOPS; int read_poll = MAX_READ_POLL_LOOPS;
int flags = 0; int flags = 0;
BUG_ON(!conn);
/* If not established yet, do nothing. */ /* If not established yet, do nothing. */
if (si->state != SI_ST_EST) if (si->state != SI_ST_EST)
return 0; return 0;
@ -1373,7 +1382,7 @@ int si_cs_recv(struct conn_stream *cs)
* CS_FL_RCV_MORE on the CS if more space is needed. * CS_FL_RCV_MORE on the CS if more space is needed.
*/ */
max = channel_recv_max(ic); max = channel_recv_max(ic);
ret = cs->conn->mux->rcv_buf(cs, &ic->buf, max, cur_flags); ret = conn->mux->rcv_buf(cs, &ic->buf, max, cur_flags);
if (cs->flags & CS_FL_WANT_ROOM) { if (cs->flags & CS_FL_WANT_ROOM) {
/* CS_FL_WANT_ROOM must not be reported if the channel's /* CS_FL_WANT_ROOM must not be reported if the channel's

View File

@ -1218,7 +1218,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
if (conn_ctrl_ready(conn) && (connect->options & TCPCHK_OPT_LINGER)) { if (conn_ctrl_ready(conn) && (connect->options & TCPCHK_OPT_LINGER)) {
/* Some servers don't like reset on close */ /* Some servers don't like reset on close */
HA_ATOMIC_AND(&fdtab[cs->conn->handle.fd].state, ~FD_LINGER_RISK); HA_ATOMIC_AND(&fdtab[conn->handle.fd].state, ~FD_LINGER_RISK);
} }
if (conn_ctrl_ready(conn) && (conn->flags & (CO_FL_SEND_PROXY | CO_FL_SOCKS4))) { if (conn_ctrl_ready(conn) && (conn->flags & (CO_FL_SEND_PROXY | CO_FL_SOCKS4))) {
@ -1495,7 +1495,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_send(struct check *check, struct tcpcheck_r
} }
} }
if ((IS_HTX_CONN(conn) && !htx_is_empty(htxbuf(&check->bo))) || (!IS_HTX_CONN(conn) && b_data(&check->bo))) { if ((IS_HTX_CONN(conn) && !htx_is_empty(htxbuf(&check->bo))) || (!IS_HTX_CONN(conn) && b_data(&check->bo))) {
cs->conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list); conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
ret = TCPCHK_EVAL_WAIT; ret = TCPCHK_EVAL_WAIT;
TRACE_DEVEL("data not fully sent, wait", CHK_EV_TCPCHK_SND|CHK_EV_TX_DATA, check); TRACE_DEVEL("data not fully sent, wait", CHK_EV_TCPCHK_SND|CHK_EV_TX_DATA, check);
goto out; goto out;