diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 507a6d152..95b995db1 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -55,38 +55,52 @@ static inline void si_prepare_none(struct stream_interface *si) { si->ops = &si_embedded_ops; si->end = NULL; - conn_prepare(si->conn, NULL, NULL, NULL, si); - si->conn->target = NULL; si->appctx.applet = NULL; } static inline void si_prepare_conn(struct stream_interface *si, const struct protocol *ctrl, const struct xprt_ops *xprt) { si->ops = &si_conn_ops; - si->end = NULL; + si->end = &si->conn->obj_type; conn_prepare(si->conn, &si_conn_cb, ctrl, xprt, si); } static inline void si_takeover_conn(struct stream_interface *si, const struct protocol *ctrl, const struct xprt_ops *xprt) { si->ops = &si_conn_ops; - si->end = NULL; + si->end = &si->conn->obj_type; conn_assign(si->conn, &si_conn_cb, ctrl, xprt, si); } static inline void si_prepare_applet(struct stream_interface *si, struct si_applet *applet) { si->ops = &si_embedded_ops; - si->end = NULL; - conn_prepare(si->conn, NULL, NULL, NULL, si); - si->conn->target = &applet->obj_type; si->appctx.applet = applet; + si->appctx.obj_type = OBJ_TYPE_APPCTX; + si->end = &si->appctx.obj_type; } /* returns a pointer to the applet being run in the SI or NULL if none */ static inline const struct si_applet *si_applet(struct stream_interface *si) { - return objt_applet(si->conn->target); + if (objt_appctx(si->end)) + return si->appctx.applet; + return NULL; +} + +/* Call the applet's main function when an appctx is attached to the stream + * interface. Returns zero if no call was made, or non-zero if a call was made. + */ +static inline int si_applet_call(struct stream_interface *si) +{ + const struct si_applet *applet; + + applet = si_applet(si); + if (applet) { + applet->fct(si); + return 1; + } + return 0; } /* call the applet's release function if any. Needs to be called upon close() */ diff --git a/src/dumpstats.c b/src/dumpstats.c index 5e54d5de2..629bf96b7 100644 --- a/src/dumpstats.c +++ b/src/dumpstats.c @@ -158,7 +158,7 @@ static int stats_accept(struct session *s) { /* we have a dedicated I/O handler for the stats */ stream_int_register_handler(&s->si[1], &cli_applet); - s->target = s->si[1].conn->target; // for logging only + s->target = &cli_applet.obj_type; // for logging only s->si[1].appctx.st1 = 0; s->si[1].appctx.st0 = STAT_CLI_INIT; @@ -3910,7 +3910,8 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se sess->uniq_id, sess->listener && sess->listener->proto->name ? sess->listener->proto->name : "?"); - switch (addr_to_str(&sess->si[0].conn->addr.from, pn, sizeof(pn))) { + switch ((obj_type(sess->si[0].end) == OBJ_TYPE_CONN) ? + addr_to_str(&sess->si[0].conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) { case AF_INET: case AF_INET6: chunk_appendf(&trash, " source=%s:%d\n", @@ -3935,8 +3936,11 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se sess->listener ? sess->listener->name ? sess->listener->name : "?" : "?", sess->listener ? sess->listener->luid : 0); - conn_get_to_addr(sess->si[0].conn); - switch (addr_to_str(&sess->si[0].conn->addr.to, pn, sizeof(pn))) { + if (obj_type(sess->si[0].end) == OBJ_TYPE_CONN) + conn_get_to_addr(sess->si[0].conn); + + switch ((obj_type(sess->si[0].end) == OBJ_TYPE_CONN) ? + addr_to_str(&sess->si[0].conn->addr.to, pn, sizeof(pn)) : AF_UNSPEC) { case AF_INET: case AF_INET6: chunk_appendf(&trash, " addr=%s:%d\n", @@ -3959,8 +3963,11 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se else chunk_appendf(&trash, " backend= (id=-1 mode=-)"); - conn_get_from_addr(sess->si[1].conn); - switch (addr_to_str(&sess->si[1].conn->addr.from, pn, sizeof(pn))) { + if (obj_type(sess->si[1].end) == OBJ_TYPE_CONN) + conn_get_from_addr(sess->si[1].conn); + + switch ((obj_type(sess->si[1].end) == OBJ_TYPE_CONN) ? + addr_to_str(&sess->si[1].conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) { case AF_INET: case AF_INET6: chunk_appendf(&trash, " addr=%s:%d\n", @@ -3983,8 +3990,11 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se else chunk_appendf(&trash, " server= (id=-1)"); - conn_get_to_addr(sess->si[1].conn); - switch (addr_to_str(&sess->si[1].conn->addr.to, pn, sizeof(pn))) { + if (obj_type(sess->si[1].end) == OBJ_TYPE_CONN) + conn_get_to_addr(sess->si[1].conn); + + switch ((obj_type(sess->si[1].end) == OBJ_TYPE_CONN) ? + addr_to_str(&sess->si[1].conn->addr.to, pn, sizeof(pn)) : AF_UNSPEC) { case AF_INET: case AF_INET6: chunk_appendf(&trash, " addr=%s:%d\n", @@ -4020,10 +4030,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se http_msg_state_str(sess->txn.req.msg_state), http_msg_state_str(sess->txn.rsp.msg_state)); chunk_appendf(&trash, - " si[0]=%p (state=%s flags=0x%02x conn0=%p exp=%s, et=0x%03x)\n", + " si[0]=%p (state=%s flags=0x%02x endp0=%s:%p conn0=%p exp=%s, et=0x%03x)\n", &sess->si[0], si_state_str(sess->si[0].state), sess->si[0].flags, + obj_type_name(sess->si[0].end), + obj_base_ptr(sess->si[0].end), sess->si[0].conn, sess->si[0].exp ? tick_is_expired(sess->si[0].exp, now_ms) ? "" : @@ -4032,10 +4044,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se sess->si[0].err_type); chunk_appendf(&trash, - " si[1]=%p (state=%s flags=0x%02x conn1=%p exp=%s, et=0x%03x)\n", + " si[1]=%p (state=%s flags=0x%02x endp1=%s:%p conn1=%p exp=%s, et=0x%03x)\n", &sess->si[1], si_state_str(sess->si[1].state), sess->si[1].flags, + obj_type_name(sess->si[1].end), + obj_base_ptr(sess->si[1].end), sess->si[1].conn, sess->si[1].exp ? tick_is_expired(sess->si[1].exp, now_ms) ? "" : @@ -4043,39 +4057,43 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se TICKS_TO_MS(1000)) : "", sess->si[1].err_type); - chunk_appendf(&trash, - " co0=%p ctrl=%s xprt=%s data=%s target=%s:%p\n", - sess->si[0].conn, - get_conn_ctrl_name(sess->si[0].conn), - get_conn_xprt_name(sess->si[0].conn), - get_conn_data_name(sess->si[0].conn), - obj_type_name(sess->si[0].conn->target), - obj_base_ptr(sess->si[0].conn->target)); + if (obj_type(sess->si[0].end) == OBJ_TYPE_CONN) { + chunk_appendf(&trash, + " co0=%p ctrl=%s xprt=%s data=%s target=%s:%p\n", + sess->si[0].conn, + get_conn_ctrl_name(sess->si[0].conn), + get_conn_xprt_name(sess->si[0].conn), + get_conn_data_name(sess->si[0].conn), + obj_type_name(sess->si[0].conn->target), + obj_base_ptr(sess->si[0].conn->target)); - chunk_appendf(&trash, - " flags=0x%08x fd=%d fd_spec_e=%02x fd_spec_p=%d updt=%d\n", - sess->si[0].conn->flags, - sess->si[0].conn->t.sock.fd, - sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].spec_e : 0, - sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].spec_p : 0, - sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].updated : 0); + chunk_appendf(&trash, + " flags=0x%08x fd=%d fd_spec_e=%02x fd_spec_p=%d updt=%d\n", + sess->si[0].conn->flags, + sess->si[0].conn->t.sock.fd, + sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].spec_e : 0, + sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].spec_p : 0, + sess->si[0].conn->t.sock.fd >= 0 ? fdtab[sess->si[0].conn->t.sock.fd].updated : 0); + } - chunk_appendf(&trash, - " co1=%p ctrl=%s xprt=%s data=%s target=%s:%p\n", - sess->si[1].conn, - get_conn_ctrl_name(sess->si[1].conn), - get_conn_xprt_name(sess->si[1].conn), - get_conn_data_name(sess->si[1].conn), - obj_type_name(sess->si[1].conn->target), - obj_base_ptr(sess->si[1].conn->target)); + if (obj_type(sess->si[1].end) == OBJ_TYPE_CONN) { + chunk_appendf(&trash, + " co1=%p ctrl=%s xprt=%s data=%s target=%s:%p\n", + sess->si[1].conn, + get_conn_ctrl_name(sess->si[1].conn), + get_conn_xprt_name(sess->si[1].conn), + get_conn_data_name(sess->si[1].conn), + obj_type_name(sess->si[1].conn->target), + obj_base_ptr(sess->si[1].conn->target)); - chunk_appendf(&trash, - " flags=0x%08x fd=%d fd_spec_e=%02x fd_spec_p=%d updt=%d\n", - sess->si[1].conn->flags, - sess->si[1].conn->t.sock.fd, - sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].spec_e : 0, - sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].spec_p : 0, - sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].updated : 0); + chunk_appendf(&trash, + " flags=0x%08x fd=%d fd_spec_e=%02x fd_spec_p=%d updt=%d\n", + sess->si[1].conn->flags, + sess->si[1].conn->t.sock.fd, + sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].spec_e : 0, + sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].spec_p : 0, + sess->si[1].conn->t.sock.fd >= 0 ? fdtab[sess->si[1].conn->t.sock.fd].updated : 0); + } chunk_appendf(&trash, " req=%p (f=0x%06x an=0x%x pipe=%d tofwd=%d total=%lld)\n" @@ -4222,7 +4240,8 @@ static int stats_dump_sess_to_buffer(struct stream_interface *si) curr_sess->listener->proto->name); - switch (addr_to_str(&curr_sess->si[0].conn->addr.from, pn, sizeof(pn))) { + switch ((obj_type(curr_sess->si[0].end) == OBJ_TYPE_CONN) ? + addr_to_str(&curr_sess->si[0].conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) { case AF_INET: case AF_INET6: chunk_appendf(&trash, @@ -4297,7 +4316,8 @@ static int stats_dump_sess_to_buffer(struct stream_interface *si) " s0=[%d,%1xh,fd=%d,ex=%s]", curr_sess->si[0].state, curr_sess->si[0].flags, - curr_sess->si[0].conn->t.sock.fd, + (obj_type(curr_sess->si[0].end) == OBJ_TYPE_CONN) ? + curr_sess->si[0].conn->t.sock.fd : -1, curr_sess->si[0].exp ? human_time(TICKS_TO_MS(curr_sess->si[0].exp - now_ms), TICKS_TO_MS(1000)) : ""); @@ -4306,7 +4326,8 @@ static int stats_dump_sess_to_buffer(struct stream_interface *si) " s1=[%d,%1xh,fd=%d,ex=%s]", curr_sess->si[1].state, curr_sess->si[1].flags, - curr_sess->si[1].conn->t.sock.fd, + (obj_type(curr_sess->si[1].end) == OBJ_TYPE_CONN) ? + curr_sess->si[1].conn->t.sock.fd : -1, curr_sess->si[1].exp ? human_time(TICKS_TO_MS(curr_sess->si[1].exp - now_ms), TICKS_TO_MS(1000)) : ""); diff --git a/src/peers.c b/src/peers.c index 979767165..d265f483c 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1062,7 +1062,7 @@ static void peer_session_forceshutdown(struct session * session) { struct stream_interface *oldsi; - if (objt_applet(session->si[0].conn->target) == &peer_applet) { + if (si_applet(&session->si[0]) == &peer_applet) { oldsi = &session->si[0]; } else { @@ -1086,7 +1086,7 @@ int peer_accept(struct session *s) { /* we have a dedicated I/O handler for the stats */ stream_int_register_handler(&s->si[1], &peer_applet); - s->target = s->si[1].conn->target; // for logging only + s->target = &peer_applet.obj_type; // for logging only s->si[1].appctx.ctx.peers.ptr = s; s->si[1].appctx.st0 = PEER_SESSION_ACCEPT; diff --git a/src/session.c b/src/session.c index c86df3c47..472e5d238 100644 --- a/src/session.c +++ b/src/session.c @@ -2182,9 +2182,9 @@ struct task *process_session(struct task *t) if (s->req->cons->state == SI_ST_INI) { if (!(s->req->flags & CF_SHUTW)) { if ((s->req->flags & CF_AUTO_CONNECT) || !channel_is_empty(s->req)) { - /* If we have an applet without a connect method, we immediately - * switch to the connected state, otherwise we perform a connection - * request. + /* If we have an appctx, there is no connect method, so we + * immediately switch to the connected state, otherwise we + * perform a connection request. */ s->req->cons->state = SI_ST_REQ; /* new connection requested */ s->req->cons->conn_retries = s->be->conn_retries; @@ -2366,10 +2366,10 @@ struct task *process_session(struct task *t) if ((s->fe->options & PR_O_CONTSTATS) && (s->flags & SN_BE_ASSIGNED)) session_process_counters(s); - if (s->rep->cons->state == SI_ST_EST && obj_type(s->rep->cons->conn->target) != OBJ_TYPE_APPLET) + if (s->rep->cons->state == SI_ST_EST && obj_type(s->rep->cons->end) != OBJ_TYPE_APPCTX) si_update(s->rep->cons); - if (s->req->cons->state == SI_ST_EST && obj_type(s->req->cons->conn->target) != OBJ_TYPE_APPLET) + if (s->req->cons->state == SI_ST_EST && obj_type(s->req->cons->end) != OBJ_TYPE_APPCTX) si_update(s->req->cons); s->req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED); @@ -2393,20 +2393,17 @@ struct task *process_session(struct task *t) s->req->rex = TICK_ETERNITY; } - /* Call the stream interfaces' I/O handlers when embedded. - * Note that this one may wake the task up again. + /* When any of the stream interfaces is attached to an applet, + * we have to call it here. Note that this one may wake the + * task up again. If at least one applet was called, the current + * task might have been woken up, in which case we don't want it + * to be requeued to the wait queue but rather to the run queue + * to run ASAP. The bitwise "or" in the condition ensures that + * both functions are always called and that we wake up if at + * least one did something. */ - if (obj_type(s->req->cons->conn->target) == OBJ_TYPE_APPLET || - obj_type(s->rep->cons->conn->target) == OBJ_TYPE_APPLET) { - if (objt_applet(s->req->cons->conn->target)) - objt_applet(s->req->cons->conn->target)->fct(s->req->cons); - if (objt_applet(s->rep->cons->conn->target)) - objt_applet(s->rep->cons->conn->target)->fct(s->rep->cons); + if ((si_applet_call(s->req->cons) | si_applet_call(s->rep->cons)) != 0) { if (task_in_rq(t)) { - /* If we woke up, we don't want to requeue the - * task to the wait queue, but rather requeue - * it into the runqueue ASAP. - */ t->expire = TICK_ETERNITY; return t; } diff --git a/src/stream_interface.c b/src/stream_interface.c index 4f62e2d8c..d66d301fd 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -365,7 +365,6 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_ void stream_int_unregister_handler(struct stream_interface *si) { si->owner = NULL; - si->conn->target = NULL; si->end = NULL; }