mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-10 17:17:06 +02:00
MEDIUM: spoe: Directly xfer NOTIFY frame when SPOE applet is created
Instead of using a buffer from the SPOE filter to store the NOTIFY frame, to copy it in a trash buffer in the SPOE applet to add meta-data and then tranfer it to the channel, the original buffer is directly transfered to the channel during the SPOE applet creation. The SPOE applet is thus simplied, the I/O handler is now only responsible to retrieve the ACK reply. The related issue is #2502.
This commit is contained in:
parent
6b9daec93d
commit
07cf7769ce
184
src/flt_spoe.c
184
src/flt_spoe.c
@ -74,8 +74,7 @@ enum spoe_ctx_state {
|
|||||||
|
|
||||||
/* All possible states for a SPOE applet */
|
/* All possible states for a SPOE applet */
|
||||||
enum spoe_appctx_state {
|
enum spoe_appctx_state {
|
||||||
SPOE_APPCTX_ST_PROCESSING = 0,
|
SPOE_APPCTX_ST_WAITING_ACK = 0,
|
||||||
SPOE_APPCTX_ST_WAITING_SYNC_ACK,
|
|
||||||
SPOE_APPCTX_ST_EXIT,
|
SPOE_APPCTX_ST_EXIT,
|
||||||
SPOE_APPCTX_ST_END,
|
SPOE_APPCTX_ST_END,
|
||||||
};
|
};
|
||||||
@ -266,7 +265,7 @@ struct flt_ops spoe_ops;
|
|||||||
|
|
||||||
static int spoe_acquire_buffer(struct buffer *buf, struct buffer_wait *buffer_wait);
|
static int spoe_acquire_buffer(struct buffer *buf, struct buffer_wait *buffer_wait);
|
||||||
static void spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait);
|
static void spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait);
|
||||||
static struct appctx *spoe_create_appctx(struct spoe_config *conf);
|
static struct appctx *spoe_create_appctx(struct spoe_context *ctx);
|
||||||
|
|
||||||
/********************************************************************
|
/********************************************************************
|
||||||
* helper functions/globals
|
* helper functions/globals
|
||||||
@ -371,38 +370,8 @@ static inline void spoe_update_stat_time(ullong *since, long *t)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/********************************************************************
|
/********************************************************************
|
||||||
* Functions that encode/decode SPOE frames
|
* Functions that decode SPOE frames
|
||||||
********************************************************************/
|
********************************************************************/
|
||||||
|
|
||||||
/* Encode the NOTIFY frame sent by HAProxy to an agent. It returns the number of
|
|
||||||
* encoded bytes in the frame on success, 0 if an encoding error occurred and -1
|
|
||||||
* if a fatal error occurred. */
|
|
||||||
static int spoe_prepare_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
|
|
||||||
{
|
|
||||||
char *p, *end;
|
|
||||||
size_t sz;
|
|
||||||
|
|
||||||
p = frame;
|
|
||||||
end = frame+size;
|
|
||||||
|
|
||||||
/* Set Frame type */
|
|
||||||
*p++ = SPOP_FRM_T_HAPROXY_NOTIFY;
|
|
||||||
|
|
||||||
/* Copy encoded messages, if possible */
|
|
||||||
sz = b_data(&SPOE_APPCTX(appctx)->spoe_ctx->buffer);
|
|
||||||
if (p + sz >= end)
|
|
||||||
goto too_big;
|
|
||||||
memcpy(p, b_head(&SPOE_APPCTX(appctx)->spoe_ctx->buffer), sz);
|
|
||||||
b_del(&SPOE_APPCTX(appctx)->spoe_ctx->buffer, sz);
|
|
||||||
p += sz;
|
|
||||||
|
|
||||||
return (p - frame);
|
|
||||||
|
|
||||||
too_big:
|
|
||||||
SPOE_APPCTX(appctx)->status_code = SPOP_ERR_TOO_BIG;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Decode ACK frame sent by an agent. It returns the number of read bytes on
|
/* Decode ACK frame sent by an agent. It returns the number of read bytes on
|
||||||
* success, 0 if the frame can be ignored and -1 if an error occurred. */
|
* success, 0 if the frame can be ignored and -1 if an error occurred. */
|
||||||
static int spoe_handle_agentack_frame(struct appctx *appctx, char *frame, size_t size)
|
static int spoe_handle_agentack_frame(struct appctx *appctx, char *frame, size_t size)
|
||||||
@ -428,19 +397,6 @@ static int spoe_handle_agentack_frame(struct appctx *appctx, char *frame, size_t
|
|||||||
return (p - frame);
|
return (p - frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when
|
|
||||||
* the frame can be ignored, 1 to retry later, and the frame length on
|
|
||||||
* success. */
|
|
||||||
static int spoe_send_frame(struct appctx *appctx, char *buf, size_t framesz)
|
|
||||||
{
|
|
||||||
int ret;
|
|
||||||
|
|
||||||
ret = applet_putblk(appctx, buf, framesz);
|
|
||||||
if (ret <= 0)
|
|
||||||
return 1; /* retry */
|
|
||||||
return framesz;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0
|
/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0
|
||||||
* when the frame can be ignored, 1 to retry later and the frame length on
|
* when the frame can be ignored, 1 to retry later and the frame length on
|
||||||
* success. */
|
* success. */
|
||||||
@ -493,7 +449,7 @@ static int spoe_init_appctx(struct appctx *appctx)
|
|||||||
struct spoe_agent *agent = spoe_appctx->agent;
|
struct spoe_agent *agent = spoe_appctx->agent;
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
|
|
||||||
if (appctx_finalize_startup(appctx, &agent->fe, &BUF_NULL) == -1)
|
if (appctx_finalize_startup(appctx, &agent->fe, &spoe_appctx->spoe_ctx->buffer) == -1)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
spoe_appctx->owner = appctx;
|
spoe_appctx->owner = appctx;
|
||||||
@ -511,7 +467,7 @@ static int spoe_init_appctx(struct appctx *appctx)
|
|||||||
s->do_log = NULL;
|
s->do_log = NULL;
|
||||||
s->scb->flags |= SC_FL_RCV_ONCE;
|
s->scb->flags |= SC_FL_RCV_ONCE;
|
||||||
|
|
||||||
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
|
appctx->st0 = SPOE_APPCTX_ST_WAITING_ACK;
|
||||||
appctx_wakeup(appctx);
|
appctx_wakeup(appctx);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
@ -563,51 +519,6 @@ static void spoe_release_appctx(struct appctx *appctx)
|
|||||||
pool_free(pool_head_spoe_appctx, spoe_appctx);
|
pool_free(pool_head_spoe_appctx, spoe_appctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int spoe_handle_sending_frame_appctx(struct appctx *appctx)
|
|
||||||
{
|
|
||||||
char *frame, *buf;
|
|
||||||
int ret;
|
|
||||||
|
|
||||||
/* 4 bytes are reserved at the beginning of <buf> to store the frame
|
|
||||||
* length. */
|
|
||||||
buf = trash.area; frame = buf;
|
|
||||||
|
|
||||||
if (!SPOE_APPCTX(appctx)->spoe_ctx) {
|
|
||||||
appctx->st0 = SPOE_APPCTX_ST_EXIT;
|
|
||||||
ret = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
ret = spoe_prepare_hanotify_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
|
|
||||||
if (ret > 1)
|
|
||||||
ret = spoe_send_frame(appctx, buf, ret);
|
|
||||||
|
|
||||||
switch (ret) {
|
|
||||||
case -1: /* error */
|
|
||||||
case 0: /* ignore */
|
|
||||||
spoe_release_buffer(&SPOE_APPCTX(appctx)->spoe_ctx->buffer, &SPOE_APPCTX(appctx)->spoe_ctx->buffer_wait);
|
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->spoe_appctx = NULL;
|
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_ERROR;
|
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
|
|
||||||
task_wakeup(SPOE_APPCTX(appctx)->spoe_ctx->strm->task, TASK_WOKEN_MSG);
|
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx = NULL;
|
|
||||||
appctx->st0 = SPOE_APPCTX_ST_EXIT;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 1: /* retry */
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
spoe_release_buffer(&SPOE_APPCTX(appctx)->spoe_ctx->buffer, &SPOE_APPCTX(appctx)->spoe_ctx->buffer_wait);
|
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_WAITING_ACK;
|
|
||||||
appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
end:
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int spoe_handle_receiving_frame_appctx(struct appctx *appctx)
|
static int spoe_handle_receiving_frame_appctx(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
char *frame;
|
char *frame;
|
||||||
@ -622,16 +533,12 @@ static int spoe_handle_receiving_frame_appctx(struct appctx *appctx)
|
|||||||
switch (ret) {
|
switch (ret) {
|
||||||
case -1: /* error */
|
case -1: /* error */
|
||||||
spoe_release_buffer(&SPOE_APPCTX(appctx)->spoe_ctx->buffer, &SPOE_APPCTX(appctx)->spoe_ctx->buffer_wait);
|
spoe_release_buffer(&SPOE_APPCTX(appctx)->spoe_ctx->buffer, &SPOE_APPCTX(appctx)->spoe_ctx->buffer_wait);
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->spoe_appctx = NULL;
|
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_ERROR;
|
SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_ERROR;
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
|
SPOE_APPCTX(appctx)->spoe_ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
|
||||||
task_wakeup(SPOE_APPCTX(appctx)->spoe_ctx->strm->task, TASK_WOKEN_MSG);
|
goto exit;
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx = NULL;
|
|
||||||
appctx->st0 = SPOE_APPCTX_ST_EXIT;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 0: /* ignore */
|
case 0: /* ignore */
|
||||||
break;
|
goto out;
|
||||||
|
|
||||||
case 1: /* retry */
|
case 1: /* retry */
|
||||||
goto end;
|
goto end;
|
||||||
@ -640,62 +547,24 @@ static int spoe_handle_receiving_frame_appctx(struct appctx *appctx)
|
|||||||
SPOE_APPCTX(appctx)->spoe_ctx->buffer = SPOE_APPCTX(appctx)->buffer;
|
SPOE_APPCTX(appctx)->spoe_ctx->buffer = SPOE_APPCTX(appctx)->buffer;
|
||||||
SPOE_APPCTX(appctx)->buffer = BUF_NULL;
|
SPOE_APPCTX(appctx)->buffer = BUF_NULL;
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_DONE;
|
SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_DONE;
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx->spoe_appctx = NULL;
|
goto exit;
|
||||||
task_wakeup(SPOE_APPCTX(appctx)->spoe_ctx->strm->task, TASK_WOKEN_MSG);
|
|
||||||
SPOE_APPCTX(appctx)->spoe_ctx = NULL;
|
|
||||||
appctx->st0 = SPOE_APPCTX_ST_EXIT;
|
|
||||||
ret = -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
out:
|
||||||
/* Do not forget to remove processed frame from the output buffer */
|
/* Do not forget to remove processed frame from the output buffer */
|
||||||
if (trash.data)
|
if (trash.data)
|
||||||
co_skip(sc_oc(appctx_sc(appctx)), trash.data);
|
co_skip(sc_oc(appctx_sc(appctx)), trash.data);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
|
||||||
|
|
||||||
static int spoe_handle_processing_appctx(struct appctx *appctx)
|
exit:
|
||||||
{
|
SPOE_APPCTX(appctx)->spoe_ctx->spoe_appctx = NULL;
|
||||||
int ret;
|
task_wakeup(SPOE_APPCTX(appctx)->spoe_ctx->strm->task, TASK_WOKEN_MSG);
|
||||||
|
SPOE_APPCTX(appctx)->spoe_ctx = NULL;
|
||||||
/* receiving_frame loop */
|
appctx->st0 = SPOE_APPCTX_ST_EXIT;
|
||||||
while (1) {
|
ret = 0;
|
||||||
ret = spoe_handle_receiving_frame_appctx(appctx);
|
goto out;
|
||||||
switch (ret) {
|
|
||||||
case -1: /* error */
|
|
||||||
goto next;
|
|
||||||
case 0: /* ignore */
|
|
||||||
break;
|
|
||||||
case 1: /* retry */
|
|
||||||
goto send;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
send:
|
|
||||||
if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
|
|
||||||
goto end;
|
|
||||||
|
|
||||||
ret = spoe_handle_sending_frame_appctx(appctx);
|
|
||||||
switch (ret) {
|
|
||||||
case -1: /* error */
|
|
||||||
goto next;
|
|
||||||
|
|
||||||
case 0: /* ignore */
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 1: /* retry */
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
end:
|
|
||||||
return 1;
|
|
||||||
next:
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* I/O Handler processing messages exchanged with the agent */
|
/* I/O Handler processing messages exchanged with the agent */
|
||||||
@ -717,9 +586,9 @@ static void spoe_handle_appctx(struct appctx *appctx)
|
|||||||
|
|
||||||
switchstate:
|
switchstate:
|
||||||
switch (appctx->st0) {
|
switch (appctx->st0) {
|
||||||
case SPOE_APPCTX_ST_PROCESSING:
|
/* case SPOE_APPCTX_ST_PROCESSING: */
|
||||||
case SPOE_APPCTX_ST_WAITING_SYNC_ACK:
|
case SPOE_APPCTX_ST_WAITING_ACK:
|
||||||
if (spoe_handle_processing_appctx(appctx))
|
if (spoe_handle_receiving_frame_appctx(appctx))
|
||||||
break;
|
break;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
|
|
||||||
@ -749,8 +618,9 @@ struct applet spoe_applet = {
|
|||||||
|
|
||||||
/* Create a SPOE applet. On success, the created applet is returned, else
|
/* Create a SPOE applet. On success, the created applet is returned, else
|
||||||
* NULL. */
|
* NULL. */
|
||||||
static struct appctx *spoe_create_appctx(struct spoe_config *conf)
|
static struct appctx *spoe_create_appctx(struct spoe_context *ctx)
|
||||||
{
|
{
|
||||||
|
struct spoe_config *conf = FLT_CONF(ctx->filter);
|
||||||
struct spoe_agent *agent = conf->agent;
|
struct spoe_agent *agent = conf->agent;
|
||||||
struct spoe_appctx *spoe_appctx;
|
struct spoe_appctx *spoe_appctx;
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
@ -765,7 +635,8 @@ static struct appctx *spoe_create_appctx(struct spoe_config *conf)
|
|||||||
spoe_appctx->flags = 0;
|
spoe_appctx->flags = 0;
|
||||||
spoe_appctx->status_code = SPOP_ERR_NONE;
|
spoe_appctx->status_code = SPOP_ERR_NONE;
|
||||||
spoe_appctx->buffer = BUF_NULL;
|
spoe_appctx->buffer = BUF_NULL;
|
||||||
|
spoe_appctx->spoe_ctx = ctx;
|
||||||
|
ctx->spoe_appctx = spoe_appctx;
|
||||||
|
|
||||||
if ((appctx = appctx_new_here(&spoe_applet, NULL)) == NULL)
|
if ((appctx = appctx_new_here(&spoe_applet, NULL)) == NULL)
|
||||||
goto out_free_spoe_appctx;
|
goto out_free_spoe_appctx;
|
||||||
@ -860,6 +731,9 @@ static int spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
|
|||||||
p = b_head(&ctx->buffer);
|
p = b_head(&ctx->buffer);
|
||||||
end = p + agent->max_frame_size - SPOP_FRAME_HDR_SIZE;
|
end = p + agent->max_frame_size - SPOP_FRAME_HDR_SIZE;
|
||||||
|
|
||||||
|
/* Set Frame type */
|
||||||
|
*p++ = SPOP_FRM_T_HAPROXY_NOTIFY;
|
||||||
|
|
||||||
if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
|
if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
|
||||||
list_for_each_entry(msg, messages, by_evt) {
|
list_for_each_entry(msg, messages, by_evt) {
|
||||||
if (spoe_encode_message(s, ctx, msg, dir, &p, end) == -1)
|
if (spoe_encode_message(s, ctx, msg, dir, &p, end) == -1)
|
||||||
@ -1195,13 +1069,11 @@ static int spoe_process_messages(struct stream *s, struct spoe_context *ctx,
|
|||||||
goto end;
|
goto end;
|
||||||
if (!ret)
|
if (!ret)
|
||||||
goto skip;
|
goto skip;
|
||||||
appctx = spoe_create_appctx(conf);
|
appctx = spoe_create_appctx(ctx);
|
||||||
if (!appctx) {
|
if (!appctx) {
|
||||||
ctx->status_code = SPOE_CTX_ERR_RES;
|
ctx->status_code = SPOE_CTX_ERR_RES;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
ctx->spoe_appctx = SPOE_APPCTX(appctx);
|
|
||||||
ctx->spoe_appctx->spoe_ctx = ctx;
|
|
||||||
ctx->state = SPOE_CTX_ST_SENDING_MSGS;
|
ctx->state = SPOE_CTX_ST_SENDING_MSGS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user