mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 07:37:02 +02:00
MINOR: spoe: Remove SPOE details from the appctx structure
Now, as for peers, we use an opaque pointer to store information related to the SPOE filter in appctx structure. These information are now stored in a dedicated structure (spoe_appctx) and allocated, using a pool, when the applet is created. This removes the dependency between applets and the SPOE filter and avoids to eventually inflate the appctx structure.
This commit is contained in:
parent
f95b111dde
commit
42bfa46234
@ -84,13 +84,7 @@ struct appctx {
|
|||||||
struct task *task;
|
struct task *task;
|
||||||
} hlua_apphttp; /* used by the Lua HTTP services */
|
} hlua_apphttp; /* used by the Lua HTTP services */
|
||||||
struct {
|
struct {
|
||||||
struct task *task;
|
void *ptr; /* private pointer for SPOE filter */
|
||||||
void *agent;
|
|
||||||
unsigned int version;
|
|
||||||
unsigned int max_frame_size;
|
|
||||||
unsigned int flags;
|
|
||||||
struct list waiting_queue;
|
|
||||||
struct list list;
|
|
||||||
} spoe; /* used by SPOE filter */
|
} spoe; /* used by SPOE filter */
|
||||||
struct {
|
struct {
|
||||||
const char *msg; /* pointer to a persistent message to be returned in CLI_ST_PRINT state */
|
const char *msg; /* pointer to a persistent message to be returned in CLI_ST_PRINT state */
|
||||||
|
216
src/flt_spoe.c
216
src/flt_spoe.c
@ -49,9 +49,6 @@
|
|||||||
#define SPOE_PRINTF(x...)
|
#define SPOE_PRINTF(x...)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Helper to get ctx inside an appctx */
|
|
||||||
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
|
|
||||||
|
|
||||||
/* Minimal size for a frame */
|
/* Minimal size for a frame */
|
||||||
#define MIN_FRAME_SIZE 256
|
#define MIN_FRAME_SIZE 256
|
||||||
|
|
||||||
@ -252,6 +249,22 @@ struct spoe_context {
|
|||||||
unsigned int process_exp; /* expiration date to process an event */
|
unsigned int process_exp; /* expiration date to process an event */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* SPOE context inside a appctx */
|
||||||
|
struct spoe_appctx {
|
||||||
|
struct appctx *owner; /* the owner */
|
||||||
|
struct task *task; /* task to handle applet timeouts */
|
||||||
|
struct spoe_agent *agent; /* agent on which the applet is attached */
|
||||||
|
|
||||||
|
unsigned int version; /* the negotiated version */
|
||||||
|
unsigned int max_frame_size; /* the negotiated max-frame-size value */
|
||||||
|
unsigned int flags; /* SPOE_APPCTX_FL_* */
|
||||||
|
|
||||||
|
struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
|
||||||
|
struct list list; /* next spoe appctx for the same agent */
|
||||||
|
};
|
||||||
|
|
||||||
|
#define SPOE_APPCTX(appctx) ((struct spoe_appctx *)((appctx)->ctx.spoe.ptr))
|
||||||
|
|
||||||
/* SPOE filter id. Used to identify SPOE filters */
|
/* SPOE filter id. Used to identify SPOE filters */
|
||||||
const char *spoe_filter_id = "SPOE filter";
|
const char *spoe_filter_id = "SPOE filter";
|
||||||
|
|
||||||
@ -274,8 +287,9 @@ struct spoe_message *curmsg = NULL;
|
|||||||
struct list curmsgs;
|
struct list curmsgs;
|
||||||
struct list curmps;
|
struct list curmps;
|
||||||
|
|
||||||
/* Pool used to allocate new SPOE contexts */
|
/* Pools used to allocate SPOE structs */
|
||||||
static struct pool_head *pool2_spoe_ctx = NULL;
|
static struct pool_head *pool2_spoe_ctx = NULL;
|
||||||
|
static struct pool_head *pool2_spoe_appctx = NULL;
|
||||||
|
|
||||||
/* Temporary variables used to ease error processing */
|
/* Temporary variables used to ease error processing */
|
||||||
int spoe_status_code = SPOE_FRM_ERR_NONE;
|
int spoe_status_code = SPOE_FRM_ERR_NONE;
|
||||||
@ -771,7 +785,7 @@ skip_spoe_action(char *frame, char *end)
|
|||||||
static int
|
static int
|
||||||
prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
|
prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
|
||||||
{
|
{
|
||||||
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
|
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
|
||||||
int idx = 0;
|
int idx = 0;
|
||||||
size_t max = (7 /* TYPE + METADATA */
|
size_t max = (7 /* TYPE + METADATA */
|
||||||
+ 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
|
+ 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
|
||||||
@ -804,7 +818,7 @@ prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
|
|||||||
/* "max-fram-size" K/V item */
|
/* "max-fram-size" K/V item */
|
||||||
idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
|
idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
|
||||||
frame[idx++] = SPOE_DATA_T_UINT32;
|
frame[idx++] = SPOE_DATA_T_UINT32;
|
||||||
idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
|
idx += encode_spoe_varint(SPOE_APPCTX(appctx)->max_frame_size, frame+idx);
|
||||||
|
|
||||||
/* "capabilities" K/V item */
|
/* "capabilities" K/V item */
|
||||||
idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
|
idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
|
||||||
@ -876,7 +890,7 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
|
|||||||
{
|
{
|
||||||
int idx = 0;
|
int idx = 0;
|
||||||
|
|
||||||
if (size < APPCTX_SPOE(appctx).max_frame_size)
|
if (size < SPOE_APPCTX(appctx)->max_frame_size)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
|
frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
|
||||||
@ -992,7 +1006,7 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
idx += i;
|
idx += i;
|
||||||
if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
|
if (sz < MIN_FRAME_SIZE || sz > SPOE_APPCTX(appctx)->max_frame_size) {
|
||||||
spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
|
spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -1054,9 +1068,9 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
APPCTX_SPOE(appctx).version = (unsigned int)vsn;
|
SPOE_APPCTX(appctx)->version = (unsigned int)vsn;
|
||||||
APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
|
SPOE_APPCTX(appctx)->max_frame_size = (unsigned int)max_frame_size;
|
||||||
APPCTX_SPOE(appctx).flags |= flags;
|
SPOE_APPCTX(appctx)->flags |= flags;
|
||||||
return idx;
|
return idx;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1159,7 +1173,7 @@ handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
|
|||||||
static int
|
static int
|
||||||
handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
|
handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
|
||||||
{
|
{
|
||||||
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
|
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
|
||||||
struct spoe_context *ctx, *back;
|
struct spoe_context *ctx, *back;
|
||||||
uint64_t stream_id, frame_id;
|
uint64_t stream_id, frame_id;
|
||||||
int i, idx = 0;
|
int i, idx = 0;
|
||||||
@ -1185,7 +1199,7 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
|
|||||||
return 0;
|
return 0;
|
||||||
idx += i;
|
idx += i;
|
||||||
|
|
||||||
if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) {
|
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
|
||||||
list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
|
list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
|
||||||
if (ctx->stream_id == (unsigned int)stream_id &&
|
if (ctx->stream_id == (unsigned int)stream_id &&
|
||||||
ctx->frame_id == (unsigned int)frame_id)
|
ctx->frame_id == (unsigned int)frame_id)
|
||||||
@ -1193,7 +1207,7 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
|
list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) {
|
||||||
if (ctx->stream_id == (unsigned int)stream_id &&
|
if (ctx->stream_id == (unsigned int)stream_id &&
|
||||||
ctx->frame_id == (unsigned int)frame_id)
|
ctx->frame_id == (unsigned int)frame_id)
|
||||||
goto found;
|
goto found;
|
||||||
@ -1226,17 +1240,21 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
|
|||||||
int
|
int
|
||||||
prepare_spoe_healthcheck_request(char **req, int *len)
|
prepare_spoe_healthcheck_request(char **req, int *len)
|
||||||
{
|
{
|
||||||
struct appctx a;
|
struct appctx appctx;
|
||||||
char *frame, buf[global.tune.bufsize];
|
struct spoe_appctx spoe_appctx;
|
||||||
unsigned int framesz;
|
char *frame, buf[global.tune.bufsize];
|
||||||
int idx;
|
unsigned int framesz;
|
||||||
|
int idx;
|
||||||
|
|
||||||
memset(&a, 0, sizeof(a));
|
memset(&appctx, 0, sizeof(appctx));
|
||||||
|
memset(&spoe_appctx, 0, sizeof(spoe_appctx));
|
||||||
memset(buf, 0, sizeof(buf));
|
memset(buf, 0, sizeof(buf));
|
||||||
APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
|
|
||||||
|
appctx.ctx.spoe.ptr = &spoe_appctx;
|
||||||
|
SPOE_APPCTX(&appctx)->max_frame_size = global.tune.bufsize-4;
|
||||||
|
|
||||||
frame = buf+4;
|
frame = buf+4;
|
||||||
idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
|
idx = prepare_spoe_hahello_frame(&appctx, frame, global.tune.bufsize-4);
|
||||||
if (idx <= 0)
|
if (idx <= 0)
|
||||||
return -1;
|
return -1;
|
||||||
if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
|
if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
|
||||||
@ -1262,15 +1280,19 @@ prepare_spoe_healthcheck_request(char **req, int *len)
|
|||||||
int
|
int
|
||||||
handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
|
handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
|
||||||
{
|
{
|
||||||
struct appctx a;
|
struct appctx appctx;
|
||||||
int r;
|
struct spoe_appctx spoe_appctx;
|
||||||
|
int r;
|
||||||
|
|
||||||
memset(&a, 0, sizeof(a));
|
memset(&appctx, 0, sizeof(appctx));
|
||||||
APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
|
memset(&spoe_appctx, 0, sizeof(spoe_appctx));
|
||||||
|
|
||||||
if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
|
appctx.ctx.spoe.ptr = &spoe_appctx;
|
||||||
|
SPOE_APPCTX(&appctx)->max_frame_size = global.tune.bufsize-4;
|
||||||
|
|
||||||
|
if (handle_spoe_agentdiscon_frame(&appctx, frame, size) != 0)
|
||||||
goto error;
|
goto error;
|
||||||
if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
|
if ((r = handle_spoe_agenthello_frame(&appctx, frame, size)) <= 0) {
|
||||||
if (r == 0)
|
if (r == 0)
|
||||||
spoe_status_code = SPOE_FRM_ERR_INVALID;
|
spoe_status_code = SPOE_FRM_ERR_INVALID;
|
||||||
goto error;
|
goto error;
|
||||||
@ -1326,7 +1348,7 @@ recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
|
|||||||
ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
|
ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
|
||||||
if (ret > 0) {
|
if (ret > 0) {
|
||||||
framesz = ntohl(netint);
|
framesz = ntohl(netint);
|
||||||
if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
|
if (framesz > SPOE_APPCTX(appctx)->max_frame_size) {
|
||||||
spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
|
spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -1368,7 +1390,7 @@ static void
|
|||||||
release_spoe_applet(struct appctx *appctx)
|
release_spoe_applet(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
struct stream_interface *si = appctx->owner;
|
struct stream_interface *si = appctx->owner;
|
||||||
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
|
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
|
||||||
struct spoe_context *ctx, *back;
|
struct spoe_context *ctx, *back;
|
||||||
|
|
||||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
|
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
|
||||||
@ -1376,9 +1398,9 @@ release_spoe_applet(struct appctx *appctx)
|
|||||||
__FUNCTION__, appctx);
|
__FUNCTION__, appctx);
|
||||||
|
|
||||||
agent->applets_act--;
|
agent->applets_act--;
|
||||||
if (!LIST_ISEMPTY(&APPCTX_SPOE(appctx).list)) {
|
if (!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->list)) {
|
||||||
LIST_DEL(&APPCTX_SPOE(appctx).list);
|
LIST_DEL(&SPOE_APPCTX(appctx)->list);
|
||||||
LIST_INIT(&APPCTX_SPOE(appctx).list);
|
LIST_INIT(&SPOE_APPCTX(appctx)->list);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (appctx->st0 != SPOE_APPCTX_ST_END) {
|
if (appctx->st0 != SPOE_APPCTX_ST_END) {
|
||||||
@ -1391,18 +1413,20 @@ release_spoe_applet(struct appctx *appctx)
|
|||||||
appctx->st0 = SPOE_APPCTX_ST_END;
|
appctx->st0 = SPOE_APPCTX_ST_END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (APPCTX_SPOE(appctx).task) {
|
if (SPOE_APPCTX(appctx)->task) {
|
||||||
task_delete(APPCTX_SPOE(appctx).task);
|
task_delete(SPOE_APPCTX(appctx)->task);
|
||||||
task_free(APPCTX_SPOE(appctx).task);
|
task_free(SPOE_APPCTX(appctx)->task);
|
||||||
}
|
}
|
||||||
|
|
||||||
list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
|
list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) {
|
||||||
LIST_DEL(&ctx->list);
|
LIST_DEL(&ctx->list);
|
||||||
LIST_INIT(&ctx->list);
|
LIST_INIT(&ctx->list);
|
||||||
ctx->state = SPOE_CTX_ST_ERROR;
|
ctx->state = SPOE_CTX_ST_ERROR;
|
||||||
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
|
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
|
||||||
|
|
||||||
if (!LIST_ISEMPTY(&agent->applets))
|
if (!LIST_ISEMPTY(&agent->applets))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -1425,7 +1449,7 @@ static int
|
|||||||
handle_connect_spoe_applet(struct appctx *appctx)
|
handle_connect_spoe_applet(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
struct stream_interface *si = appctx->owner;
|
struct stream_interface *si = appctx->owner;
|
||||||
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
|
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
|
||||||
char *frame = trash.str;
|
char *frame = trash.str;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
@ -1443,10 +1467,10 @@ handle_connect_spoe_applet(struct appctx *appctx)
|
|||||||
goto exit;
|
goto exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (APPCTX_SPOE(appctx).task->expire == TICK_ETERNITY)
|
if (SPOE_APPCTX(appctx)->task->expire == TICK_ETERNITY)
|
||||||
APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
|
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
|
||||||
|
|
||||||
ret = prepare_spoe_hahello_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
|
ret = prepare_spoe_hahello_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
|
||||||
if (ret > 1)
|
if (ret > 1)
|
||||||
ret = send_spoe_frame(appctx, frame, ret);
|
ret = send_spoe_frame(appctx, frame, ret);
|
||||||
|
|
||||||
@ -1479,7 +1503,7 @@ static int
|
|||||||
handle_connecting_spoe_applet(struct appctx *appctx)
|
handle_connecting_spoe_applet(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
struct stream_interface *si = appctx->owner;
|
struct stream_interface *si = appctx->owner;
|
||||||
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
|
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
|
||||||
char *frame = trash.str;
|
char *frame = trash.str;
|
||||||
int ret, framesz = 0;
|
int ret, framesz = 0;
|
||||||
|
|
||||||
@ -1493,7 +1517,7 @@ handle_connecting_spoe_applet(struct appctx *appctx)
|
|||||||
goto exit;
|
goto exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
|
ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
|
||||||
if (ret > 1) {
|
if (ret > 1) {
|
||||||
if (*frame == SPOE_FRM_T_AGENT_DISCON) {
|
if (*frame == SPOE_FRM_T_AGENT_DISCON) {
|
||||||
appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
|
appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
|
||||||
@ -1527,11 +1551,13 @@ handle_connecting_spoe_applet(struct appctx *appctx)
|
|||||||
bo_skip(si_oc(si), framesz+4);
|
bo_skip(si_oc(si), framesz+4);
|
||||||
agent->applets_idle++;
|
agent->applets_idle++;
|
||||||
appctx->st0 = SPOE_APPCTX_ST_IDLE;
|
appctx->st0 = SPOE_APPCTX_ST_IDLE;
|
||||||
|
LIST_DEL(&SPOE_APPCTX(appctx)->list);
|
||||||
|
LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
|
||||||
goto next;
|
goto next;
|
||||||
}
|
}
|
||||||
|
|
||||||
next:
|
next:
|
||||||
APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
||||||
return 0;
|
return 0;
|
||||||
stop:
|
stop:
|
||||||
return 1;
|
return 1;
|
||||||
@ -1544,7 +1570,7 @@ static int
|
|||||||
handle_processing_spoe_applet(struct appctx *appctx)
|
handle_processing_spoe_applet(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
struct stream_interface *si = appctx->owner;
|
struct stream_interface *si = appctx->owner;
|
||||||
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
|
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
|
||||||
struct spoe_context *ctx;
|
struct spoe_context *ctx;
|
||||||
char *frame = trash.str;
|
char *frame = trash.str;
|
||||||
unsigned int fpa = 0;
|
unsigned int fpa = 0;
|
||||||
@ -1566,8 +1592,8 @@ handle_processing_spoe_applet(struct appctx *appctx)
|
|||||||
|
|
||||||
/* Frames must be handled synchronously and a the applet is waiting for
|
/* Frames must be handled synchronously and a the applet is waiting for
|
||||||
* a ACK frame */
|
* a ACK frame */
|
||||||
if (!(APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
|
if (!(SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
|
||||||
!LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
|
!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
|
||||||
if (skip_receiving)
|
if (skip_receiving)
|
||||||
goto stop;
|
goto stop;
|
||||||
goto recv_frame;
|
goto recv_frame;
|
||||||
@ -1579,7 +1605,7 @@ handle_processing_spoe_applet(struct appctx *appctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
|
ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
|
||||||
ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
|
ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
|
||||||
if (ret > 1)
|
if (ret > 1)
|
||||||
ret = send_spoe_frame(appctx, frame, ret);
|
ret = send_spoe_frame(appctx, frame, ret);
|
||||||
|
|
||||||
@ -1609,10 +1635,10 @@ handle_processing_spoe_applet(struct appctx *appctx)
|
|||||||
release_spoe_buffer(ctx);
|
release_spoe_buffer(ctx);
|
||||||
LIST_DEL(&ctx->list);
|
LIST_DEL(&ctx->list);
|
||||||
LIST_INIT(&ctx->list);
|
LIST_INIT(&ctx->list);
|
||||||
if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC)
|
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC)
|
||||||
LIST_ADDQ(&agent->waiting_queue, &ctx->list);
|
LIST_ADDQ(&agent->waiting_queue, &ctx->list);
|
||||||
else
|
else
|
||||||
LIST_ADDQ(&APPCTX_SPOE(appctx).waiting_queue, &ctx->list);
|
LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
|
||||||
fpa++;
|
fpa++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1624,7 +1650,7 @@ handle_processing_spoe_applet(struct appctx *appctx)
|
|||||||
goto process;
|
goto process;
|
||||||
|
|
||||||
framesz = 0;
|
framesz = 0;
|
||||||
ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
|
ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
|
||||||
if (ret > 1) {
|
if (ret > 1) {
|
||||||
if (*frame == SPOE_FRM_T_AGENT_DISCON) {
|
if (*frame == SPOE_FRM_T_AGENT_DISCON) {
|
||||||
appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
|
appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
|
||||||
@ -1659,19 +1685,19 @@ handle_processing_spoe_applet(struct appctx *appctx)
|
|||||||
goto process;
|
goto process;
|
||||||
|
|
||||||
next:
|
next:
|
||||||
APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
||||||
return 0;
|
return 0;
|
||||||
stop:
|
stop:
|
||||||
if ((APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
|
if ((SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
|
||||||
LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
|
LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
|
||||||
agent->applets_idle++;
|
agent->applets_idle++;
|
||||||
appctx->st0 = SPOE_APPCTX_ST_IDLE;
|
appctx->st0 = SPOE_APPCTX_ST_IDLE;
|
||||||
}
|
}
|
||||||
if (fpa || (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_PERSIST)) {
|
if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
|
||||||
LIST_DEL(&APPCTX_SPOE(appctx).list);
|
LIST_DEL(&SPOE_APPCTX(appctx)->list);
|
||||||
LIST_ADD(&agent->applets, &APPCTX_SPOE(appctx).list);
|
LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
|
||||||
if (fpa)
|
if (fpa)
|
||||||
APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
|
|
||||||
@ -1684,7 +1710,7 @@ static int
|
|||||||
handle_disconnect_spoe_applet(struct appctx *appctx)
|
handle_disconnect_spoe_applet(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
struct stream_interface *si = appctx->owner;
|
struct stream_interface *si = appctx->owner;
|
||||||
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
|
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
|
||||||
char *frame = trash.str;
|
char *frame = trash.str;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
@ -1694,7 +1720,7 @@ handle_disconnect_spoe_applet(struct appctx *appctx)
|
|||||||
if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
|
if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
|
||||||
goto exit;
|
goto exit;
|
||||||
|
|
||||||
ret = prepare_spoe_hadiscon_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
|
ret = prepare_spoe_hadiscon_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
|
||||||
if (ret > 1)
|
if (ret > 1)
|
||||||
ret = send_spoe_frame(appctx, frame, ret);
|
ret = send_spoe_frame(appctx, frame, ret);
|
||||||
|
|
||||||
@ -1721,7 +1747,7 @@ handle_disconnect_spoe_applet(struct appctx *appctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
next:
|
next:
|
||||||
APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
||||||
return 0;
|
return 0;
|
||||||
stop:
|
stop:
|
||||||
return 1;
|
return 1;
|
||||||
@ -1744,7 +1770,7 @@ handle_disconnecting_spoe_applet(struct appctx *appctx)
|
|||||||
goto exit;
|
goto exit;
|
||||||
|
|
||||||
framesz = 0;
|
framesz = 0;
|
||||||
ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
|
ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
|
||||||
if (ret > 1) {
|
if (ret > 1) {
|
||||||
framesz = ret;
|
framesz = ret;
|
||||||
ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz);
|
ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz);
|
||||||
@ -1757,7 +1783,7 @@ handle_disconnecting_spoe_applet(struct appctx *appctx)
|
|||||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
|
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
|
||||||
" - error on frame (%s)\n",
|
" - error on frame (%s)\n",
|
||||||
(int)now.tv_sec, (int)now.tv_usec,
|
(int)now.tv_sec, (int)now.tv_usec,
|
||||||
((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
|
((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
|
||||||
__FUNCTION__, appctx,
|
__FUNCTION__, appctx,
|
||||||
spoe_frm_err_reasons[spoe_status_code]);
|
spoe_frm_err_reasons[spoe_status_code]);
|
||||||
goto exit;
|
goto exit;
|
||||||
@ -1776,7 +1802,7 @@ handle_disconnecting_spoe_applet(struct appctx *appctx)
|
|||||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
|
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
|
||||||
" - disconnected by peer (%d): %s\n",
|
" - disconnected by peer (%d): %s\n",
|
||||||
(int)now.tv_sec, (int)now.tv_usec,
|
(int)now.tv_sec, (int)now.tv_usec,
|
||||||
((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
|
((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
|
||||||
__FUNCTION__, appctx, spoe_status_code,
|
__FUNCTION__, appctx, spoe_status_code,
|
||||||
spoe_reason);
|
spoe_reason);
|
||||||
goto exit;
|
goto exit;
|
||||||
@ -1796,7 +1822,7 @@ static void
|
|||||||
handle_spoe_applet(struct appctx *appctx)
|
handle_spoe_applet(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
struct stream_interface *si = appctx->owner;
|
struct stream_interface *si = appctx->owner;
|
||||||
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
|
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
|
||||||
|
|
||||||
switchstate:
|
switchstate:
|
||||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
|
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
|
||||||
@ -1819,8 +1845,8 @@ handle_spoe_applet(struct appctx *appctx)
|
|||||||
case SPOE_APPCTX_ST_IDLE:
|
case SPOE_APPCTX_ST_IDLE:
|
||||||
if (stopping &&
|
if (stopping &&
|
||||||
LIST_ISEMPTY(&agent->sending_queue) &&
|
LIST_ISEMPTY(&agent->sending_queue) &&
|
||||||
LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
|
LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
|
||||||
APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
|
||||||
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
|
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
@ -1848,15 +1874,15 @@ handle_spoe_applet(struct appctx *appctx)
|
|||||||
si_shutr(si);
|
si_shutr(si);
|
||||||
si_ic(si)->flags |= CF_READ_NULL;
|
si_ic(si)->flags |= CF_READ_NULL;
|
||||||
appctx->st0 = SPOE_APPCTX_ST_END;
|
appctx->st0 = SPOE_APPCTX_ST_END;
|
||||||
APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
|
SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
|
||||||
/* fall through */
|
/* fall through */
|
||||||
|
|
||||||
case SPOE_APPCTX_ST_END:
|
case SPOE_APPCTX_ST_END:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
out:
|
out:
|
||||||
if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
|
if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
|
||||||
task_queue(APPCTX_SPOE(appctx).task);
|
task_queue(SPOE_APPCTX(appctx)->task);
|
||||||
si_oc(si)->flags |= CF_READ_DONTWAIT;
|
si_oc(si)->flags |= CF_READ_DONTWAIT;
|
||||||
task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
|
task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
|
||||||
}
|
}
|
||||||
@ -1881,19 +1907,25 @@ create_spoe_appctx(struct spoe_config *conf)
|
|||||||
if ((appctx = appctx_new(&spoe_applet)) == NULL)
|
if ((appctx = appctx_new(&spoe_applet)) == NULL)
|
||||||
goto out_error;
|
goto out_error;
|
||||||
|
|
||||||
appctx->st0 = SPOE_APPCTX_ST_CONNECT;
|
appctx->ctx.spoe.ptr = pool_alloc_dirty(pool2_spoe_appctx);
|
||||||
if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
|
if (SPOE_APPCTX(appctx) == NULL)
|
||||||
goto out_free_appctx;
|
goto out_free_appctx;
|
||||||
APPCTX_SPOE(appctx).task->process = process_spoe_applet;
|
|
||||||
APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;//tick_add_ifset(now_ms, conf->agent->timeout.hello);
|
|
||||||
APPCTX_SPOE(appctx).task->context = appctx;
|
|
||||||
APPCTX_SPOE(appctx).agent = conf->agent;
|
|
||||||
APPCTX_SPOE(appctx).version = 0;
|
|
||||||
APPCTX_SPOE(appctx).max_frame_size = conf->agent->max_frame_size;
|
|
||||||
APPCTX_SPOE(appctx).flags = 0;
|
|
||||||
|
|
||||||
LIST_INIT(&APPCTX_SPOE(appctx).list);
|
appctx->st0 = SPOE_APPCTX_ST_CONNECT;
|
||||||
LIST_INIT(&APPCTX_SPOE(appctx).waiting_queue);
|
if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL)
|
||||||
|
goto out_free_spoe_appctx;
|
||||||
|
|
||||||
|
SPOE_APPCTX(appctx)->owner = appctx;
|
||||||
|
SPOE_APPCTX(appctx)->task->process = process_spoe_applet;
|
||||||
|
SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
|
||||||
|
SPOE_APPCTX(appctx)->task->context = appctx;
|
||||||
|
SPOE_APPCTX(appctx)->agent = conf->agent;
|
||||||
|
SPOE_APPCTX(appctx)->version = 0;
|
||||||
|
SPOE_APPCTX(appctx)->max_frame_size = conf->agent->max_frame_size;
|
||||||
|
SPOE_APPCTX(appctx)->flags = 0;
|
||||||
|
|
||||||
|
LIST_INIT(&SPOE_APPCTX(appctx)->list);
|
||||||
|
LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
|
||||||
|
|
||||||
sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
|
sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
|
||||||
if (!sess)
|
if (!sess)
|
||||||
@ -1918,8 +1950,8 @@ create_spoe_appctx(struct spoe_config *conf)
|
|||||||
jobs++;
|
jobs++;
|
||||||
totalconn++;
|
totalconn++;
|
||||||
|
|
||||||
task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
|
task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
|
||||||
LIST_ADDQ(&conf->agent->applets, &APPCTX_SPOE(appctx).list);
|
LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
|
||||||
conf->agent->applets_act++;
|
conf->agent->applets_act++;
|
||||||
return appctx;
|
return appctx;
|
||||||
|
|
||||||
@ -1929,7 +1961,9 @@ create_spoe_appctx(struct spoe_config *conf)
|
|||||||
out_free_sess:
|
out_free_sess:
|
||||||
session_free(sess);
|
session_free(sess);
|
||||||
out_free_spoe:
|
out_free_spoe:
|
||||||
task_free(APPCTX_SPOE(appctx).task);
|
task_free(SPOE_APPCTX(appctx)->task);
|
||||||
|
out_free_spoe_appctx:
|
||||||
|
pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
|
||||||
out_free_appctx:
|
out_free_appctx:
|
||||||
appctx_free(appctx);
|
appctx_free(appctx);
|
||||||
out_error:
|
out_error:
|
||||||
@ -1942,6 +1976,7 @@ queue_spoe_context(struct spoe_context *ctx)
|
|||||||
struct spoe_config *conf = FLT_CONF(ctx->filter);
|
struct spoe_config *conf = FLT_CONF(ctx->filter);
|
||||||
struct spoe_agent *agent = conf->agent;
|
struct spoe_agent *agent = conf->agent;
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
|
struct spoe_appctx *spoe_appctx;
|
||||||
unsigned int min_applets;
|
unsigned int min_applets;
|
||||||
|
|
||||||
min_applets = min_applets_act(agent);
|
min_applets = min_applets_act(agent);
|
||||||
@ -1986,7 +2021,7 @@ queue_spoe_context(struct spoe_context *ctx)
|
|||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (agent->applets_act <= min_applets)
|
if (agent->applets_act <= min_applets)
|
||||||
APPCTX_SPOE(appctx).flags |= SPOE_APPCTX_FL_PERSIST;
|
SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST;
|
||||||
|
|
||||||
/* Increase the per-process number of cumulated connections */
|
/* Increase the per-process number of cumulated connections */
|
||||||
if (agent->cps_max > 0)
|
if (agent->cps_max > 0)
|
||||||
@ -2011,13 +2046,14 @@ queue_spoe_context(struct spoe_context *ctx)
|
|||||||
|
|
||||||
/* Finally try to wakeup the first IDLE applet found and move it at the
|
/* Finally try to wakeup the first IDLE applet found and move it at the
|
||||||
* end of the list. */
|
* end of the list. */
|
||||||
list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
|
list_for_each_entry(spoe_appctx, &agent->applets, list) {
|
||||||
|
appctx = spoe_appctx->owner;
|
||||||
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
|
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
|
||||||
si_applet_want_get(appctx->owner);
|
si_applet_want_get(appctx->owner);
|
||||||
si_applet_want_put(appctx->owner);
|
si_applet_want_put(appctx->owner);
|
||||||
appctx_wakeup(appctx);
|
appctx_wakeup(appctx);
|
||||||
LIST_DEL(&APPCTX_SPOE(appctx).list);
|
LIST_DEL(&spoe_appctx->list);
|
||||||
LIST_ADDQ(&agent->applets, &APPCTX_SPOE(appctx).list);
|
LIST_ADDQ(&agent->applets, &spoe_appctx->list);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2569,6 +2605,7 @@ sig_stop_spoe(struct sig_handler *sh)
|
|||||||
struct spoe_config *conf;
|
struct spoe_config *conf;
|
||||||
struct spoe_agent *agent;
|
struct spoe_agent *agent;
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
|
struct spoe_appctx *spoe_appctx;
|
||||||
|
|
||||||
if (fconf->id != spoe_filter_id)
|
if (fconf->id != spoe_filter_id)
|
||||||
continue;
|
continue;
|
||||||
@ -2576,7 +2613,8 @@ sig_stop_spoe(struct sig_handler *sh)
|
|||||||
conf = fconf->conf;
|
conf = fconf->conf;
|
||||||
agent = conf->agent;
|
agent = conf->agent;
|
||||||
|
|
||||||
list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
|
list_for_each_entry(spoe_appctx, &agent->applets, list) {
|
||||||
|
appctx = spoe_appctx->owner;
|
||||||
si_applet_want_get(appctx->owner);
|
si_applet_want_get(appctx->owner);
|
||||||
si_applet_want_put(appctx->owner);
|
si_applet_want_put(appctx->owner);
|
||||||
appctx_wakeup(appctx);
|
appctx_wakeup(appctx);
|
||||||
@ -3491,6 +3529,7 @@ static void __spoe_init(void)
|
|||||||
LIST_INIT(&curmsgs);
|
LIST_INIT(&curmsgs);
|
||||||
LIST_INIT(&curmps);
|
LIST_INIT(&curmps);
|
||||||
pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
|
pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
|
||||||
|
pool2_spoe_appctx = create_pool("spoe_appctx", sizeof(struct spoe_appctx), MEM_F_SHARED);
|
||||||
}
|
}
|
||||||
|
|
||||||
__attribute__((destructor))
|
__attribute__((destructor))
|
||||||
@ -3498,4 +3537,5 @@ static void
|
|||||||
__spoe_deinit(void)
|
__spoe_deinit(void)
|
||||||
{
|
{
|
||||||
pool_destroy2(pool2_spoe_ctx);
|
pool_destroy2(pool2_spoe_ctx);
|
||||||
|
pool_destroy2(pool2_spoe_appctx);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user