mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-23 06:41:32 +02:00
MEDIUM: spoe: Remove fragmentation support
It is the first patch of a long series to refactor the SPOE filter. The idea is to rely on a dedicated multiplexer instead of hakcing HAProxy with a list of applets processing a message queue. First of all, optionnal features will be removed. Some will be restored at the end, some others will just be removed. It is the case here. The frame fragmentation support is removed. The only purpose of this feature is to be able to support the streaming. Because it is out of the scope of this refactoring, the fragmentation is removed. The related issue is #2502.
This commit is contained in:
parent
249a547f37
commit
e3c92209f7
@ -42,8 +42,8 @@
|
||||
#define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */
|
||||
#define SPOE_FL_PIPELINING 0x00000002 /* Set when SPOE agent supports pipelining (set by default) */
|
||||
#define SPOE_FL_ASYNC 0x00000004 /* Set when SPOE agent supports async (set by default) */
|
||||
#define SPOE_FL_SND_FRAGMENTATION 0x00000008 /* Set when SPOE agent supports sending fragmented payload */
|
||||
#define SPOE_FL_RCV_FRAGMENTATION 0x00000010 /* Set when SPOE agent supports receiving fragmented payload */
|
||||
/* unsused 0x00000008 */
|
||||
/* unused 0x00000010 */
|
||||
#define SPOE_FL_FORCE_SET_VAR 0x00000020 /* Set when SPOE agent will set all variables from agent (and not only known variables) */
|
||||
|
||||
/* Flags set on the SPOE context */
|
||||
@ -51,14 +51,14 @@
|
||||
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
|
||||
#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
|
||||
#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
|
||||
#define SPOE_CTX_FL_FRAGMENTED 0x00000010 /* Set when a fragmented frame is processing */
|
||||
/* unsued 0x00000010 */
|
||||
|
||||
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
|
||||
|
||||
/* Flags set on the SPOE applet */
|
||||
#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */
|
||||
#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronous frames is supported */
|
||||
#define SPOE_APPCTX_FL_FRAGMENTATION 0x00000004 /* Set if fragmentation is supported */
|
||||
/* unused 0x00000004 */
|
||||
|
||||
#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
|
||||
#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
|
||||
@ -92,7 +92,6 @@ enum spoe_appctx_state {
|
||||
SPOE_APPCTX_ST_CONNECTING,
|
||||
SPOE_APPCTX_ST_IDLE,
|
||||
SPOE_APPCTX_ST_PROCESSING,
|
||||
SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY,
|
||||
SPOE_APPCTX_ST_WAITING_SYNC_ACK,
|
||||
SPOE_APPCTX_ST_DISCONNECT,
|
||||
SPOE_APPCTX_ST_DISCONNECTING,
|
||||
@ -132,7 +131,6 @@ enum spoe_context_error {
|
||||
SPOE_CTX_ERR_TOUT,
|
||||
SPOE_CTX_ERR_RES,
|
||||
SPOE_CTX_ERR_TOO_BIG,
|
||||
SPOE_CTX_ERR_FRAG_FRAME_ABRT,
|
||||
SPOE_CTX_ERR_INTERRUPT,
|
||||
SPOE_CTX_ERR_UNKNOWN = 255,
|
||||
SPOE_CTX_ERRS,
|
||||
@ -357,22 +355,15 @@ struct spoe_context {
|
||||
unsigned int process_exp; /* expiration date to process an event */
|
||||
|
||||
struct spoe_appctx *spoe_appctx; /* SPOE appctx sending the current frame */
|
||||
struct {
|
||||
struct spoe_message *curmsg; /* SPOE message from which to resume encoding */
|
||||
struct spoe_arg *curarg; /* SPOE arg in <curmsg> from which to resume encoding */
|
||||
unsigned int curoff; /* offset in <curarg> from which to resume encoding */
|
||||
unsigned int curlen; /* length of <curarg> need to be encode, for SMP_F_MAY_CHANGE data */
|
||||
unsigned int flags; /* SPOE_FRM_FL_* */
|
||||
} frag_ctx; /* Info about fragmented frames, valid on if SPOE_CTX_FL_FRAGMENTED is set */
|
||||
|
||||
struct {
|
||||
ullong start_ts; /* start date of the current event/group */
|
||||
ullong request_ts; /* date the frame processing starts (reset for each frag) */
|
||||
ullong queue_ts; /* date the frame is queued (reset for each frag) */
|
||||
ullong request_ts; /* date the frame processing starts */
|
||||
ullong queue_ts; /* date the frame is queued */
|
||||
ullong wait_ts; /* date the stream starts waiting for a response */
|
||||
ullong response_ts; /* date the response processing starts */
|
||||
long t_request; /* delay to encode and push the frame in queue (cumulative for frags) */
|
||||
long t_queue; /* delay before the frame gets out the sending queue (cumulative for frags) */
|
||||
long t_request; /* delay to encode and push the frame in queue */
|
||||
long t_queue; /* delay before the frame gets out the sending queue */
|
||||
long t_waiting; /* delay before the response is received */
|
||||
long t_response; /* delay to process the response (from the stream pov) */
|
||||
long t_process; /* processing time of the last event/group */
|
||||
@ -402,12 +393,6 @@ struct spoe_appctx {
|
||||
struct list list; /* next spoe appctx for the same agent */
|
||||
struct eb32_node node; /* node used for applets tree */
|
||||
unsigned int cur_fpa;
|
||||
|
||||
struct {
|
||||
struct spoe_context *ctx; /* SPOE context owning the fragmented frame */
|
||||
unsigned int cursid; /* stream-id of the fragmented frame. used if the processing is aborted */
|
||||
unsigned int curfid; /* frame-id of the fragmented frame. used if the processing is aborted */
|
||||
} frag_ctx; /* Info about fragmented frames, unused for unfragmented frames */
|
||||
};
|
||||
|
||||
#endif /* _HAPROXY_SPOE_T_H */
|
||||
|
@ -57,35 +57,6 @@ spoe_encode_buffer(const char *str, size_t len, char **buf, char *end)
|
||||
return len;
|
||||
}
|
||||
|
||||
/* Encode a buffer, possibly partially. It does the same thing than
|
||||
* 'spoe_encode_buffer', but if there is not enough space, it does not fail.
|
||||
* On success, it returns the number of copied bytes and <*buf> is moved after
|
||||
* the encoded value. If an error occurred, it returns -1. */
|
||||
static inline int
|
||||
spoe_encode_frag_buffer(const char *str, size_t len, char **buf, char *end)
|
||||
{
|
||||
char *p = *buf;
|
||||
int ret;
|
||||
|
||||
if (p >= end)
|
||||
return -1;
|
||||
|
||||
if (!len) {
|
||||
*p++ = 0;
|
||||
*buf = p;
|
||||
return 0;
|
||||
}
|
||||
|
||||
ret = encode_varint(len, &p, end);
|
||||
if (ret == -1 || p >= end)
|
||||
return -1;
|
||||
|
||||
ret = (p+len < end) ? len : (end - p);
|
||||
memcpy(p, str, ret);
|
||||
*buf = p + ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Decode a buffer. The buffer length is decoded and saved in <*len>. <*str>
|
||||
* points on the first byte of the buffer.
|
||||
* On success, it returns the buffer length and <*buf> is moved after the
|
||||
@ -161,44 +132,12 @@ spoe_encode_data(struct sample *smp, char **buf, char *end)
|
||||
|
||||
case SMP_T_STR:
|
||||
case SMP_T_BIN: {
|
||||
/* If defined, get length and offset of the sample by reading the sample
|
||||
* context. ctx.a[0] is the pointer to the length and ctx.a[1] is the
|
||||
* pointer to the offset. If the offset is greater than 0, it means the
|
||||
* sample is partially encoded. In this case, we only need to encode the
|
||||
* remaining. When all the sample is encoded, the offset is reset to 0.
|
||||
* So the caller know it can try to encode the next sample. */
|
||||
struct buffer *chk = &smp->data.u.str;
|
||||
unsigned int *len = smp->ctx.a[0];
|
||||
unsigned int *off = smp->ctx.a[1];
|
||||
|
||||
if (!*off) {
|
||||
/* First evaluation of the sample : encode the
|
||||
* type (string or binary), the buffer length
|
||||
* (as a varint) and at least 1 byte of the
|
||||
* buffer. */
|
||||
struct buffer *chk = &smp->data.u.str;
|
||||
|
||||
*p++ = (smp->data.type == SMP_T_STR)
|
||||
? SPOE_DATA_T_STR
|
||||
: SPOE_DATA_T_BIN;
|
||||
ret = spoe_encode_frag_buffer(chk->area,
|
||||
chk->data, &p,
|
||||
end);
|
||||
if (ret == -1)
|
||||
return -1;
|
||||
*len = chk->data;
|
||||
}
|
||||
else {
|
||||
/* The sample has been fragmented, encode remaining data */
|
||||
ret = MIN(*len - *off, end - p);
|
||||
memcpy(p, chk->area + *off, ret);
|
||||
p += ret;
|
||||
}
|
||||
/* Now update <*off> */
|
||||
if (ret + *off != *len)
|
||||
*off += ret;
|
||||
else
|
||||
*off = 0;
|
||||
*p++ = (smp->data.type == SMP_T_STR) ? SPOE_DATA_T_STR : SPOE_DATA_T_BIN;
|
||||
ret = spoe_encode_buffer(chk->area, chk->data, &p, end);
|
||||
if (ret == -1)
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
|
||||
|
285
src/flt_spoe.c
285
src/flt_spoe.c
@ -233,7 +233,6 @@ static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
|
||||
[SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
|
||||
[SPOE_APPCTX_ST_IDLE] = "IDLE",
|
||||
[SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
|
||||
[SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY] = "SENDING_FRAG_NOTIFY",
|
||||
[SPOE_APPCTX_ST_WAITING_SYNC_ACK] = "WAITING_SYNC_ACK",
|
||||
[SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
|
||||
[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
|
||||
@ -419,11 +418,6 @@ spoe_prepare_hahello_frame(struct appctx *appctx, char *frame, size_t size)
|
||||
memcpy(chk->area+chk->data, "async", 5);
|
||||
chk->data += 5;
|
||||
}
|
||||
if (agent != NULL && (agent->flags & SPOE_FL_RCV_FRAGMENTATION)) {
|
||||
if (chk->data) chk->area[chk->data++] = ',';
|
||||
memcpy(chk->area+chk->data, "fragmentation", 13);
|
||||
chk->data += 13;
|
||||
}
|
||||
if (spoe_encode_buffer(chk->area, chk->data, &p, end) == -1)
|
||||
goto too_big;
|
||||
|
||||
@ -523,15 +517,6 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
|
||||
stream_id = ctx->stream_id;
|
||||
frame_id = ctx->frame_id;
|
||||
|
||||
if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
|
||||
/* The fragmentation is not supported by the applet */
|
||||
if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
|
||||
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
|
||||
return -1;
|
||||
}
|
||||
flags = ctx->frag_ctx.flags;
|
||||
}
|
||||
|
||||
/* Set Frame type */
|
||||
*p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
|
||||
|
||||
@ -560,67 +545,6 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Encode next part of a fragmented frame sent by HAProxy to an agent. It
|
||||
* returns the number of encoded bytes in the frame on success, 0 if an encoding
|
||||
* error occurred and -1 if a fatal error occurred. */
|
||||
static int
|
||||
spoe_prepare_hafrag_frame(struct appctx *appctx, struct spoe_context *ctx,
|
||||
char *frame, size_t size)
|
||||
{
|
||||
char *p, *end;
|
||||
unsigned int stream_id, frame_id;
|
||||
unsigned int flags;
|
||||
size_t sz;
|
||||
|
||||
p = frame;
|
||||
end = frame+size;
|
||||
|
||||
/* <ctx> is null when the stream has aborted the processing of a
|
||||
* fragmented frame. In this case, we must notify the corresponding
|
||||
* agent using ids stored in <frag_ctx>. */
|
||||
if (ctx == NULL) {
|
||||
flags = (SPOE_FRM_FL_FIN|SPOE_FRM_FL_ABRT);
|
||||
stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
|
||||
frame_id = SPOE_APPCTX(appctx)->frag_ctx.curfid;
|
||||
}
|
||||
else {
|
||||
flags = ctx->frag_ctx.flags;
|
||||
stream_id = ctx->stream_id;
|
||||
frame_id = ctx->frame_id;
|
||||
}
|
||||
|
||||
/* Set Frame type */
|
||||
*p++ = SPOE_FRM_T_UNSET;
|
||||
|
||||
/* Set flags */
|
||||
flags = htonl(flags);
|
||||
memcpy(p, (char *)&flags, 4);
|
||||
p += 4;
|
||||
|
||||
/* Set stream-id and frame-id */
|
||||
if (encode_varint(stream_id, &p, end) == -1)
|
||||
goto too_big;
|
||||
if (encode_varint(frame_id, &p, end) == -1)
|
||||
goto too_big;
|
||||
|
||||
if (ctx == NULL)
|
||||
goto end;
|
||||
|
||||
/* Copy encoded messages, if possible */
|
||||
sz = b_data(&ctx->buffer);
|
||||
if (p + sz >= end)
|
||||
goto too_big;
|
||||
memcpy(p, b_head(&ctx->buffer), sz);
|
||||
p += sz;
|
||||
|
||||
end:
|
||||
return (p - frame);
|
||||
|
||||
too_big:
|
||||
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Decode and process the HELLO frame sent by an agent. It returns the number of
|
||||
* read bytes on success, 0 if a decoding error occurred, and -1 if a fatal
|
||||
* error occurred. */
|
||||
@ -763,11 +687,6 @@ spoe_handle_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
|
||||
if (!sz || isspace((unsigned char)*str) || *str == ',')
|
||||
flags |= SPOE_APPCTX_FL_ASYNC;
|
||||
}
|
||||
else if (sz >= 13 && !strncmp(str, "fragmentation", 13)) {
|
||||
str += 13; sz -= 13;
|
||||
if (!sz || isspace((unsigned char)*str) || *str == ',')
|
||||
flags |= SPOE_APPCTX_FL_FRAGMENTATION;
|
||||
}
|
||||
|
||||
/* Get the next comma or break */
|
||||
if (!sz || (delim = memchr(str, ',', sz)) == NULL)
|
||||
@ -982,24 +901,6 @@ spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
|
||||
}
|
||||
}
|
||||
|
||||
if (SPOE_APPCTX(appctx)->frag_ctx.ctx &&
|
||||
SPOE_APPCTX(appctx)->frag_ctx.cursid == (unsigned int)stream_id &&
|
||||
SPOE_APPCTX(appctx)->frag_ctx.curfid == (unsigned int)frame_id) {
|
||||
|
||||
/* ABRT bit is set for an unfinished fragmented frame */
|
||||
if (flags & SPOE_FRM_FL_ABRT) {
|
||||
*ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
|
||||
(*ctx)->state = SPOE_CTX_ST_ERROR;
|
||||
(*ctx)->status_code = SPOE_CTX_ERR_FRAG_FRAME_ABRT;
|
||||
/* Ignore the payload */
|
||||
goto end;
|
||||
}
|
||||
/* TODO: Handle more flags for fragmented frames: RESUME, FINISH... */
|
||||
/* For now, we ignore the ack */
|
||||
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* No Stream found, ignore the frame */
|
||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
|
||||
" - Ignore ACK frame"
|
||||
@ -1014,8 +915,7 @@ spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
|
||||
* but not if there is no longer frame waiting for a ack
|
||||
* (timeout)
|
||||
*/
|
||||
if (!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue) ||
|
||||
SPOE_APPCTX(appctx)->frag_ctx.ctx)
|
||||
if (!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue))
|
||||
return -1;
|
||||
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
|
||||
SPOE_APPCTX(appctx)->cur_fpa = 0;
|
||||
@ -1307,16 +1207,6 @@ spoe_release_appctx(struct appctx *appctx)
|
||||
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
|
||||
}
|
||||
|
||||
/* If the applet was processing a fragmented frame, report an error to
|
||||
* the corresponding stream. */
|
||||
if (spoe_appctx->frag_ctx.ctx) {
|
||||
ctx = spoe_appctx->frag_ctx.ctx;
|
||||
ctx->spoe_appctx = NULL;
|
||||
ctx->state = SPOE_CTX_ST_ERROR;
|
||||
ctx->status_code = (spoe_appctx->status_code + 0x100);
|
||||
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
|
||||
}
|
||||
|
||||
if (!LIST_ISEMPTY(&agent->rt[tid].applets)) {
|
||||
/* If there are still some running applets, remove reference on
|
||||
* the current one from streams in the async waiting queue. In
|
||||
@ -1523,12 +1413,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
|
||||
* length. */
|
||||
buf = trash.area; frame = buf+4;
|
||||
|
||||
if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
|
||||
ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
|
||||
ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
|
||||
SPOE_APPCTX(appctx)->max_frame_size);
|
||||
}
|
||||
else if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
|
||||
if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
|
||||
*skip = 1;
|
||||
ret = 1;
|
||||
goto end;
|
||||
@ -1549,9 +1434,6 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
|
||||
goto end;
|
||||
|
||||
case 0: /* ignore */
|
||||
if (ctx == NULL)
|
||||
goto abort_frag_frame;
|
||||
|
||||
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
|
||||
LIST_DELETE(&ctx->list);
|
||||
LIST_INIT(&ctx->list);
|
||||
@ -1569,34 +1451,17 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
|
||||
break;
|
||||
|
||||
default:
|
||||
if (ctx == NULL)
|
||||
goto abort_frag_frame;
|
||||
|
||||
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
|
||||
LIST_DELETE(&ctx->list);
|
||||
LIST_INIT(&ctx->list);
|
||||
_HA_ATOMIC_DEC(&agent->counters.nb_sending);
|
||||
spoe_update_stat_time(&ctx->stats.queue_ts, &ctx->stats.t_queue);
|
||||
ctx->spoe_appctx = SPOE_APPCTX(appctx);
|
||||
if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
|
||||
(ctx->frag_ctx.flags & SPOE_FRM_FL_FIN))
|
||||
goto no_frag_frame_sent;
|
||||
else
|
||||
goto frag_frame_sent;
|
||||
goto frame_sent;
|
||||
}
|
||||
goto end;
|
||||
|
||||
frag_frame_sent:
|
||||
appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY;
|
||||
*skip = 1;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
|
||||
ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
|
||||
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
|
||||
goto end;
|
||||
|
||||
no_frag_frame_sent:
|
||||
frame_sent:
|
||||
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
|
||||
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
|
||||
LIST_APPEND(&agent->rt[tid].waiting_queue, &ctx->list);
|
||||
@ -1612,20 +1477,9 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
|
||||
}
|
||||
_HA_ATOMIC_INC(&agent->counters.nb_waiting);
|
||||
ctx->stats.wait_ts = now_ns;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
|
||||
SPOE_APPCTX(appctx)->cur_fpa++;
|
||||
|
||||
ctx->state = SPOE_CTX_ST_WAITING_ACK;
|
||||
goto end;
|
||||
|
||||
abort_frag_frame:
|
||||
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
|
||||
goto end;
|
||||
|
||||
end:
|
||||
return ret;
|
||||
@ -1673,14 +1527,7 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
|
||||
ctx->spoe_appctx->cur_fpa--;
|
||||
ctx->spoe_appctx = NULL;
|
||||
}
|
||||
if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY &&
|
||||
ctx == SPOE_APPCTX(appctx)->frag_ctx.ctx) {
|
||||
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
|
||||
SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
|
||||
}
|
||||
else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
|
||||
if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
|
||||
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
|
||||
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
|
||||
break;
|
||||
@ -1967,7 +1814,6 @@ spoe_handle_appctx(struct appctx *appctx)
|
||||
__fallthrough;
|
||||
|
||||
case SPOE_APPCTX_ST_PROCESSING:
|
||||
case SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY:
|
||||
case SPOE_APPCTX_ST_WAITING_SYNC_ACK:
|
||||
if (spoe_handle_processing_appctx(appctx))
|
||||
goto out;
|
||||
@ -2145,8 +1991,7 @@ spoe_queue_context(struct spoe_context *ctx)
|
||||
/***************************************************************************
|
||||
* Functions that encode SPOE messages
|
||||
**************************************************************************/
|
||||
/* Encode a SPOE message. Info in <ctx->frag_ctx>, if any, are used to handle
|
||||
* fragmented_content. If the next message can be processed, it returns 0. If
|
||||
/* Encode a SPOE message. If the next message can be processed, it returns 0. If
|
||||
* the message is too big, it returns -1.*/
|
||||
static int
|
||||
spoe_encode_message(struct stream *s, struct spoe_context *ctx,
|
||||
@ -2168,15 +2013,6 @@ spoe_encode_message(struct stream *s, struct spoe_context *ctx,
|
||||
goto next;
|
||||
}
|
||||
|
||||
/* Resume encoding of a SPOE argument */
|
||||
if (ctx->frag_ctx.curarg != NULL) {
|
||||
arg = ctx->frag_ctx.curarg;
|
||||
goto encode_argument;
|
||||
}
|
||||
|
||||
if (ctx->frag_ctx.curoff != UINT_MAX)
|
||||
goto encode_msg_payload;
|
||||
|
||||
/* Check if there is enough space for the message name and the
|
||||
* number of arguments. It implies <msg->id_len> is encoded on 2
|
||||
* bytes, at most (< 2288). */
|
||||
@ -2191,34 +2027,16 @@ spoe_encode_message(struct stream *s, struct spoe_context *ctx,
|
||||
**buf = msg->nargs;
|
||||
(*buf)++;
|
||||
|
||||
ctx->frag_ctx.curoff = 0;
|
||||
encode_msg_payload:
|
||||
|
||||
/* Loop on arguments */
|
||||
list_for_each_entry(arg, &msg->args, list) {
|
||||
ctx->frag_ctx.curarg = arg;
|
||||
ctx->frag_ctx.curoff = UINT_MAX;
|
||||
ctx->frag_ctx.curlen = 0;
|
||||
|
||||
encode_argument:
|
||||
if (ctx->frag_ctx.curoff != UINT_MAX)
|
||||
goto encode_arg_value;
|
||||
|
||||
/* Encode the argument name as a string. It can by NULL */
|
||||
if (spoe_encode_buffer(arg->name, arg->name_len, buf, end) == -1)
|
||||
goto too_big;
|
||||
|
||||
ctx->frag_ctx.curoff = 0;
|
||||
encode_arg_value:
|
||||
|
||||
/* Fetch the argument value */
|
||||
smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
|
||||
if (smp) {
|
||||
smp->ctx.a[0] = &ctx->frag_ctx.curlen;
|
||||
smp->ctx.a[1] = &ctx->frag_ctx.curoff;
|
||||
}
|
||||
ret = spoe_encode_data(smp, buf, end);
|
||||
if (ret == -1 || ctx->frag_ctx.curoff)
|
||||
if (ret == -1)
|
||||
goto too_big;
|
||||
}
|
||||
|
||||
@ -2229,10 +2047,8 @@ spoe_encode_message(struct stream *s, struct spoe_context *ctx,
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Encode list of SPOE messages. Info in <ctx->frag_ctx>, if any, are used to
|
||||
* handle fragmented content. On success it returns 1. If an error occurred, -1
|
||||
* is returned. If nothing has been encoded, it returns 0 (this is only possible
|
||||
* for unfragmented payload). */
|
||||
/* Encode list of SPOE messages. On success it returns 1. If an error occurred, -1
|
||||
* is returned. If nothing has been encoded, it returns 0. */
|
||||
static int
|
||||
spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
|
||||
struct list *messages, int dir, int type)
|
||||
@ -2246,34 +2062,13 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
|
||||
end = p + agent->rt[tid].frame_size - FRAME_HDR_SIZE;
|
||||
|
||||
if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
|
||||
/* Resume encoding of a SPOE message */
|
||||
if (ctx->frag_ctx.curmsg != NULL) {
|
||||
msg = ctx->frag_ctx.curmsg;
|
||||
goto encode_evt_message;
|
||||
}
|
||||
|
||||
list_for_each_entry(msg, messages, by_evt) {
|
||||
ctx->frag_ctx.curmsg = msg;
|
||||
ctx->frag_ctx.curarg = NULL;
|
||||
ctx->frag_ctx.curoff = UINT_MAX;
|
||||
|
||||
encode_evt_message:
|
||||
if (spoe_encode_message(s, ctx, msg, dir, &p, end) == -1)
|
||||
goto too_big;
|
||||
}
|
||||
}
|
||||
else if (type == SPOE_MSGS_BY_GROUP) { /* Loop on messages by group */
|
||||
/* Resume encoding of a SPOE message */
|
||||
if (ctx->frag_ctx.curmsg != NULL) {
|
||||
msg = ctx->frag_ctx.curmsg;
|
||||
goto encode_grp_message;
|
||||
}
|
||||
|
||||
list_for_each_entry(msg, messages, by_grp) {
|
||||
ctx->frag_ctx.curmsg = msg;
|
||||
ctx->frag_ctx.curarg = NULL;
|
||||
ctx->frag_ctx.curoff = UINT_MAX;
|
||||
|
||||
encode_grp_message:
|
||||
if (spoe_encode_message(s, ctx, msg, dir, &p, end) == -1)
|
||||
goto too_big;
|
||||
@ -2283,48 +2078,25 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
|
||||
goto skip;
|
||||
|
||||
|
||||
/* nothing has been encoded for an unfragmented payload */
|
||||
if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) && p == b_head(&ctx->buffer))
|
||||
/* nothing has been encoded */
|
||||
if (p == b_head(&ctx->buffer))
|
||||
goto skip;
|
||||
|
||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
||||
" - encode %s messages - spoe_appctx=%p"
|
||||
" - encode messages - spoe_appctx=%p"
|
||||
"- max_size=%u - encoded=%ld\n",
|
||||
(int)date.tv_sec, (int)date.tv_usec,
|
||||
agent->id, __FUNCTION__, s,
|
||||
((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
|
||||
ctx->spoe_appctx, (agent->rt[tid].frame_size - FRAME_HDR_SIZE),
|
||||
p - b_head(&ctx->buffer));
|
||||
|
||||
b_set_data(&ctx->buffer, p - b_head(&ctx->buffer));
|
||||
ctx->frag_ctx.curmsg = NULL;
|
||||
ctx->frag_ctx.curarg = NULL;
|
||||
ctx->frag_ctx.curoff = 0;
|
||||
ctx->frag_ctx.flags = SPOE_FRM_FL_FIN;
|
||||
|
||||
return 1;
|
||||
|
||||
too_big:
|
||||
/* Return an error if fragmentation is unsupported or if nothing has
|
||||
* been encoded because its too big and not splittable. */
|
||||
if (!(agent->flags & SPOE_FL_SND_FRAGMENTATION) || p == b_head(&ctx->buffer)) {
|
||||
ctx->status_code = SPOE_CTX_ERR_TOO_BIG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
||||
" - encode fragmented messages - spoe_appctx=%p"
|
||||
" - curmsg=%p - curarg=%p - curoff=%u"
|
||||
" - max_size=%u - encoded=%ld\n",
|
||||
(int)date.tv_sec, (int)date.tv_usec,
|
||||
agent->id, __FUNCTION__, s, ctx->spoe_appctx,
|
||||
ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
|
||||
(agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - b_head(&ctx->buffer));
|
||||
|
||||
b_set_data(&ctx->buffer, p - b_head(&ctx->buffer));
|
||||
ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
|
||||
ctx->frag_ctx.flags &= ~SPOE_FRM_FL_FIN;
|
||||
return 1;
|
||||
/* Return an error if nothing has been encoded because its too big */
|
||||
ctx->status_code = SPOE_CTX_ERR_TOO_BIG;
|
||||
return -1;
|
||||
|
||||
skip:
|
||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
||||
@ -2610,18 +2382,12 @@ spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx)
|
||||
if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
|
||||
return;
|
||||
_HA_ATOMIC_INC(&agent->counters.nb_processed);
|
||||
if (sa) {
|
||||
if (sa->frag_ctx.ctx == ctx) {
|
||||
sa->frag_ctx.ctx = NULL;
|
||||
spoe_wakeup_appctx(sa->owner);
|
||||
}
|
||||
else
|
||||
sa->cur_fpa--;
|
||||
}
|
||||
if (sa)
|
||||
sa->cur_fpa--;
|
||||
|
||||
/* Reset the flag to allow next processing */
|
||||
agent->rt[tid].processing--;
|
||||
ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
|
||||
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
|
||||
|
||||
/* Reset processing timer */
|
||||
ctx->process_exp = TICK_ETERNITY;
|
||||
@ -2630,11 +2396,7 @@ spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx)
|
||||
|
||||
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
|
||||
|
||||
ctx->spoe_appctx = NULL;
|
||||
ctx->frag_ctx.curmsg = NULL;
|
||||
ctx->frag_ctx.curarg = NULL;
|
||||
ctx->frag_ctx.curoff = 0;
|
||||
ctx->frag_ctx.flags = 0;
|
||||
ctx->spoe_appctx = NULL;
|
||||
|
||||
if (!LIST_ISEMPTY(&ctx->list)) {
|
||||
if (ctx->state == SPOE_CTX_ST_SENDING_MSGS)
|
||||
@ -2944,7 +2706,7 @@ static void
|
||||
spoe_reset_context(struct spoe_context *ctx)
|
||||
{
|
||||
ctx->state = SPOE_CTX_ST_READY;
|
||||
ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
|
||||
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
|
||||
|
||||
ctx->stats.start_ts = 0;
|
||||
ctx->stats.request_ts = 0;
|
||||
@ -3401,7 +3163,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
|
||||
curagent->var_on_error = NULL;
|
||||
curagent->var_t_process = NULL;
|
||||
curagent->var_t_total = NULL;
|
||||
curagent->flags = (SPOE_FL_ASYNC | SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
|
||||
curagent->flags = (SPOE_FL_ASYNC | SPOE_FL_PIPELINING);
|
||||
curagent->cps_max = 0;
|
||||
curagent->eps_max = 0;
|
||||
curagent->max_frame_size = MAX_FRAME_SIZE;
|
||||
@ -3553,10 +3315,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
|
||||
else if (strcmp(args[1], "send-frag-payload") == 0) {
|
||||
if (alertif_too_many_args(1, file, linenum, args, &err_code))
|
||||
goto out;
|
||||
if (kwm == 1)
|
||||
curagent->flags &= ~SPOE_FL_SND_FRAGMENTATION;
|
||||
else
|
||||
curagent->flags |= SPOE_FL_SND_FRAGMENTATION;
|
||||
/* TODO: Add a warning or a diag ? Ignore it for now */
|
||||
goto out;
|
||||
}
|
||||
else if (strcmp(args[1], "dontlog-normal") == 0) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user