mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-21 05:41:26 +02:00
MAJOR: conn-stream: Share endpoint struct between the CS and the mux/applet
The conn-stream endpoint is now shared between the conn-stream and the applet or the multiplexer. If the mux or the applet is created first, it is responsible to also create the endpoint and share it with the conn-stream. If the conn-stream is created first, it is the opposite. When the endpoint is only owned by an applet or a mux, it is called an orphan endpoint (there is no conn-stream). When it is only owned by a conn-stream, it is called a detached endpoint (there is no mux/applet). The last entity that owns an endpoint is responsible to release it. When a mux or an applet is detached from a conn-stream, the conn-stream relinquishes the endpoint to recreate a new one. This way, the endpoint state is never lost for the mux or the applet.
This commit is contained in:
parent
cb2fa368e9
commit
9ec2f4dc7c
@ -196,6 +196,8 @@ void show_endp_flags(unsigned int f)
|
|||||||
SHOW_FLAG(f, CS_EP_SHWN);
|
SHOW_FLAG(f, CS_EP_SHWN);
|
||||||
SHOW_FLAG(f, CS_EP_SHRR);
|
SHOW_FLAG(f, CS_EP_SHRR);
|
||||||
SHOW_FLAG(f, CS_EP_SHRD);
|
SHOW_FLAG(f, CS_EP_SHRD);
|
||||||
|
SHOW_FLAG(f, CS_EP_ORPHAN);
|
||||||
|
SHOW_FLAG(f, CS_EP_DETACHED);
|
||||||
SHOW_FLAG(f, CS_EP_T_APPLET);
|
SHOW_FLAG(f, CS_EP_T_APPLET);
|
||||||
SHOW_FLAG(f, CS_EP_T_MUX);
|
SHOW_FLAG(f, CS_EP_T_MUX);
|
||||||
|
|
||||||
|
@ -58,7 +58,8 @@ struct appctx {
|
|||||||
struct buffer *chunk; /* used to store unfinished commands */
|
struct buffer *chunk; /* used to store unfinished commands */
|
||||||
unsigned int st2; /* output state for stats, unused by peers */
|
unsigned int st2; /* output state for stats, unused by peers */
|
||||||
struct applet *applet; /* applet this context refers to */
|
struct applet *applet; /* applet this context refers to */
|
||||||
void *owner; /* pointer to upper layer's entity (eg: conn_stream) */
|
struct conn_stream *owner;
|
||||||
|
struct cs_endpoint *endp;
|
||||||
struct act_rule *rule; /* rule associated with the applet. */
|
struct act_rule *rule; /* rule associated with the applet. */
|
||||||
int (*io_handler)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK */
|
int (*io_handler)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK */
|
||||||
void (*io_release)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
|
void (*io_release)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
|
|
||||||
#include <haproxy/api.h>
|
#include <haproxy/api.h>
|
||||||
#include <haproxy/applet-t.h>
|
#include <haproxy/applet-t.h>
|
||||||
|
#include <haproxy/conn_stream.h>
|
||||||
#include <haproxy/list.h>
|
#include <haproxy/list.h>
|
||||||
#include <haproxy/pool.h>
|
#include <haproxy/pool.h>
|
||||||
#include <haproxy/task.h>
|
#include <haproxy/task.h>
|
||||||
@ -36,7 +37,7 @@ extern struct pool_head *pool_head_appctx;
|
|||||||
struct task *task_run_applet(struct task *t, void *context, unsigned int state);
|
struct task *task_run_applet(struct task *t, void *context, unsigned int state);
|
||||||
int appctx_buf_available(void *arg);
|
int appctx_buf_available(void *arg);
|
||||||
|
|
||||||
struct appctx *appctx_new(struct applet *applet);
|
struct appctx *appctx_new(struct applet *applet, struct cs_endpoint *endp);
|
||||||
|
|
||||||
/* Releases an appctx previously allocated by appctx_new(). */
|
/* Releases an appctx previously allocated by appctx_new(). */
|
||||||
static inline void __appctx_free(struct appctx *appctx)
|
static inline void __appctx_free(struct appctx *appctx)
|
||||||
@ -45,6 +46,8 @@ static inline void __appctx_free(struct appctx *appctx)
|
|||||||
if (LIST_INLIST(&appctx->buffer_wait.list))
|
if (LIST_INLIST(&appctx->buffer_wait.list))
|
||||||
LIST_DEL_INIT(&appctx->buffer_wait.list);
|
LIST_DEL_INIT(&appctx->buffer_wait.list);
|
||||||
|
|
||||||
|
BUG_ON(appctx->endp && !(appctx->endp->flags & CS_EP_ORPHAN));
|
||||||
|
cs_endpoint_free(appctx->endp);
|
||||||
pool_free(pool_head_appctx, appctx);
|
pool_free(pool_head_appctx, appctx);
|
||||||
_HA_ATOMIC_DEC(&nb_applets);
|
_HA_ATOMIC_DEC(&nb_applets);
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,13 @@ struct stream_interface;
|
|||||||
CS_EP_T_MUX = 0x00000001, /* The endpoint is a mux (the target may be NULL before the mux init) */
|
CS_EP_T_MUX = 0x00000001, /* The endpoint is a mux (the target may be NULL before the mux init) */
|
||||||
CS_EP_T_APPLET = 0x00000002, /* The endpoint is an applet */
|
CS_EP_T_APPLET = 0x00000002, /* The endpoint is an applet */
|
||||||
|
|
||||||
/* unused: 0x00000004 .. 0x00000080 */
|
/* unused: 0x00000004 .. 0x00000008 */
|
||||||
|
|
||||||
|
/* Endpoint states: none == attached to a mux with a conn-stream */
|
||||||
|
CS_EP_DETACHED = 0x00000010, /* The endpoint is detached (no mux/no applet) */
|
||||||
|
CS_EP_ORPHAN = 0x00000020, /* The endpoint is orphan (no conn-stream) */
|
||||||
|
|
||||||
|
/* unused: 0x00000040 .. 0x00000080 */
|
||||||
|
|
||||||
CS_EP_SHRD = 0x00000100, /* read shut, draining extra data */
|
CS_EP_SHRD = 0x00000100, /* read shut, draining extra data */
|
||||||
CS_EP_SHRR = 0x00000200, /* read shut, resetting extra data */
|
CS_EP_SHRR = 0x00000200, /* read shut, resetting extra data */
|
||||||
|
@ -50,6 +50,7 @@ void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx);
|
|||||||
void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx);
|
void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx);
|
||||||
int cs_attach_strm(struct conn_stream *cs, struct stream *strm);
|
int cs_attach_strm(struct conn_stream *cs, struct stream *strm);
|
||||||
|
|
||||||
|
int cs_reset_endp(struct conn_stream *cs);
|
||||||
void cs_detach_endp(struct conn_stream *cs);
|
void cs_detach_endp(struct conn_stream *cs);
|
||||||
void cs_detach_app(struct conn_stream *cs);
|
void cs_detach_app(struct conn_stream *cs);
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
#include <haproxy/buf-t.h>
|
#include <haproxy/buf-t.h>
|
||||||
#include <haproxy/connection-t.h>
|
#include <haproxy/connection-t.h>
|
||||||
#include <haproxy/xprt_quic-t.h>
|
#include <haproxy/xprt_quic-t.h>
|
||||||
|
#include <haproxy/conn_stream-t.h>
|
||||||
|
|
||||||
/* Stream types */
|
/* Stream types */
|
||||||
enum qcs_type {
|
enum qcs_type {
|
||||||
@ -88,6 +89,7 @@ struct qcc {
|
|||||||
struct qcs {
|
struct qcs {
|
||||||
struct qcc *qcc;
|
struct qcc *qcc;
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
|
struct cs_endpoint *endp;
|
||||||
uint32_t flags; /* QC_SF_* */
|
uint32_t flags; /* QC_SF_* */
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
|
@ -107,20 +107,11 @@ static inline struct qc_stream_desc *qcc_get_stream(struct qcc *qcc, uint64_t id
|
|||||||
|
|
||||||
static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *buf)
|
static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *buf)
|
||||||
{
|
{
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
cs = cs_new_from_mux(qcs->endp, qcs->qcc->conn->owner, buf);
|
||||||
if (!endp)
|
if (!cs)
|
||||||
return NULL;
|
return NULL;
|
||||||
endp->target = qcs;
|
|
||||||
endp->ctx = qcs->qcc->conn;
|
|
||||||
endp->flags |= CS_EP_T_MUX;
|
|
||||||
cs = cs_new_from_mux(endp, qcs->qcc->conn->owner, buf);
|
|
||||||
if (!cs) {
|
|
||||||
cs_endpoint_free(endp);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
qcs->cs = cs;
|
qcs->cs = cs;
|
||||||
++qcs->qcc->nb_cs;
|
++qcs->qcc->nb_cs;
|
||||||
|
|
||||||
|
35
src/applet.c
35
src/applet.c
@ -47,28 +47,47 @@ static inline void appctx_init(struct appctx *appctx)
|
|||||||
* appctx_free(). <applet> is assigned as the applet, but it can be NULL. The
|
* appctx_free(). <applet> is assigned as the applet, but it can be NULL. The
|
||||||
* applet's task is always created on the current thread.
|
* applet's task is always created on the current thread.
|
||||||
*/
|
*/
|
||||||
struct appctx *appctx_new(struct applet *applet)
|
struct appctx *appctx_new(struct applet *applet, struct cs_endpoint *endp)
|
||||||
{
|
{
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
|
|
||||||
appctx = pool_alloc(pool_head_appctx);
|
appctx = pool_alloc(pool_head_appctx);
|
||||||
if (likely(appctx != NULL)) {
|
if (unlikely(!appctx))
|
||||||
|
goto fail_appctx;
|
||||||
|
|
||||||
|
appctx_init(appctx);
|
||||||
appctx->obj_type = OBJ_TYPE_APPCTX;
|
appctx->obj_type = OBJ_TYPE_APPCTX;
|
||||||
appctx->applet = applet;
|
appctx->applet = applet;
|
||||||
appctx_init(appctx);
|
|
||||||
appctx->t = task_new_here();
|
if (!endp) {
|
||||||
if (unlikely(appctx->t == NULL)) {
|
endp = cs_endpoint_new();
|
||||||
pool_free(pool_head_appctx, appctx);
|
if (!endp)
|
||||||
return NULL;
|
goto fail_endp;
|
||||||
|
endp->target = appctx;
|
||||||
|
endp->ctx = appctx;
|
||||||
|
endp->flags |= (CS_EP_T_APPLET|CS_EP_ORPHAN);
|
||||||
}
|
}
|
||||||
|
appctx->endp = endp;
|
||||||
|
|
||||||
|
appctx->t = task_new_here();
|
||||||
|
if (unlikely(!appctx->t))
|
||||||
|
goto fail_task;
|
||||||
appctx->t->process = task_run_applet;
|
appctx->t->process = task_run_applet;
|
||||||
appctx->t->context = appctx;
|
appctx->t->context = appctx;
|
||||||
|
|
||||||
LIST_INIT(&appctx->buffer_wait.list);
|
LIST_INIT(&appctx->buffer_wait.list);
|
||||||
appctx->buffer_wait.target = appctx;
|
appctx->buffer_wait.target = appctx;
|
||||||
appctx->buffer_wait.wakeup_cb = appctx_buf_available;
|
appctx->buffer_wait.wakeup_cb = appctx_buf_available;
|
||||||
|
|
||||||
_HA_ATOMIC_INC(&nb_applets);
|
_HA_ATOMIC_INC(&nb_applets);
|
||||||
}
|
|
||||||
return appctx;
|
return appctx;
|
||||||
|
|
||||||
|
fail_task:
|
||||||
|
cs_endpoint_free(appctx->endp);
|
||||||
|
fail_endp:
|
||||||
|
pool_free(pool_head_appctx, appctx);
|
||||||
|
fail_appctx:
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Callback used to wake up an applet when a buffer is available. The applet
|
/* Callback used to wake up an applet when a buffer is available. The applet
|
||||||
|
@ -1496,8 +1496,9 @@ static int connect_server(struct stream *s)
|
|||||||
|
|
||||||
if (avail >= 1) {
|
if (avail >= 1) {
|
||||||
if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
|
if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
|
||||||
cs_detach_endp(s->csb);
|
|
||||||
srv_conn = NULL;
|
srv_conn = NULL;
|
||||||
|
if (cs_reset_endp(s->csb) < 0)
|
||||||
|
return SF_ERR_INTERNAL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -2290,7 +2291,29 @@ void back_handle_st_cer(struct stream *s)
|
|||||||
* Note: the stream-interface will be switched to ST_REQ, ST_ASS or
|
* Note: the stream-interface will be switched to ST_REQ, ST_ASS or
|
||||||
* ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset.
|
* ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset.
|
||||||
*/
|
*/
|
||||||
cs_detach_endp(s->csb);
|
if (cs_reset_endp(s->csb) < 0) {
|
||||||
|
if (!si->err_type)
|
||||||
|
si->err_type = SI_ET_CONN_OTHER;
|
||||||
|
|
||||||
|
if (objt_server(s->target))
|
||||||
|
_HA_ATOMIC_INC(&objt_server(s->target)->counters.internal_errors);
|
||||||
|
_HA_ATOMIC_INC(&s->be->be_counters.internal_errors);
|
||||||
|
sess_change_server(s, NULL);
|
||||||
|
if (may_dequeue_tasks(objt_server(s->target), s->be))
|
||||||
|
process_srv_queue(objt_server(s->target));
|
||||||
|
|
||||||
|
/* shutw is enough so stop a connecting socket */
|
||||||
|
si_shutw(si);
|
||||||
|
s->req.flags |= CF_WRITE_ERROR;
|
||||||
|
s->res.flags |= CF_READ_ERROR;
|
||||||
|
|
||||||
|
si->state = SI_ST_CLO;
|
||||||
|
if (s->srv_error)
|
||||||
|
s->srv_error(s, si);
|
||||||
|
|
||||||
|
DBG_TRACE_STATE("error resetting endpoint", STRM_EV_STRM_PROC|STRM_EV_SI_ST|STRM_EV_STRM_ERR, s);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
stream_choose_redispatch(s);
|
stream_choose_redispatch(s);
|
||||||
|
|
||||||
|
22
src/check.c
22
src/check.c
@ -1134,6 +1134,15 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
|
|||||||
task_set_affinity(t, tid_bit);
|
task_set_affinity(t, tid_bit);
|
||||||
|
|
||||||
check->current_step = NULL;
|
check->current_step = NULL;
|
||||||
|
|
||||||
|
if (check->cs->flags & CS_FL_ERROR) {
|
||||||
|
check->cs->flags &= ~CS_FL_ERROR;
|
||||||
|
check->cs->endp = cs_endpoint_new();
|
||||||
|
if (!check->cs->endp)
|
||||||
|
check->cs->flags |= CS_FL_ERROR;
|
||||||
|
else
|
||||||
|
check->cs->endp->flags |= CS_EP_DETACHED;
|
||||||
|
}
|
||||||
tcpcheck_main(check);
|
tcpcheck_main(check);
|
||||||
expired = 0;
|
expired = 0;
|
||||||
}
|
}
|
||||||
@ -1155,9 +1164,10 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
|
|||||||
else {
|
else {
|
||||||
if (check->state & CHK_ST_CLOSE_CONN) {
|
if (check->state & CHK_ST_CLOSE_CONN) {
|
||||||
TRACE_DEVEL("closing current connection", CHK_EV_TASK_WAKE|CHK_EV_HCHK_RUN, check);
|
TRACE_DEVEL("closing current connection", CHK_EV_TASK_WAKE|CHK_EV_HCHK_RUN, check);
|
||||||
cs_detach_endp(check->cs);
|
|
||||||
conn = NULL;
|
|
||||||
check->state &= ~CHK_ST_CLOSE_CONN;
|
check->state &= ~CHK_ST_CLOSE_CONN;
|
||||||
|
if (cs_reset_endp(check->cs) < 0)
|
||||||
|
check->cs->flags |= CS_FL_ERROR;
|
||||||
|
conn = NULL;
|
||||||
tcpcheck_main(check);
|
tcpcheck_main(check);
|
||||||
}
|
}
|
||||||
if (check->result == CHK_RES_UNKNOWN) {
|
if (check->result == CHK_RES_UNKNOWN) {
|
||||||
@ -1190,7 +1200,13 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
|
|||||||
* the tasklet
|
* the tasklet
|
||||||
*/
|
*/
|
||||||
tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
|
tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
|
||||||
cs_detach_endp(check->cs);
|
|
||||||
|
if (cs_reset_endp(check->cs) < 0) {
|
||||||
|
/* If an error occurred at this stage, it will be fixed by the
|
||||||
|
* next check
|
||||||
|
*/
|
||||||
|
check->cs->flags |= CS_FL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
if (check->sess != NULL) {
|
if (check->sess != NULL) {
|
||||||
vars_prune(&check->vars, check->sess, NULL);
|
vars_prune(&check->vars, check->sess, NULL);
|
||||||
|
@ -2753,8 +2753,14 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
|
|||||||
* connection.
|
* connection.
|
||||||
*/
|
*/
|
||||||
if (!si_conn_ready(cs_si(s->csb))) {
|
if (!si_conn_ready(cs_si(s->csb))) {
|
||||||
cs_detach_endp(s->csb);
|
|
||||||
s->srv_conn = NULL;
|
s->srv_conn = NULL;
|
||||||
|
if (cs_reset_endp(s->csb) < 0) {
|
||||||
|
if (!cs_si(s->csb)->err_type)
|
||||||
|
cs_si(s->csb)->err_type = SI_ET_CONN_OTHER;
|
||||||
|
if (s->srv_error)
|
||||||
|
s->srv_error(s, cs_si(s->csb));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sockaddr_free(&(cs_si(s->csb)->dst));
|
sockaddr_free(&(cs_si(s->csb)->dst));
|
||||||
|
@ -86,6 +86,7 @@ struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *se
|
|||||||
pool_free(pool_head_connstream, cs);
|
pool_free(pool_head_connstream, cs);
|
||||||
cs = NULL;
|
cs = NULL;
|
||||||
}
|
}
|
||||||
|
endp->flags &= ~CS_EP_ORPHAN;
|
||||||
return cs;
|
return cs;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,6 +103,7 @@ struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session
|
|||||||
pool_free(pool_head_connstream, cs);
|
pool_free(pool_head_connstream, cs);
|
||||||
cs = NULL;
|
cs = NULL;
|
||||||
}
|
}
|
||||||
|
endp->flags &= ~CS_EP_ORPHAN;
|
||||||
return cs;
|
return cs;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,6 +115,7 @@ struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags)
|
|||||||
if (unlikely(!cs))
|
if (unlikely(!cs))
|
||||||
return NULL;
|
return NULL;
|
||||||
cs->flags |= flags;
|
cs->flags |= flags;
|
||||||
|
cs->endp->flags |= CS_EP_DETACHED;
|
||||||
cs->si = si_new(cs);
|
cs->si = si_new(cs);
|
||||||
if (unlikely(!cs->si)) {
|
if (unlikely(!cs->si)) {
|
||||||
cs_free(cs);
|
cs_free(cs);
|
||||||
@ -132,6 +135,7 @@ struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags)
|
|||||||
if (unlikely(!cs))
|
if (unlikely(!cs))
|
||||||
return NULL;
|
return NULL;
|
||||||
cs->flags |= flags;
|
cs->flags |= flags;
|
||||||
|
cs->endp->flags |= CS_EP_DETACHED;
|
||||||
cs->app = &check->obj_type;
|
cs->app = &check->obj_type;
|
||||||
cs->data_cb = &check_conn_cb;
|
cs->data_cb = &check_conn_cb;
|
||||||
return cs;
|
return cs;
|
||||||
@ -144,6 +148,7 @@ void cs_free(struct conn_stream *cs)
|
|||||||
{
|
{
|
||||||
si_free(cs->si);
|
si_free(cs->si);
|
||||||
if (cs->endp) {
|
if (cs->endp) {
|
||||||
|
BUG_ON(!(cs->endp->flags & CS_EP_DETACHED));
|
||||||
cs_endpoint_free(cs->endp);
|
cs_endpoint_free(cs->endp);
|
||||||
}
|
}
|
||||||
pool_free(pool_head_connstream, cs);
|
pool_free(pool_head_connstream, cs);
|
||||||
@ -158,6 +163,7 @@ void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
|
|||||||
cs->endp->target = target;
|
cs->endp->target = target;
|
||||||
cs->endp->ctx = ctx;
|
cs->endp->ctx = ctx;
|
||||||
cs->endp->flags |= CS_EP_T_MUX;
|
cs->endp->flags |= CS_EP_T_MUX;
|
||||||
|
cs->endp->flags &= ~CS_EP_DETACHED;
|
||||||
if (!conn->ctx)
|
if (!conn->ctx)
|
||||||
conn->ctx = cs;
|
conn->ctx = cs;
|
||||||
if (cs_strm(cs)) {
|
if (cs_strm(cs)) {
|
||||||
@ -176,8 +182,9 @@ void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
|
|||||||
cs->endp->target = target;
|
cs->endp->target = target;
|
||||||
cs->endp->ctx = ctx;
|
cs->endp->ctx = ctx;
|
||||||
cs->endp->flags |= CS_EP_T_APPLET;
|
cs->endp->flags |= CS_EP_T_APPLET;
|
||||||
|
cs->endp->flags &= ~CS_EP_DETACHED;
|
||||||
appctx->owner = cs;
|
appctx->owner = cs;
|
||||||
if (cs->si) {
|
if (cs_strm(cs)) {
|
||||||
cs->si->ops = &si_applet_ops;
|
cs->si->ops = &si_applet_ops;
|
||||||
cs->data_cb = NULL;
|
cs->data_cb = NULL;
|
||||||
}
|
}
|
||||||
@ -191,7 +198,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
|
|||||||
cs->si = si_new(cs);
|
cs->si = si_new(cs);
|
||||||
if (unlikely(!cs->si))
|
if (unlikely(!cs->si))
|
||||||
return -1;
|
return -1;
|
||||||
|
cs->endp->flags &= ~CS_EP_ORPHAN;
|
||||||
if (cs->endp->flags & CS_EP_T_MUX) {
|
if (cs->endp->flags & CS_EP_T_MUX) {
|
||||||
cs->si->ops = &si_conn_ops;
|
cs->si->ops = &si_conn_ops;
|
||||||
cs->data_cb = &si_conn_cb;
|
cs->data_cb = &si_conn_cb;
|
||||||
@ -220,9 +227,11 @@ void cs_detach_endp(struct conn_stream *cs)
|
|||||||
|
|
||||||
if (conn->mux) {
|
if (conn->mux) {
|
||||||
/* TODO: handle unsubscribe for healthchecks too */
|
/* TODO: handle unsubscribe for healthchecks too */
|
||||||
|
cs->endp->flags |= CS_EP_ORPHAN;
|
||||||
if (cs->si && cs->si->wait_event.events != 0)
|
if (cs->si && cs->si->wait_event.events != 0)
|
||||||
conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
|
conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
|
||||||
conn->mux->detach(cs);
|
conn->mux->detach(cs);
|
||||||
|
cs->endp = NULL;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
/* It's too early to have a mux, let's just destroy
|
/* It's too early to have a mux, let's just destroy
|
||||||
@ -238,13 +247,17 @@ void cs_detach_endp(struct conn_stream *cs)
|
|||||||
else if (cs->endp->flags & CS_EP_T_APPLET) {
|
else if (cs->endp->flags & CS_EP_T_APPLET) {
|
||||||
struct appctx *appctx = cs_appctx(cs);
|
struct appctx *appctx = cs_appctx(cs);
|
||||||
|
|
||||||
|
cs->endp->flags |= CS_EP_ORPHAN;
|
||||||
if (cs->si)
|
if (cs->si)
|
||||||
si_applet_release(cs->si);
|
si_applet_release(cs->si);
|
||||||
appctx_free(appctx);
|
appctx_free(appctx);
|
||||||
|
cs->endp = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cs->endp) {
|
if (cs->endp) {
|
||||||
|
/* the cs is the only one one the endpoint */
|
||||||
cs_endpoint_init(cs->endp);
|
cs_endpoint_init(cs->endp);
|
||||||
|
cs->endp->flags |= CS_EP_DETACHED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* FIXME: Rest CS for now but must be reviewed. CS flags are only
|
/* FIXME: Rest CS for now but must be reviewed. CS flags are only
|
||||||
@ -266,6 +279,21 @@ void cs_detach_app(struct conn_stream *cs)
|
|||||||
cs->si = NULL;
|
cs->si = NULL;
|
||||||
cs->data_cb = NULL;
|
cs->data_cb = NULL;
|
||||||
|
|
||||||
if (!cs->endp || !cs->endp->target)
|
if (!cs->endp || (cs->endp->flags & CS_EP_DETACHED))
|
||||||
cs_free(cs);
|
cs_free(cs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int cs_reset_endp(struct conn_stream *cs)
|
||||||
|
{
|
||||||
|
BUG_ON(!cs->app);
|
||||||
|
cs_detach_endp(cs);
|
||||||
|
if (!cs->endp) {
|
||||||
|
cs->endp = cs_endpoint_new();
|
||||||
|
if (!cs->endp) {
|
||||||
|
cs->flags |= CS_FL_ERROR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
cs->endp->flags |= CS_EP_DETACHED;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
13
src/dns.c
13
src/dns.c
@ -887,13 +887,12 @@ static struct appctx *dns_session_create(struct dns_session *ds)
|
|||||||
{
|
{
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
struct applet *applet = &dns_session_applet;
|
struct applet *applet = &dns_session_applet;
|
||||||
struct sockaddr_storage *addr = NULL;
|
struct sockaddr_storage *addr = NULL;
|
||||||
|
|
||||||
appctx = appctx_new(applet);
|
appctx = appctx_new(applet, NULL);
|
||||||
if (!appctx)
|
if (!appctx)
|
||||||
goto out_close;
|
goto out_close;
|
||||||
appctx->ctx.sft.ptr = (void *)ds;
|
appctx->ctx.sft.ptr = (void *)ds;
|
||||||
@ -907,17 +906,9 @@ static struct appctx *dns_session_create(struct dns_session *ds)
|
|||||||
if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
|
if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
|
||||||
if (!endp)
|
|
||||||
goto out_free_addr;
|
|
||||||
endp->target = appctx;
|
|
||||||
endp->ctx = appctx;
|
|
||||||
endp->flags |= CS_EP_T_APPLET;
|
|
||||||
|
|
||||||
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
|
|
||||||
if (!cs) {
|
if (!cs) {
|
||||||
ha_alert("Failed to initialize stream in dns_session_create().\n");
|
ha_alert("Failed to initialize stream in dns_session_create().\n");
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto out_free_addr;
|
goto out_free_addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1988,11 +1988,10 @@ spoe_create_appctx(struct spoe_config *conf)
|
|||||||
{
|
{
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
struct stream *strm;
|
struct stream *strm;
|
||||||
|
|
||||||
if ((appctx = appctx_new(&spoe_applet)) == NULL)
|
if ((appctx = appctx_new(&spoe_applet, NULL)) == NULL)
|
||||||
goto out_error;
|
goto out_error;
|
||||||
|
|
||||||
appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx);
|
appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx);
|
||||||
@ -2025,18 +2024,9 @@ spoe_create_appctx(struct spoe_config *conf)
|
|||||||
if (!sess)
|
if (!sess)
|
||||||
goto out_free_spoe;
|
goto out_free_spoe;
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
|
||||||
if (!endp)
|
if (!cs)
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
endp->target = appctx;
|
|
||||||
endp->ctx = appctx;
|
|
||||||
endp->flags |= CS_EP_T_APPLET;
|
|
||||||
|
|
||||||
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
|
|
||||||
if (!cs) {
|
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto out_free_sess;
|
|
||||||
}
|
|
||||||
|
|
||||||
strm = DISGUISE(cs_strm(cs));
|
strm = DISGUISE(cs_strm(cs));
|
||||||
stream_set_backend(strm, conf->agent->b.be);
|
stream_set_backend(strm, conf->agent->b.be);
|
||||||
|
1
src/h3.c
1
src/h3.c
@ -176,7 +176,6 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
|
|||||||
cs = qc_attach_cs(qcs, &htx_buf);
|
cs = qc_attach_cs(qcs, &htx_buf);
|
||||||
if (!cs)
|
if (!cs)
|
||||||
return 1;
|
return 1;
|
||||||
cs->endp->flags |= CS_EP_NOT_FIRST;
|
|
||||||
|
|
||||||
/* buffer is transferred to conn_stream and set to NULL
|
/* buffer is transferred to conn_stream and set to NULL
|
||||||
* except on stream creation error.
|
* except on stream creation error.
|
||||||
|
13
src/hlua.c
13
src/hlua.c
@ -2918,7 +2918,6 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
|||||||
struct hlua_socket *socket;
|
struct hlua_socket *socket;
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
|
|
||||||
@ -2946,7 +2945,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
|||||||
lua_setmetatable(L, -2);
|
lua_setmetatable(L, -2);
|
||||||
|
|
||||||
/* Create the applet context */
|
/* Create the applet context */
|
||||||
appctx = appctx_new(&update_applet);
|
appctx = appctx_new(&update_applet, NULL);
|
||||||
if (!appctx) {
|
if (!appctx) {
|
||||||
hlua_pusherror(L, "socket: out of memory");
|
hlua_pusherror(L, "socket: out of memory");
|
||||||
goto out_fail_conf;
|
goto out_fail_conf;
|
||||||
@ -2964,17 +2963,9 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
|||||||
goto out_fail_appctx;
|
goto out_fail_appctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
|
||||||
if (!endp)
|
|
||||||
goto out_fail_sess;
|
|
||||||
endp->target = appctx;
|
|
||||||
endp->ctx = appctx;
|
|
||||||
endp->flags |= CS_EP_T_APPLET;
|
|
||||||
|
|
||||||
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
|
|
||||||
if (!cs) {
|
if (!cs) {
|
||||||
hlua_pusherror(L, "socket: out of memory");
|
hlua_pusherror(L, "socket: out of memory");
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto out_fail_sess;
|
goto out_fail_sess;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1254,7 +1254,13 @@ static __inline int do_l7_retry(struct stream *s, struct stream_interface *si)
|
|||||||
res->to_forward = 0;
|
res->to_forward = 0;
|
||||||
res->analyse_exp = TICK_ETERNITY;
|
res->analyse_exp = TICK_ETERNITY;
|
||||||
res->total = 0;
|
res->total = 0;
|
||||||
cs_detach_endp(s->csb);
|
|
||||||
|
if (cs_reset_endp(s->csb) < 0) {
|
||||||
|
s->csb->flags |= CS_FL_ERROR;
|
||||||
|
if (!(s->flags & SF_ERR_MASK))
|
||||||
|
s->flags |= SF_ERR_INTERNAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
b_free(&req->buf);
|
b_free(&req->buf);
|
||||||
/* Swap the L7 buffer with the channel buffer */
|
/* Swap the L7 buffer with the channel buffer */
|
||||||
|
@ -455,7 +455,6 @@ struct appctx *httpclient_start(struct httpclient *hc)
|
|||||||
struct applet *applet = &httpclient_applet;
|
struct applet *applet = &httpclient_applet;
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
struct sockaddr_storage *addr = NULL;
|
struct sockaddr_storage *addr = NULL;
|
||||||
@ -480,7 +479,7 @@ struct appctx *httpclient_start(struct httpclient *hc)
|
|||||||
|
|
||||||
/* The HTTP client will be created in the same thread as the caller,
|
/* The HTTP client will be created in the same thread as the caller,
|
||||||
* avoiding threading issues */
|
* avoiding threading issues */
|
||||||
appctx = appctx_new(applet);
|
appctx = appctx_new(applet, NULL);
|
||||||
if (!appctx)
|
if (!appctx)
|
||||||
goto out;
|
goto out;
|
||||||
|
|
||||||
@ -499,17 +498,9 @@ struct appctx *httpclient_start(struct httpclient *hc)
|
|||||||
if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst)))
|
if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst)))
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
cs = cs_new_from_applet(appctx->endp, sess, &hc->req.buf);
|
||||||
if (!endp)
|
|
||||||
goto out_free_addr;
|
|
||||||
endp->target = appctx;
|
|
||||||
endp->ctx = appctx;
|
|
||||||
endp->flags |= CS_EP_T_APPLET;
|
|
||||||
|
|
||||||
cs = cs_new_from_applet(endp, sess, &hc->req.buf);
|
|
||||||
if (!cs) {
|
if (!cs) {
|
||||||
ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
|
ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto out_free_addr;
|
goto out_free_addr;
|
||||||
}
|
}
|
||||||
s = DISGUISE(cs_strm(cs));
|
s = DISGUISE(cs_strm(cs));
|
||||||
|
@ -155,6 +155,7 @@ enum fcgi_strm_st {
|
|||||||
/* FCGI stream descriptor */
|
/* FCGI stream descriptor */
|
||||||
struct fcgi_strm {
|
struct fcgi_strm {
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
|
struct cs_endpoint *endp;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct fcgi_conn *fconn;
|
struct fcgi_conn *fconn;
|
||||||
|
|
||||||
@ -1042,6 +1043,8 @@ static void fcgi_strm_destroy(struct fcgi_strm *fstrm)
|
|||||||
*/
|
*/
|
||||||
LIST_DEL_INIT(&fstrm->send_list);
|
LIST_DEL_INIT(&fstrm->send_list);
|
||||||
tasklet_free(fstrm->shut_tl);
|
tasklet_free(fstrm->shut_tl);
|
||||||
|
BUG_ON(fstrm->endp && !(fstrm->endp->flags & CS_EP_ORPHAN));
|
||||||
|
cs_endpoint_free(fstrm->endp);
|
||||||
pool_free(pool_head_fcgi_strm, fstrm);
|
pool_free(pool_head_fcgi_strm, fstrm);
|
||||||
|
|
||||||
TRACE_LEAVE(FCGI_EV_FSTRM_END, conn);
|
TRACE_LEAVE(FCGI_EV_FSTRM_END, conn);
|
||||||
@ -1077,6 +1080,7 @@ static struct fcgi_strm *fcgi_strm_new(struct fcgi_conn *fconn, int id)
|
|||||||
LIST_INIT(&fstrm->send_list);
|
LIST_INIT(&fstrm->send_list);
|
||||||
fstrm->fconn = fconn;
|
fstrm->fconn = fconn;
|
||||||
fstrm->cs = NULL;
|
fstrm->cs = NULL;
|
||||||
|
fstrm->endp = NULL;
|
||||||
fstrm->flags = FCGI_SF_NONE;
|
fstrm->flags = FCGI_SF_NONE;
|
||||||
fstrm->proto_status = 0;
|
fstrm->proto_status = 0;
|
||||||
fstrm->state = FCGI_SS_IDLE;
|
fstrm->state = FCGI_SS_IDLE;
|
||||||
@ -1132,6 +1136,7 @@ static struct fcgi_strm *fcgi_conn_stream_new(struct fcgi_conn *fconn, struct co
|
|||||||
}
|
}
|
||||||
cs_attach_mux(cs, fstrm, fconn->conn);
|
cs_attach_mux(cs, fstrm, fconn->conn);
|
||||||
fstrm->cs = cs;
|
fstrm->cs = cs;
|
||||||
|
fstrm->endp = cs->endp;
|
||||||
fstrm->sess = sess;
|
fstrm->sess = sess;
|
||||||
fconn->nb_cs++;
|
fconn->nb_cs++;
|
||||||
|
|
||||||
@ -3117,7 +3122,7 @@ static int fcgi_process(struct fcgi_conn *fconn)
|
|||||||
|
|
||||||
while (node) {
|
while (node) {
|
||||||
fstrm = container_of(node, struct fcgi_strm, by_id);
|
fstrm = container_of(node, struct fcgi_strm, by_id);
|
||||||
if (fstrm->cs && fstrm->cs->endp->flags & CS_EP_WAIT_FOR_HS)
|
if (fstrm->cs && fstrm->endp->flags & CS_EP_WAIT_FOR_HS)
|
||||||
fcgi_strm_notify_recv(fstrm);
|
fcgi_strm_notify_recv(fstrm);
|
||||||
node = eb32_next(node);
|
node = eb32_next(node);
|
||||||
}
|
}
|
||||||
|
51
src/mux_h1.c
51
src/mux_h1.c
@ -119,6 +119,7 @@ struct h1c {
|
|||||||
struct h1s {
|
struct h1s {
|
||||||
struct h1c *h1c;
|
struct h1c *h1c;
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
|
struct cs_endpoint *endp;
|
||||||
uint32_t flags; /* Connection flags: H1S_F_* */
|
uint32_t flags; /* Connection flags: H1S_F_* */
|
||||||
|
|
||||||
struct wait_event *subs; /* Address of the wait_event the conn_stream associated is waiting on */
|
struct wait_event *subs; /* Address of the wait_event the conn_stream associated is waiting on */
|
||||||
@ -716,27 +717,17 @@ static inline size_t h1s_data_pending(const struct h1s *h1s)
|
|||||||
static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
|
static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
|
||||||
{
|
{
|
||||||
struct h1c *h1c = h1s->h1c;
|
struct h1c *h1c = h1s->h1c;
|
||||||
struct cs_endpoint *endp;
|
|
||||||
|
|
||||||
TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
|
TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
|
||||||
if (!endp) {
|
|
||||||
TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
|
|
||||||
goto err;
|
|
||||||
}
|
|
||||||
endp->target = h1s;
|
|
||||||
endp->ctx = h1c->conn;
|
|
||||||
endp->flags |= CS_EP_T_MUX;
|
|
||||||
if (h1s->flags & H1S_F_NOT_FIRST)
|
if (h1s->flags & H1S_F_NOT_FIRST)
|
||||||
endp->flags |= CS_EP_NOT_FIRST;
|
h1s->endp->flags |= CS_EP_NOT_FIRST;
|
||||||
if (h1s->req.flags & H1_MF_UPG_WEBSOCKET)
|
if (h1s->req.flags & H1_MF_UPG_WEBSOCKET)
|
||||||
endp->flags |= CS_EP_WEBSOCKET;
|
h1s->endp->flags |= CS_EP_WEBSOCKET;
|
||||||
|
|
||||||
h1s->cs = cs_new_from_mux(endp, h1c->conn->owner, input);
|
h1s->cs = cs_new_from_mux(h1s->endp, h1c->conn->owner, input);
|
||||||
if (!h1s->cs) {
|
if (!h1s->cs) {
|
||||||
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
|
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -785,6 +776,7 @@ static struct h1s *h1s_new(struct h1c *h1c)
|
|||||||
h1c->h1s = h1s;
|
h1c->h1s = h1s;
|
||||||
h1s->sess = NULL;
|
h1s->sess = NULL;
|
||||||
h1s->cs = NULL;
|
h1s->cs = NULL;
|
||||||
|
h1s->endp = NULL;
|
||||||
h1s->flags = H1S_F_WANT_KAL;
|
h1s->flags = H1S_F_WANT_KAL;
|
||||||
h1s->subs = NULL;
|
h1s->subs = NULL;
|
||||||
h1s->rxbuf = BUF_NULL;
|
h1s->rxbuf = BUF_NULL;
|
||||||
@ -811,9 +803,8 @@ static struct h1s *h1s_new(struct h1c *h1c)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct h1s *h1c_frt_stream_new(struct h1c *h1c)
|
static struct h1s *h1c_frt_stream_new(struct h1c *h1c, struct conn_stream *cs, struct session *sess)
|
||||||
{
|
{
|
||||||
struct session *sess = h1c->conn->owner;
|
|
||||||
struct h1s *h1s;
|
struct h1s *h1s;
|
||||||
|
|
||||||
TRACE_ENTER(H1_EV_H1S_NEW, h1c->conn);
|
TRACE_ENTER(H1_EV_H1S_NEW, h1c->conn);
|
||||||
@ -822,6 +813,20 @@ static struct h1s *h1c_frt_stream_new(struct h1c *h1c)
|
|||||||
if (!h1s)
|
if (!h1s)
|
||||||
goto fail;
|
goto fail;
|
||||||
|
|
||||||
|
if (cs) {
|
||||||
|
cs_attach_mux(cs, h1s, h1c->conn);
|
||||||
|
h1s->cs = cs;
|
||||||
|
h1s->endp = cs->endp;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
h1s->endp = cs_endpoint_new();
|
||||||
|
if (!h1s->endp)
|
||||||
|
goto fail;
|
||||||
|
h1s->endp->target = h1s;
|
||||||
|
h1s->endp->ctx = h1c->conn;
|
||||||
|
h1s->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN);
|
||||||
|
}
|
||||||
|
|
||||||
h1s->sess = sess;
|
h1s->sess = sess;
|
||||||
|
|
||||||
if (h1c->px->options2 & PR_O2_REQBUG_OK)
|
if (h1c->px->options2 & PR_O2_REQBUG_OK)
|
||||||
@ -834,6 +839,7 @@ static struct h1s *h1c_frt_stream_new(struct h1c *h1c)
|
|||||||
|
|
||||||
fail:
|
fail:
|
||||||
TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn);
|
TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn);
|
||||||
|
pool_free(pool_head_h1s, h1s);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -850,6 +856,7 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s
|
|||||||
cs_attach_mux(cs, h1s, h1c->conn);
|
cs_attach_mux(cs, h1s, h1c->conn);
|
||||||
h1s->flags |= H1S_F_RX_BLK;
|
h1s->flags |= H1S_F_RX_BLK;
|
||||||
h1s->cs = cs;
|
h1s->cs = cs;
|
||||||
|
h1s->endp = cs->endp;
|
||||||
h1s->sess = sess;
|
h1s->sess = sess;
|
||||||
|
|
||||||
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
|
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
|
||||||
@ -903,6 +910,8 @@ static void h1s_destroy(struct h1s *h1s)
|
|||||||
}
|
}
|
||||||
|
|
||||||
HA_ATOMIC_DEC(&h1c->px_counters->open_streams);
|
HA_ATOMIC_DEC(&h1c->px_counters->open_streams);
|
||||||
|
BUG_ON(h1s->endp && !(h1s->endp->flags & CS_EP_ORPHAN));
|
||||||
|
cs_endpoint_free(h1s->endp);
|
||||||
pool_free(pool_head_h1s, h1s);
|
pool_free(pool_head_h1s, h1s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -990,12 +999,8 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
|
|||||||
}
|
}
|
||||||
else if (conn_ctx) {
|
else if (conn_ctx) {
|
||||||
/* Upgraded frontend connection (from TCP) */
|
/* Upgraded frontend connection (from TCP) */
|
||||||
struct conn_stream *cs = conn_ctx;
|
if (!h1c_frt_stream_new(h1c, conn_ctx, h1c->conn->owner))
|
||||||
|
|
||||||
if (!h1c_frt_stream_new(h1c))
|
|
||||||
goto fail;
|
goto fail;
|
||||||
h1c->h1s->cs = cs;
|
|
||||||
cs_attach_mux(cs, h1c->h1s, conn);
|
|
||||||
|
|
||||||
/* Attach the CS but Not ready yet */
|
/* Attach the CS but Not ready yet */
|
||||||
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;
|
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;
|
||||||
@ -1879,11 +1884,11 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count
|
|||||||
/* Here h1s->cs is always defined */
|
/* Here h1s->cs is always defined */
|
||||||
if (!(h1m->flags & H1_MF_CHNK) && (h1m->state == H1_MSG_DATA || (h1m->state == H1_MSG_TUNNEL))) {
|
if (!(h1m->flags & H1_MF_CHNK) && (h1m->state == H1_MSG_DATA || (h1m->state == H1_MSG_TUNNEL))) {
|
||||||
TRACE_STATE("notify the mux can use splicing", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
|
TRACE_STATE("notify the mux can use splicing", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
|
||||||
h1s->cs->endp->flags |= CS_EP_MAY_SPLICE;
|
h1s->endp->flags |= CS_EP_MAY_SPLICE;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
|
TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
|
||||||
h1s->cs->endp->flags &= ~CS_EP_MAY_SPLICE;
|
h1s->endp->flags &= ~CS_EP_MAY_SPLICE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Set EOI on conn-stream in DONE state iff:
|
/* Set EOI on conn-stream in DONE state iff:
|
||||||
@ -2948,7 +2953,7 @@ static int h1_process(struct h1c * h1c)
|
|||||||
|
|
||||||
/* Create the H1 stream if not already there */
|
/* Create the H1 stream if not already there */
|
||||||
if (!h1s) {
|
if (!h1s) {
|
||||||
h1s = h1c_frt_stream_new(h1c);
|
h1s = h1c_frt_stream_new(h1c, NULL, h1c->conn->owner);
|
||||||
if (!h1s) {
|
if (!h1s) {
|
||||||
b_reset(&h1c->ibuf);
|
b_reset(&h1c->ibuf);
|
||||||
h1c->flags = (h1c->flags & ~(H1C_F_ST_IDLE|H1C_F_WAIT_NEXT_REQ)) | H1C_F_ST_ERROR;
|
h1c->flags = (h1c->flags & ~(H1C_F_ST_IDLE|H1C_F_WAIT_NEXT_REQ)) | H1C_F_ST_ERROR;
|
||||||
|
28
src/mux_h2.c
28
src/mux_h2.c
@ -214,6 +214,7 @@ enum h2_ss {
|
|||||||
*/
|
*/
|
||||||
struct h2s {
|
struct h2s {
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
|
struct cs_endpoint *endp;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct h2c *h2c;
|
struct h2c *h2c;
|
||||||
struct eb32_node by_id; /* place in h2c's streams_by_id */
|
struct eb32_node by_id; /* place in h2c's streams_by_id */
|
||||||
@ -1520,6 +1521,8 @@ static void h2s_destroy(struct h2s *h2s)
|
|||||||
|
|
||||||
/* ditto, calling tasklet_free() here should be ok */
|
/* ditto, calling tasklet_free() here should be ok */
|
||||||
tasklet_free(h2s->shut_tl);
|
tasklet_free(h2s->shut_tl);
|
||||||
|
BUG_ON(h2s->endp && !(h2s->endp->flags & CS_EP_ORPHAN));
|
||||||
|
cs_endpoint_free(h2s->endp);
|
||||||
pool_free(pool_head_h2s, h2s);
|
pool_free(pool_head_h2s, h2s);
|
||||||
|
|
||||||
TRACE_LEAVE(H2_EV_H2S_END, conn);
|
TRACE_LEAVE(H2_EV_H2S_END, conn);
|
||||||
@ -1552,6 +1555,7 @@ static struct h2s *h2s_new(struct h2c *h2c, int id)
|
|||||||
LIST_INIT(&h2s->list);
|
LIST_INIT(&h2s->list);
|
||||||
h2s->h2c = h2c;
|
h2s->h2c = h2c;
|
||||||
h2s->cs = NULL;
|
h2s->cs = NULL;
|
||||||
|
h2s->endp = NULL;
|
||||||
h2s->sws = 0;
|
h2s->sws = 0;
|
||||||
h2s->flags = H2_SF_NONE;
|
h2s->flags = H2_SF_NONE;
|
||||||
h2s->errcode = H2_ERR_NO_ERROR;
|
h2s->errcode = H2_ERR_NO_ERROR;
|
||||||
@ -1590,7 +1594,6 @@ static struct h2s *h2s_new(struct h2c *h2c, int id)
|
|||||||
static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *input, uint32_t flags)
|
static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *input, uint32_t flags)
|
||||||
{
|
{
|
||||||
struct session *sess = h2c->conn->owner;
|
struct session *sess = h2c->conn->owner;
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct h2s *h2s;
|
struct h2s *h2s;
|
||||||
|
|
||||||
TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn);
|
TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn);
|
||||||
@ -1602,17 +1605,18 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
|
|||||||
if (!h2s)
|
if (!h2s)
|
||||||
goto out;
|
goto out;
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
h2s->endp = cs_endpoint_new();
|
||||||
if (!endp)
|
if (!h2s->endp)
|
||||||
goto out_close;
|
goto out_close;
|
||||||
endp->target = h2s;
|
h2s->endp->target = h2s;
|
||||||
endp->ctx = h2c->conn;
|
h2s->endp->ctx = h2c->conn;
|
||||||
endp->flags |= (CS_EP_T_MUX|CS_EP_NOT_FIRST);
|
h2s->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST);
|
||||||
|
|
||||||
/* FIXME wrong analogy between ext-connect and websocket, this need to
|
/* FIXME wrong analogy between ext-connect and websocket, this need to
|
||||||
* be refine.
|
* be refine.
|
||||||
*/
|
*/
|
||||||
if (flags & H2_SF_EXT_CONNECT_RCVD)
|
if (flags & H2_SF_EXT_CONNECT_RCVD)
|
||||||
endp->flags |= CS_EP_WEBSOCKET;
|
h2s->endp->flags |= CS_EP_WEBSOCKET;
|
||||||
|
|
||||||
/* The stream will record the request's accept date (which is either the
|
/* The stream will record the request's accept date (which is either the
|
||||||
* end of the connection's or the date immediately after the previous
|
* end of the connection's or the date immediately after the previous
|
||||||
@ -1621,9 +1625,8 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
|
|||||||
*/
|
*/
|
||||||
sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake;
|
sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake;
|
||||||
|
|
||||||
h2s->cs = cs_new_from_mux(endp, sess, input);
|
h2s->cs = cs_new_from_mux(h2s->endp, sess, input);
|
||||||
if (!h2s->cs) {
|
if (!h2s->cs) {
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto out_close;
|
goto out_close;
|
||||||
}
|
}
|
||||||
h2c->nb_cs++;
|
h2c->nb_cs++;
|
||||||
@ -1675,6 +1678,7 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, s
|
|||||||
|
|
||||||
cs_attach_mux(cs, h2s, h2c->conn);
|
cs_attach_mux(cs, h2s, h2c->conn);
|
||||||
h2s->cs = cs;
|
h2s->cs = cs;
|
||||||
|
h2s->endp = cs->endp;
|
||||||
h2s->sess = sess;
|
h2s->sess = sess;
|
||||||
h2c->nb_cs++;
|
h2c->nb_cs++;
|
||||||
|
|
||||||
@ -4080,7 +4084,7 @@ static int h2_process(struct h2c *h2c)
|
|||||||
|
|
||||||
while (node) {
|
while (node) {
|
||||||
h2s = container_of(node, struct h2s, by_id);
|
h2s = container_of(node, struct h2s, by_id);
|
||||||
if (h2s->cs && h2s->cs->endp->flags & CS_EP_WAIT_FOR_HS)
|
if (h2s->cs && h2s->endp->flags & CS_EP_WAIT_FOR_HS)
|
||||||
h2s_notify_recv(h2s);
|
h2s_notify_recv(h2s);
|
||||||
node = eb32_next(node);
|
node = eb32_next(node);
|
||||||
}
|
}
|
||||||
@ -5322,7 +5326,7 @@ static size_t h2s_frt_make_resp_headers(struct h2s *h2s, struct htx *htx)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!h2s->cs || h2s->cs->endp->flags & CS_EP_SHW) {
|
if (!h2s->cs || h2s->endp->flags & CS_EP_SHW) {
|
||||||
/* Response already closed: add END_STREAM */
|
/* Response already closed: add END_STREAM */
|
||||||
es_now = 1;
|
es_now = 1;
|
||||||
}
|
}
|
||||||
@ -5742,7 +5746,7 @@ static size_t h2s_bck_make_req_headers(struct h2s *h2s, struct htx *htx)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!h2s->cs || h2s->cs->endp->flags & CS_EP_SHW) {
|
if (!h2s->cs || h2s->endp->flags & CS_EP_SHW) {
|
||||||
/* Request already closed: add END_STREAM */
|
/* Request already closed: add END_STREAM */
|
||||||
es_now = 1;
|
es_now = 1;
|
||||||
}
|
}
|
||||||
|
44
src/mux_pt.c
44
src/mux_pt.c
@ -21,6 +21,7 @@
|
|||||||
|
|
||||||
struct mux_pt_ctx {
|
struct mux_pt_ctx {
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
|
struct cs_endpoint *endp;
|
||||||
struct connection *conn;
|
struct connection *conn;
|
||||||
struct wait_event wait_event;
|
struct wait_event wait_event;
|
||||||
};
|
};
|
||||||
@ -207,6 +208,8 @@ static void mux_pt_destroy(struct mux_pt_ctx *ctx)
|
|||||||
if (conn && ctx->wait_event.events != 0)
|
if (conn && ctx->wait_event.events != 0)
|
||||||
conn->xprt->unsubscribe(conn, conn->xprt_ctx, ctx->wait_event.events,
|
conn->xprt->unsubscribe(conn, conn->xprt_ctx, ctx->wait_event.events,
|
||||||
&ctx->wait_event);
|
&ctx->wait_event);
|
||||||
|
BUG_ON(ctx->endp && !(ctx->endp->flags & CS_EP_ORPHAN));
|
||||||
|
cs_endpoint_free(ctx->endp);
|
||||||
pool_free(pool_head_pt_ctx, ctx);
|
pool_free(pool_head_pt_ctx, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,7 +275,6 @@ struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status)
|
|||||||
static int mux_pt_init(struct connection *conn, struct proxy *prx, struct session *sess,
|
static int mux_pt_init(struct connection *conn, struct proxy *prx, struct session *sess,
|
||||||
struct buffer *input)
|
struct buffer *input)
|
||||||
{
|
{
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct conn_stream *cs = conn->ctx;
|
struct conn_stream *cs = conn->ctx;
|
||||||
struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);
|
struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);
|
||||||
|
|
||||||
@ -292,21 +294,27 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
|
|||||||
ctx->conn = conn;
|
ctx->conn = conn;
|
||||||
|
|
||||||
if (!cs) {
|
if (!cs) {
|
||||||
endp = cs_endpoint_new();
|
ctx->endp = cs_endpoint_new();
|
||||||
if (!endp)
|
if (!ctx->endp) {
|
||||||
goto fail_free_ctx;
|
|
||||||
endp->target = ctx;
|
|
||||||
endp->ctx = conn;
|
|
||||||
endp->flags |= CS_EP_T_MUX;
|
|
||||||
|
|
||||||
cs = cs_new_from_mux(endp, sess, input);
|
|
||||||
if (!cs) {
|
|
||||||
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
|
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto fail_free_ctx;
|
goto fail_free_ctx;
|
||||||
}
|
}
|
||||||
|
ctx->endp->target = ctx;
|
||||||
|
ctx->endp->ctx = conn;
|
||||||
|
ctx->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN);
|
||||||
|
|
||||||
|
cs = cs_new_from_mux(ctx->endp, sess, input);
|
||||||
|
if (!cs) {
|
||||||
|
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
|
||||||
|
goto fail_free_endp;
|
||||||
|
}
|
||||||
TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
|
TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
cs_attach_mux(cs, ctx, conn);
|
||||||
|
ctx->cs = cs;
|
||||||
|
ctx->endp = cs->endp;
|
||||||
|
}
|
||||||
conn->ctx = ctx;
|
conn->ctx = ctx;
|
||||||
ctx->cs = cs;
|
ctx->cs = cs;
|
||||||
cs->flags |= CS_FL_RCV_MORE;
|
cs->flags |= CS_FL_RCV_MORE;
|
||||||
@ -316,6 +324,8 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
|
|||||||
TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
|
TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
fail_free_endp:
|
||||||
|
cs_endpoint_free(ctx->endp);
|
||||||
fail_free_ctx:
|
fail_free_ctx:
|
||||||
if (ctx->wait_event.tasklet)
|
if (ctx->wait_event.tasklet)
|
||||||
tasklet_free(ctx->wait_event.tasklet);
|
tasklet_free(ctx->wait_event.tasklet);
|
||||||
@ -399,8 +409,11 @@ static void mux_pt_destroy_meth(void *ctx)
|
|||||||
struct mux_pt_ctx *pt = ctx;
|
struct mux_pt_ctx *pt = ctx;
|
||||||
|
|
||||||
TRACE_POINT(PT_EV_CONN_END, pt->conn, pt->cs);
|
TRACE_POINT(PT_EV_CONN_END, pt->conn, pt->cs);
|
||||||
if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt)
|
if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt) {
|
||||||
|
if (pt->conn->ctx != pt)
|
||||||
|
pt->endp = NULL;
|
||||||
mux_pt_destroy(pt);
|
mux_pt_destroy(pt);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -411,15 +424,14 @@ static void mux_pt_detach(struct conn_stream *cs)
|
|||||||
struct connection *conn = __cs_conn(cs);
|
struct connection *conn = __cs_conn(cs);
|
||||||
struct mux_pt_ctx *ctx;
|
struct mux_pt_ctx *ctx;
|
||||||
|
|
||||||
ALREADY_CHECKED(conn);
|
|
||||||
ctx = conn->ctx;
|
|
||||||
|
|
||||||
TRACE_ENTER(PT_EV_STRM_END, conn, cs);
|
TRACE_ENTER(PT_EV_STRM_END, conn, cs);
|
||||||
|
|
||||||
|
ctx = conn->ctx;
|
||||||
|
ctx->cs = NULL;
|
||||||
|
|
||||||
/* Subscribe, to know if we got disconnected */
|
/* Subscribe, to know if we got disconnected */
|
||||||
if (!conn_is_back(conn) && conn->owner != NULL &&
|
if (!conn_is_back(conn) && conn->owner != NULL &&
|
||||||
!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
|
!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
|
||||||
ctx->cs = NULL;
|
|
||||||
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
|
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
|
||||||
} else {
|
} else {
|
||||||
/* There's no session attached to that connection, destroy it */
|
/* There's no session attached to that connection, destroy it */
|
||||||
|
@ -117,9 +117,19 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
qcs->cs = NULL;
|
qcs->cs = NULL;
|
||||||
qcs->flags = QC_SF_NONE;
|
qcs->flags = QC_SF_NONE;
|
||||||
|
|
||||||
|
qcs->endp = cs_endpoint_new();
|
||||||
|
if (!qcs->endp) {
|
||||||
|
pool_free(pool_head_qcs, qcs);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
qcs->endp->target = qcs;
|
||||||
|
qcs->endp->ctx = qcc->conn;
|
||||||
|
qcs->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST);
|
||||||
|
|
||||||
qcs->id = id;
|
qcs->id = id;
|
||||||
/* store transport layer stream descriptor in qcc tree */
|
/* store transport layer stream descriptor in qcc tree */
|
||||||
eb64_insert(&qcc->streams_by_id, &stream->by_id);
|
eb64_insert(&qcc->streams_by_id, &stream->by_id);
|
||||||
|
|
||||||
qcc->strms[type].nb_streams++;
|
qcc->strms[type].nb_streams++;
|
||||||
|
|
||||||
/* If stream is local, use peer remote-limit, or else the opposite. */
|
/* If stream is local, use peer remote-limit, or else the opposite. */
|
||||||
@ -160,6 +170,8 @@ void qcs_free(struct qcs *qcs)
|
|||||||
/* stream desc must be removed from MUX tree before release it */
|
/* stream desc must be removed from MUX tree before release it */
|
||||||
eb64_delete(&qcs->stream->by_id);
|
eb64_delete(&qcs->stream->by_id);
|
||||||
qc_stream_desc_release(qcs->stream, qcs->qcc->conn->handle.qc);
|
qc_stream_desc_release(qcs->stream, qcs->qcc->conn->handle.qc);
|
||||||
|
BUG_ON(qcs->endp && !(qcs->endp->flags & CS_EP_ORPHAN));
|
||||||
|
cs_endpoint_free(qcs->endp);
|
||||||
pool_free(pool_head_qcs, qcs);
|
pool_free(pool_head_qcs, qcs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
13
src/peers.c
13
src/peers.c
@ -3181,7 +3181,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
struct proxy *p = peers->peers_fe; /* attached frontend */
|
struct proxy *p = peers->peers_fe; /* attached frontend */
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
struct sockaddr_storage *addr = NULL;
|
struct sockaddr_storage *addr = NULL;
|
||||||
@ -3193,7 +3192,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
peer->last_hdshk = now_ms;
|
peer->last_hdshk = now_ms;
|
||||||
s = NULL;
|
s = NULL;
|
||||||
|
|
||||||
appctx = appctx_new(&peer_applet);
|
appctx = appctx_new(&peer_applet, NULL);
|
||||||
if (!appctx)
|
if (!appctx)
|
||||||
goto out_close;
|
goto out_close;
|
||||||
|
|
||||||
@ -3209,17 +3208,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr)))
|
if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr)))
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
|
||||||
if (!endp)
|
|
||||||
goto out_free_addr;
|
|
||||||
endp->target = appctx;
|
|
||||||
endp->ctx = appctx;
|
|
||||||
endp->flags |= CS_EP_T_APPLET;
|
|
||||||
|
|
||||||
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
|
|
||||||
if (!cs) {
|
if (!cs) {
|
||||||
ha_alert("Failed to initialize stream in peer_session_create().\n");
|
ha_alert("Failed to initialize stream in peer_session_create().\n");
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto out_free_addr;
|
goto out_free_addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
13
src/sink.c
13
src/sink.c
@ -636,7 +636,6 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
|
|||||||
struct proxy *p = sink->forward_px;
|
struct proxy *p = sink->forward_px;
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct cs_endpoint *endp;
|
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
struct applet *applet = &sink_forward_applet;
|
struct applet *applet = &sink_forward_applet;
|
||||||
@ -645,7 +644,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
|
|||||||
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
|
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
|
||||||
applet = &sink_forward_oc_applet;
|
applet = &sink_forward_oc_applet;
|
||||||
|
|
||||||
appctx = appctx_new(applet);
|
appctx = appctx_new(applet, NULL);
|
||||||
if (!appctx)
|
if (!appctx)
|
||||||
goto out_close;
|
goto out_close;
|
||||||
|
|
||||||
@ -660,17 +659,9 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
|
|||||||
if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
|
if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
|
|
||||||
endp = cs_endpoint_new();
|
cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
|
||||||
if (!endp)
|
|
||||||
goto out_free_addr;
|
|
||||||
endp->target = appctx;
|
|
||||||
endp->ctx = appctx;
|
|
||||||
endp->flags |= CS_EP_T_APPLET;
|
|
||||||
|
|
||||||
cs = cs_new_from_applet(endp, sess, &BUF_NULL);
|
|
||||||
if (!cs) {
|
if (!cs) {
|
||||||
ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
|
ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
|
||||||
cs_endpoint_free(endp);
|
|
||||||
goto out_free_addr;
|
goto out_free_addr;
|
||||||
}
|
}
|
||||||
s = DISGUISE(cs_strm(cs));
|
s = DISGUISE(cs_strm(cs));
|
||||||
|
@ -339,7 +339,7 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a
|
|||||||
|
|
||||||
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si));
|
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si));
|
||||||
|
|
||||||
appctx = appctx_new(app);
|
appctx = appctx_new(app, si->cs->endp);
|
||||||
if (!appctx)
|
if (!appctx)
|
||||||
return NULL;
|
return NULL;
|
||||||
cs_attach_applet(si->cs, appctx, appctx);
|
cs_attach_applet(si->cs, appctx, appctx);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user