diff --git a/src/flt_spoe.c b/src/flt_spoe.c index f3bdfc3b9..8e2c7a7ac 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -56,8 +56,8 @@ /* The minimum size for a frame */ #define MIN_FRAME_SIZE 256 -/* Reserved for the metadata and the frame type. So - - * is the maximum payload size */ +/* Reserved for the metadata and the frame type. + * So - is the maximum payload size */ #define FRAME_HDR_SIZE 32 /* Flags set on the SPOE agent */ @@ -68,21 +68,28 @@ #define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */ #define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */ #define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */ +#define SPOE_CTX_FL_FRAGMENTED 0x00000010 /* Set when a fragmented frame is processing */ #define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS) /* Flags set on the SPOE applet */ -#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */ -#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */ -#define SPOE_APPCTX_FL_PERSIST 0x00000004 /* Set if the applet is persistent */ +#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */ +#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */ +#define SPOE_APPCTX_FL_FRAGMENTATION 0x00000004 /* Set if fragmentation is supported */ +#define SPOE_APPCTX_FL_PERSIST 0x00000008 /* Set if the applet is persistent */ #define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */ #define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */ +/* Flags set on the SPOE frame */ +#define SPOE_FRM_FL_FIN 0x00000001 +#define SPOE_FRM_FL_ABRT 0x00000002 + /* All possible states for a SPOE context */ enum spoe_ctx_state { SPOE_CTX_ST_NONE = 0, SPOE_CTX_ST_READY, + SPOE_CTX_ST_ENCODING_MSGS, SPOE_CTX_ST_SENDING_MSGS, SPOE_CTX_ST_WAITING_ACK, SPOE_CTX_ST_DONE, @@ -95,6 +102,8 @@ enum spoe_appctx_state { SPOE_APPCTX_ST_CONNECTING, SPOE_APPCTX_ST_IDLE, SPOE_APPCTX_ST_PROCESSING, + SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY, + SPOE_APPCTX_ST_WAITING_SYNC_ACK, SPOE_APPCTX_ST_DISCONNECT, SPOE_APPCTX_ST_DISCONNECTING, SPOE_APPCTX_ST_EXIT, @@ -132,6 +141,7 @@ enum spoe_context_error { SPOE_CTX_ERR_NONE = 0, SPOE_CTX_ERR_TOUT, SPOE_CTX_ERR_RES, + SPOE_CTX_ERR_TOO_BIG, SPOE_CTX_ERR_UNKNOWN = 255, SPOE_CTX_ERRS, }; @@ -148,6 +158,9 @@ enum spoe_frame_error { SPOE_FRM_ERR_NO_CAP, SPOE_FRM_ERR_BAD_VSN, SPOE_FRM_ERR_BAD_FRAME_SIZE, + SPOE_FRM_ERR_FRAG_NOT_SUPPORTED, + SPOE_FRM_ERR_INTERLACED_FRAMES, + SPOE_FRM_ERR_RES, SPOE_FRM_ERR_UNKNOWN = 99, SPOE_FRM_ERRS, }; @@ -189,6 +202,7 @@ struct spoe_message { char *file; /* file where the SPOE message appears */ int line; /* line where the SPOE message appears */ } conf; /* config information */ + unsigned int nargs; /* # of arguments */ struct list args; /* Arguments added when the SPOE messages is sent */ struct list list; /* Used to chain SPOE messages */ @@ -266,6 +280,14 @@ struct spoe_context { unsigned int stream_id; /* stream_id and frame_id are used */ unsigned int frame_id; /* to map NOTIFY and ACK frames */ unsigned int process_exp; /* expiration date to process an event */ + + struct { + struct spoe_appctx *spoe_appctx; /* SPOE appctx sending the fragmented frame */ + struct spoe_message *curmsg; /* SPOE message from which to resume encoding */ + struct spoe_arg *curarg; /* SPOE arg in from which to resume encoding */ + unsigned int curoff; /* offset in from which to resume encoding */ + unsigned int flags; /* SPOE_FRM_FL_* */ + } frag_ctx; /* Info about fragmented frames, valid on if SPOE_CTX_FL_FRAGMENTED is set */ }; /* SPOE context inside a appctx */ @@ -283,8 +305,16 @@ struct spoe_appctx { struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */ struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */ struct list list; /* next spoe appctx for the same agent */ + + struct { + struct spoe_context *ctx; /* SPOE context owning the fragmented frame */ + unsigned int cursid; /* stream-id of the fragmented frame. used if the processing is aborted */ + unsigned int curfid; /* frame-id of the fragmented frame. used if the processing is aborted */ + } frag_ctx; /* Info about fragmented frames, unused for unfragmented frames */ }; + +/* Helper to get SPOE ctx inside an appctx */ #define SPOE_APPCTX(appctx) ((struct spoe_appctx *)((appctx)->ctx.spoe.ptr)) /* SPOE filter id. Used to identify SPOE filters */ @@ -377,17 +407,20 @@ release_spoe_agent(struct spoe_agent *agent) } static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = { - [SPOE_FRM_ERR_NONE] = "normal", - [SPOE_FRM_ERR_IO] = "I/O error", - [SPOE_FRM_ERR_TOUT] = "a timeout occurred", - [SPOE_FRM_ERR_TOO_BIG] = "frame is too big", - [SPOE_FRM_ERR_INVALID] = "invalid frame received", - [SPOE_FRM_ERR_NO_VSN] = "version value not found", - [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found", - [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found", - [SPOE_FRM_ERR_BAD_VSN] = "unsupported version", - [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small", - [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred", + [SPOE_FRM_ERR_NONE] = "normal", + [SPOE_FRM_ERR_IO] = "I/O error", + [SPOE_FRM_ERR_TOUT] = "a timeout occurred", + [SPOE_FRM_ERR_TOO_BIG] = "frame is too big", + [SPOE_FRM_ERR_INVALID] = "invalid frame received", + [SPOE_FRM_ERR_NO_VSN] = "version value not found", + [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found", + [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found", + [SPOE_FRM_ERR_BAD_VSN] = "unsupported version", + [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small", + [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported", + [SPOE_FRM_ERR_INTERLACED_FRAMES] = "invalid interlaced frames", + [SPOE_FRM_ERR_RES] = "resource allocation error", + [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred", }; static const char *spoe_event_str[SPOE_EV_EVENTS] = { @@ -406,23 +439,26 @@ static const char *spoe_event_str[SPOE_EV_EVENTS] = { #if defined(DEBUG_SPOE) || defined(DEBUG_FULL) static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = { - [SPOE_CTX_ST_NONE] = "NONE", - [SPOE_CTX_ST_READY] = "READY", - [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS", - [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK", - [SPOE_CTX_ST_DONE] = "DONE", - [SPOE_CTX_ST_ERROR] = "ERROR", + [SPOE_CTX_ST_NONE] = "NONE", + [SPOE_CTX_ST_READY] = "READY", + [SPOE_CTX_ST_ENCODING_MSGS] = "ENCODING_MSGS", + [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS", + [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK", + [SPOE_CTX_ST_DONE] = "DONE", + [SPOE_CTX_ST_ERROR] = "ERROR", }; static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = { - [SPOE_APPCTX_ST_CONNECT] = "CONNECT", - [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING", - [SPOE_APPCTX_ST_IDLE] = "IDLE", - [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING", - [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT", - [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING", - [SPOE_APPCTX_ST_EXIT] = "EXIT", - [SPOE_APPCTX_ST_END] = "END", + [SPOE_APPCTX_ST_CONNECT] = "CONNECT", + [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING", + [SPOE_APPCTX_ST_IDLE] = "IDLE", + [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING", + [SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY] = "SENDING_FRAG_NOTIFY", + [SPOE_APPCTX_ST_WAITING_SYNC_ACK] = "WAITING_SYNC_ACK", + [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT", + [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING", + [SPOE_APPCTX_ST_EXIT] = "EXIT", + [SPOE_APPCTX_ST_END] = "END", }; #endif @@ -547,8 +583,8 @@ decode_spoe_version(const char *str, size_t len) double d; int vsn = -1; - memset(tmp, 0, len+1); memcpy(tmp, str, len); + tmp[len] = 0; start = tmp; while (isspace(*start)) @@ -600,7 +636,7 @@ decode_spoe_varint(const char *buf, const char *end, uint64_t *i) unsigned char *msg = (unsigned char *)buf; int idx = 0; - if (msg > (unsigned char *)end) + if (msg >= (unsigned char *)end) return -1; if (msg[0] < 240) { @@ -610,7 +646,7 @@ decode_spoe_varint(const char *buf, const char *end, uint64_t *i) *i = msg[0]; do { ++idx; - if (msg+idx > (unsigned char *)end) + if (msg+idx >= (unsigned char *)end) return -1; *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1)); } while (msg[idx] >= 128); @@ -635,6 +671,24 @@ encode_spoe_string(const char *str, size_t len, char *dst) return (idx + len); } +/* Encode first part of a fragmented string. The string will be prefix by its + * length, encoded as a variable-length integer. This function never fails and + * returns the number of written bytes. */ +static int +encode_frag_spoe_string(const char *str, size_t sz, size_t len, char *dst) +{ + int idx = 0; + + if (!sz) { + dst[0] = 0; + return 1; + } + + idx += encode_spoe_varint(sz, dst); + memcpy(dst+idx, str, len); + return (idx + len); +} + /* Decode a string. Its length is decoded first as a variable-length integer. If * it succeeds, and if the string length is valid, the begin of the string is * saved in <*str>, its length is saved in <*len> and the total numbre of bytes @@ -808,12 +862,13 @@ static int prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size) { struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - int idx = 0; - size_t max = (7 /* TYPE + METADATA */ - + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL) - + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4 - + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL) - + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36); + unsigned int flags = SPOE_FRM_FL_FIN; + int idx = 0; + size_t max = (7 /* TYPE + METADATA */ + + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL) + + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4 + + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL) + + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36); if (size < max) { spoe_status_code = SPOE_FRM_ERR_TOO_BIG; @@ -823,8 +878,9 @@ prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size) /* Frame type */ frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO; - /* No flags for now */ - memset(frame+idx, 0, 4); + /* Set flags */ + //flags = htonl(flags); + memcpy(frame+idx, (char *)&flags, 4); idx += 4; /* No stream-id and frame-id for HELLO frames */ @@ -866,10 +922,11 @@ static int prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size) { const char *reason; - int rlen, idx = 0; - size_t max = (7 /* TYPE + METADATA */ - + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2 - + 1 + SLEN(MSG_KEY) + 1 + 2 + 255); + unsigned int flags = SPOE_FRM_FL_FIN; + int rlen, idx = 0; + size_t max = (7 /* TYPE + METADATA */ + + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2 + + 1 + SLEN(MSG_KEY) + 1 + 2 + 255); if (size < max) return -1; @@ -883,8 +940,8 @@ prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size) /* Frame type */ frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON; - /* No flags for now */ - memset(frame+idx, 0, 4); + /* Set flags */ + memcpy(frame+idx, (char *)&flags, 4); idx += 4; /* No stream-id and frame-id for DISCONNECT frames */ @@ -912,17 +969,36 @@ static int prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, char *frame, size_t size) { - int idx = 0; + int idx = 0; + unsigned int stream_id, frame_id, flags = SPOE_FRM_FL_FIN; frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY; - /* No flags for now */ - memset(frame+idx, 0, 4); + if (ctx == NULL) { + flags |= SPOE_FRM_FL_ABRT; + stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid; + frame_id = SPOE_APPCTX(appctx)->frag_ctx.curfid; + } + else { + stream_id = ctx->stream_id; + frame_id = ctx->frame_id; + + if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) { + if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) { + spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + return 0; + } + flags = ctx->frag_ctx.flags; + } + } + + /* Set flags */ + memcpy(frame+idx, (char *)&flags, 4); idx += 4; /* Set stream-id and frame-id */ - idx += encode_spoe_varint(ctx->stream_id, frame+idx); - idx += encode_spoe_varint(ctx->frame_id, frame+idx); + idx += encode_spoe_varint(stream_id, frame+idx); + idx += encode_spoe_varint(frame_id, frame+idx); /* check the buffer size */ if (idx + SPOE_APPCTX(appctx)->buffer->i > size) { @@ -933,7 +1009,6 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, /* Copy encoded messages */ memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i); idx += SPOE_APPCTX(appctx)->buffer->i; - return idx; } @@ -942,12 +1017,12 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, static int handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) { - int vsn, max_frame_size, flags; - int i, idx = 0; - size_t min_size = (7 /* TYPE + METADATA */ - + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3 - + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1 - + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0); + int vsn, max_frame_size, i, idx = 0; + unsigned int flags; + size_t min_size = (7 /* TYPE + METADATA */ + + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3 + + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1 + + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0); /* Check frame type */ if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO) @@ -958,9 +1033,16 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) return -1; } - /* Skip flags: fragmentation is not supported for now */ + /* Retrieve flags */ + memcpy((char *)&flags, frame+idx, 4); idx += 4; + /* Fragmentation is not supported for HELLO frame */ + if (!(flags & SPOE_FRM_FL_FIN)) { + spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + return -1; + } + /* stream-id and frame-id must be cleared */ if (frame[idx] != 0 || frame[idx+1] != 0) { spoe_status_code = SPOE_FRM_ERR_INVALID; @@ -1065,6 +1147,11 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) if (sz == i || isspace(str[i]) || str[i] == ',') flags |= SPOE_APPCTX_FL_ASYNC; } + else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) { + i += 13; + if (sz == i || isspace(str[i]) || str[i] == ',') + flags |= SPOE_APPCTX_FL_FRAGMENTATION; + } if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL) break; @@ -1103,10 +1190,11 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) static int handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size) { - int i, idx = 0; - size_t min_size = (7 /* TYPE + METADATA */ - + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1 - + 1 + SLEN(MSG_KEY) + 1 + 1); + int i, idx = 0; + unsigned int flags; + size_t min_size = (7 /* TYPE + METADATA */ + + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1 + + 1 + SLEN(MSG_KEY) + 1 + 1); /* Check frame type */ if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON) @@ -1117,9 +1205,16 @@ handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size) return -1; } - /* Skip flags: fragmentation is not supported for now */ + /* Retrieve flags */ + memcpy((char *)&flags, frame+idx, 4); idx += 4; + /* Fragmentation is not supported for DISCONNECT frame */ + if (!(flags & SPOE_FRM_FL_FIN)) { + spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + return -1; + } + /* stream-id and frame-id must be cleared */ if (frame[idx] != 0 || frame[idx+1] != 0) { spoe_status_code = SPOE_FRM_ERR_INVALID; @@ -1194,13 +1289,14 @@ handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size) /* Decode ACK frame sent by an agent. It returns the number of read bytes on * success, 0 if the frame can be ignored and -1 if an error occurred. */ static int -handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) +handle_spoe_agentack_frame(struct appctx *appctx, struct spoe_context **ctx, + char *frame, size_t size) { - struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - struct spoe_context *ctx, *back; - uint64_t stream_id, frame_id; - int i, idx = 0; - size_t min_size = (7 /* TYPE + METADATA */); + struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; + uint64_t stream_id, frame_id; + int i, idx = 0; + unsigned int flags; + size_t min_size = (7 /* TYPE + METADATA */); /* Check frame type */ if (frame[idx++] != SPOE_FRM_T_AGENT_ACK) @@ -1211,9 +1307,16 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) return -1; } - /* Skip flags: fragmentation is not supported for now */ + /* Retrieve flags */ + memcpy((char *)&flags, frame+idx, 4); idx += 4; + /* Fragmentation is not supported for now */ + if (!(flags & SPOE_FRM_FL_FIN)) { + spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + return -1; + } + /* Get the stream-id and the frame-id */ if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1) return 0; @@ -1223,41 +1326,51 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) idx += i; if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { - list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) { - if (ctx->stream_id == (unsigned int)stream_id && - ctx->frame_id == (unsigned int)frame_id) + list_for_each_entry((*ctx), &agent->waiting_queue, list) { + if ((*ctx)->stream_id == (unsigned int)stream_id && + (*ctx)->frame_id == (unsigned int)frame_id) goto found; } } else { - list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) { - if (ctx->stream_id == (unsigned int)stream_id && - ctx->frame_id == (unsigned int)frame_id) + list_for_each_entry((*ctx), &SPOE_APPCTX(appctx)->waiting_queue, list) { + if ((*ctx)->stream_id == (unsigned int)stream_id && + (*ctx)->frame_id == (unsigned int)frame_id) goto found; } } + /* FIXME: check if ABRT bit is set for a unfinished fragmented frame */ + /* No Stream found, ignore the frame */ + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Ignore ACK frame" + " - stream-id=%u - frame-id=%u\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx, + (unsigned int)stream_id, (unsigned int)frame_id); + + *ctx = NULL; return 0; found: - if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait)) + if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait)) { + *ctx = NULL; return 1; /* Retry later */ - - /* Transfer the buffer ownership to the SPOE context */ - ctx->buffer = SPOE_APPCTX(appctx)->buffer; - SPOE_APPCTX(appctx)->buffer = &buf_empty; + } /* Copy encoded actions */ - memcpy(ctx->buffer->p, frame+idx, size-idx); - ctx->buffer->i = size-idx; + memcpy(SPOE_APPCTX(appctx)->buffer->p, frame+idx, size-idx); + SPOE_APPCTX(appctx)->buffer->i = size-idx; - /* Notify the stream */ - LIST_DEL(&ctx->list); - LIST_INIT(&ctx->list); - ctx->state = SPOE_CTX_ST_DONE; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + /* Transfer the buffer ownership to the SPOE context */ + (*ctx)->buffer = SPOE_APPCTX(appctx)->buffer; + SPOE_APPCTX(appctx)->buffer = &buf_empty; + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - ACK frame received - ctx=%p - stream-id=%u - frame-id=%u\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx, + *ctx, (*ctx)->stream_id, (*ctx)->frame_id); return idx; } @@ -1464,6 +1577,14 @@ release_spoe_applet(struct appctx *appctx) task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } + if (SPOE_APPCTX(appctx)->frag_ctx.ctx) { + ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx; + ctx->frag_ctx.spoe_appctx = NULL; + ctx->state = SPOE_CTX_ST_ERROR; + ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + } + release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx)); @@ -1630,7 +1751,7 @@ handle_processing_spoe_applet(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - struct spoe_context *ctx; + struct spoe_context *ctx = NULL; char *frame = trash.str; unsigned int fpa = 0; int ret, framesz = 0, skip_sending = 0, skip_receiving = 0; @@ -1648,28 +1769,38 @@ handle_processing_spoe_applet(struct appctx *appctx) } process: + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - process: fpa=%u/%u - skip_sending=%d - skip_receiving=%d" + " - appctx-state=%s\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx, fpa, agent->max_fpa, + skip_sending, skip_receiving, spoe_appctx_state_str[appctx->st0]); + if (fpa > agent->max_fpa || (skip_sending && skip_receiving)) goto stop; - - /* Frames must be handled synchronously and a the applet is waiting for - * a ACK frame */ - if (!(SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) && - !LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) { + else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) { if (skip_receiving) goto stop; goto recv_frame; } - - if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) { + else if (skip_sending) + goto recv_frame; + else if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) { + ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx; + goto send_frame; + } + else if (LIST_ISEMPTY(&agent->sending_queue)) { skip_sending = 1; goto recv_frame; } - ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list); + send_frame: /* Transfer the buffer ownership to the SPOE appctx */ - SPOE_APPCTX(appctx)->buffer = ctx->buffer; - ctx->buffer = &buf_empty; + if (ctx) { + SPOE_APPCTX(appctx)->buffer = ctx->buffer; + ctx->buffer = &buf_empty; + } ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size); if (ret > 1) @@ -1681,14 +1812,15 @@ handle_processing_spoe_applet(struct appctx *appctx) goto next; case 0: /* ignore */ - agent->sending_rate++; - ctx->state = SPOE_CTX_ST_ERROR; - ctx->status_code = (spoe_status_code + 0x100); release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + agent->sending_rate++; + fpa++; + LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); - fpa++; + ctx->state = SPOE_CTX_ST_ERROR; + ctx->status_code = (spoe_status_code + 0x100); + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); break; case 1: /* retry */ @@ -1697,16 +1829,69 @@ handle_processing_spoe_applet(struct appctx *appctx) break; default: - agent->sending_rate++; - ctx->state = SPOE_CTX_ST_WAITING_ACK; release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); + agent->sending_rate++; + fpa++; + + if (ctx == NULL) { + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; + SPOE_APPCTX(appctx)->frag_ctx.cursid = 0; + SPOE_APPCTX(appctx)->frag_ctx.curfid = 0; + break; + } LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); - if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) - LIST_ADDQ(&agent->waiting_queue, &ctx->list); - else - LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); - fpa++; + + if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) { + if (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN) { + if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + LIST_ADDQ(&agent->waiting_queue, &ctx->list); + } + else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) { + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); + } + else { + appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK; + LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); + } + SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; + SPOE_APPCTX(appctx)->frag_ctx.cursid = 0; + SPOE_APPCTX(appctx)->frag_ctx.curfid = 0; + + ctx->frag_ctx.spoe_appctx = NULL; + ctx->state = SPOE_CTX_ST_WAITING_ACK; + } + else { + appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY; + SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx; + SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id; + SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id; + + ctx->frag_ctx.spoe_appctx = SPOE_APPCTX(appctx); + ctx->state = SPOE_CTX_ST_ENCODING_MSGS; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + skip_sending = 1; + } + } + else { + if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + LIST_ADDQ(&agent->waiting_queue, &ctx->list); + } + else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) { + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); + } + else { + appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK; + LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); + } + + ctx->state = SPOE_CTX_ST_WAITING_ACK; + } } if (fpa > agent->max_fpa) @@ -1724,9 +1909,8 @@ handle_processing_spoe_applet(struct appctx *appctx) goto next; } framesz = ret; - ret = handle_spoe_agentack_frame(appctx, frame, framesz); + ret = handle_spoe_agentack_frame(appctx, &ctx, frame, framesz); } - switch (ret) { case -1: /* error */ if (framesz) @@ -1748,6 +1932,15 @@ handle_processing_spoe_applet(struct appctx *appctx) if (framesz) bo_skip(si_oc(si), framesz+4); fpa++; + + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + + if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + + ctx->state = SPOE_CTX_ST_DONE; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } goto process; @@ -1755,10 +1948,9 @@ handle_processing_spoe_applet(struct appctx *appctx) SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); return 0; stop: - if ((SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) || - LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) { - agent->applets_idle++; + if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) { appctx->st0 = SPOE_APPCTX_ST_IDLE; + agent->applets_idle++; } if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) { LIST_DEL(&SPOE_APPCTX(appctx)->list); @@ -1932,6 +2124,8 @@ handle_spoe_applet(struct appctx *appctx) /* fall through */ case SPOE_APPCTX_ST_PROCESSING: + case SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY: + case SPOE_APPCTX_ST_WAITING_SYNC_ACK: if (handle_processing_spoe_applet(appctx)) goto out; goto switchstate; @@ -1987,6 +2181,7 @@ create_spoe_appctx(struct spoe_config *conf) appctx->ctx.spoe.ptr = pool_alloc_dirty(pool2_spoe_appctx); if (SPOE_APPCTX(appctx) == NULL) goto out_free_appctx; + memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size); appctx->st0 = SPOE_APPCTX_ST_CONNECT; if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL) @@ -2116,8 +2311,10 @@ queue_spoe_context(struct spoe_context *ctx) end: /* The only reason to return an error is when there is no applet */ - if (LIST_ISEMPTY(&agent->applets)) - return 0; + if (LIST_ISEMPTY(&agent->applets)) { + ctx->status_code = SPOE_CTX_ERR_RES; + return -1; + } /* Add the SPOE context in the sending queue and update all running * info */ @@ -2146,14 +2343,160 @@ queue_spoe_context(struct spoe_context *ctx) } /*************************************************************************** - * Functions that process SPOE messages and actions + * Functions that encode SPOE messages **************************************************************************/ -/* Process SPOE messages for a specific event. During the processing, it returns +static inline int +encode_spoe_arg_string(struct spoe_context *ctx, struct sample *smp, + char *p, size_t max_size) +{ + struct chunk *chk = &smp->data.u.str; + int idx = 0; + + /* Here, we need to know if the sample has already been partially + * encoded. If yes, we only need to encode the remaining, + * reprensenting the number of bytes already encoded in previous + * frames. Else, == 0 */ + + if (!ctx->frag_ctx.curoff) { + /* First evaluation of the sample : encode the type (string or + * binary) and check its size against */ + + /* the string/binary length must not exceed 4 Gb. So 5 bytes is + * reserved to encode its size. */ + if (max_size < 6) + return 0; + + p[idx++] = (smp->data.type == SMP_T_STR) ? SPOE_DATA_T_STR : SPOE_DATA_T_BIN; + max_size -= (idx + 5); + + if (chk->len > max_size) { + /* The sample is too big, we will fragment it. + * will be updated accordingly. */ + idx += encode_frag_spoe_string(chk->str, chk->len, max_size, p+idx); + ctx->frag_ctx.curoff = max_size; + } + else { + /* No fragmentation needed, all the sample is encoded + * and remains 0 */ + idx += encode_spoe_string(chk->str, chk->len, p+idx); + } + } + else { + /* Continue the sample fragmentation, the type was already set + * in a previous frame. So just do a copy of data. */ + + idx = chk->len - ctx->frag_ctx.curoff; /* Remaining data */ + if (idx > max_size) { + /* The sample still needs to be fragmented. + * will be incremented accordingly. */ + memcpy(p, chk->str + ctx->frag_ctx.curoff, max_size); + idx = max_size; + ctx->frag_ctx.curoff += max_size; + } + else { + /* Finish the fragmentation. will be reset. */ + memcpy(p, chk->str + ctx->frag_ctx.curoff, idx); + ctx->frag_ctx.curoff = 0; + } + } + return idx; +} + +static inline int +encode_spoe_arg_method(struct spoe_context *ctx, struct sample *smp, + char *p, size_t max_size) +{ + int idx = 0; + + /* method length must not exceed 2288 bytes. So 3 bytes is reserved to + * encode its size. */ + + if (smp->data.u.meth.meth != HTTP_METH_OTHER) { + const struct http_method_name *meth = + &http_known_methods[smp->data.u.meth.meth]; + + if (meth->len + 3 > max_size) + return 0; + p[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(meth->name, meth->len, p+idx); + } + else { + struct chunk *meth = &smp->data.u.meth.str; + + if (meth->len + 3 > max_size) + return 0; + p[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(meth->str, meth->len, p+idx); + } + return idx; +} + +static inline int +encode_spoe_arg_ipv6(struct spoe_context *ctx, struct sample *smp, + char *p, size_t max_size) +{ + int idx = 0; + + if (max_size < 17) + return 0; + p[idx++] = SPOE_DATA_T_IPV6; + memcpy(p+idx, &smp->data.u.ipv6, 16); + idx += 16; + return idx; +} + + +static inline int +encode_spoe_arg_ipv4(struct spoe_context *ctx, struct sample *smp, + char *p, size_t max_size) +{ + int idx = 0; + + if (max_size < 5) + return 0; + p[idx++] = SPOE_DATA_T_IPV4; + memcpy(p+idx, &smp->data.u.ipv6, 4); + idx += 4; + return idx; +} + +static inline int +encode_spoe_arg_sint(struct spoe_context *ctx, struct sample *smp, + char *p, size_t max_size) +{ + int idx = 0; + + if (max_size < 9) + return 0; + p[idx++] = SPOE_DATA_T_INT64; + idx += encode_spoe_varint(smp->data.u.sint, p+idx); + + return idx; +} + +static inline int +encode_spoe_arg_bool(struct spoe_context *ctx, struct sample *smp, + char *p, size_t max_size) +{ + int flag, idx = 0; + + if (max_size < 1) + return 0; + flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE); + p[idx++] = (SPOE_DATA_T_BOOL | flag); + + return idx; +} + +/* Encode SPOE messages for a specific event. + * + * + * It returns 0 if During the processing, it returns * 0 and it returns 1 when the processing is finished. If an error occurred, -1 * is returned. */ static int -process_spoe_messages(struct stream *s, struct spoe_context *ctx, - struct list *messages, int dir) +encode_spoe_messages(struct stream *s, struct spoe_context *ctx, + struct list *messages, int dir) { struct spoe_config *conf = FLT_CONF(ctx->filter); struct spoe_agent *agent = conf->agent; @@ -2162,35 +2505,71 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx, struct spoe_arg *arg; char *p; size_t max_size; - int off, flag, idx = 0; + int r, idx = 0; max_size = agent->frame_size - FRAME_HDR_SIZE; p = ctx->buffer->p; + /* Resume encoding of a SPOE message */ + if (ctx->frag_ctx.curmsg != NULL) { + msg = ctx->frag_ctx.curmsg; + goto encode_message; + } + /* Loop on messages */ list_for_each_entry(msg, messages, list) { - if (idx + msg->id_len + 1 > max_size) - goto skip; + ctx->frag_ctx.curmsg = msg; + ctx->frag_ctx.curarg = NULL; + ctx->frag_ctx.curoff = UINT_MAX; + + encode_message: + /* Resume encoding of a SPOE argument */ + if (ctx->frag_ctx.curarg != NULL) { + arg = ctx->frag_ctx.curarg; + goto encode_argument; + } + + if (ctx->frag_ctx.curoff != UINT_MAX) + goto encode_msg_payload; + + /* + + . + * Implies is encoded on 2 bytes, at most (< 2288). */ + if (idx + 3 + msg->id_len + 1 > max_size) + goto too_big; /* Set the message name */ idx += encode_spoe_string(msg->id, msg->id_len, p+idx); - /* Save offset where to store the number of arguments for this - * message */ - off = idx++; - p[off] = 0; + /* Store the number of arguments for this message */ + p[idx++] = msg->nargs; + + ctx->frag_ctx.curoff = 0; + encode_msg_payload: /* Loop on arguments */ list_for_each_entry(arg, &msg->args, list) { - p[off]++; /* Increment the number of arguments */ + ctx->frag_ctx.curarg = arg; + ctx->frag_ctx.curoff = UINT_MAX; - if (idx + arg->name_len + 1 > max_size) - goto skip; + encode_argument: + if (ctx->frag_ctx.curoff != UINT_MAX) + goto encode_arg_value; + + /* + . + * Implies is encoded on 2 bytes, at most (< 2288). */ + if (idx + 3 + arg->name_len > max_size) + goto too_big; /* Encode the arguement name as a string. It can by NULL */ idx += encode_spoe_string(arg->name, arg->name_len, p+idx); + ctx->frag_ctx.curoff = 0; + encode_arg_value: + + if (idx + 1 > max_size) + goto too_big; + /* Fetch the arguement value */ smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL); if (!smp) { @@ -2202,75 +2581,85 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx, /* Else, encode the arguement value */ switch (smp->data.type) { case SMP_T_BOOL: - flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE); - p[idx++] = (SPOE_DATA_T_BOOL | flag); + if (!(r = encode_spoe_arg_bool(ctx, smp, p+idx, max_size-idx))) + goto too_big; + idx += r; break; + case SMP_T_SINT: - p[idx++] = SPOE_DATA_T_INT64; - if (idx + 8 > max_size) - goto skip; - idx += encode_spoe_varint(smp->data.u.sint, p+idx); + if (!(r = encode_spoe_arg_sint(ctx, smp, p+idx, max_size-idx))) + goto too_big; + idx += r; break; + case SMP_T_IPV4: - p[idx++] = SPOE_DATA_T_IPV4; - if (idx + 4 > max_size) - goto skip; - memcpy(p+idx, &smp->data.u.ipv4, 4); - idx += 4; + if (!(r = encode_spoe_arg_ipv4(ctx, smp, p+idx, max_size-idx))) + goto too_big; + idx += r; break; + case SMP_T_IPV6: - p[idx++] = SPOE_DATA_T_IPV6; - if (idx + 16 > max_size) - goto skip; - memcpy(p+idx, &smp->data.u.ipv6, 16); - idx += 16; + if (!(r = encode_spoe_arg_ipv6(ctx, smp, p+idx, max_size-idx))) + goto too_big; + idx += r; break; + case SMP_T_STR: - p[idx++] = SPOE_DATA_T_STR; - if (idx + smp->data.u.str.len > max_size) - goto skip; - idx += encode_spoe_string(smp->data.u.str.str, - smp->data.u.str.len, - p+idx); - break; case SMP_T_BIN: - p[idx++] = SPOE_DATA_T_BIN; - if (idx + smp->data.u.str.len > max_size) - goto skip; - idx += encode_spoe_string(smp->data.u.str.str, - smp->data.u.str.len, - p+idx); + idx += encode_spoe_arg_string(ctx, smp, p+idx, max_size-idx); + if (ctx->frag_ctx.curoff) + goto too_big; break; + case SMP_T_METH: - if (smp->data.u.meth.meth == HTTP_METH_OTHER) { - p[idx++] = SPOE_DATA_T_STR; - if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size) - goto skip; - idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name, - http_known_methods[smp->data.u.meth.meth].len, - p+idx); - } - else { - p[idx++] = SPOE_DATA_T_STR; - if (idx + smp->data.u.str.len > max_size) - goto skip; - idx += encode_spoe_string(smp->data.u.meth.str.str, - smp->data.u.meth.str.len, - p+idx); - } + if (!(r = encode_spoe_arg_method(ctx, smp, p+idx, max_size-idx))) + goto too_big; + idx += r; break; + default: p[idx++] = SPOE_DATA_T_NULL; } } } + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - encode %s messages - spoe_appctx=%p - max_size=%lu - idx=%u\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, + ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"), + ctx->frag_ctx.spoe_appctx, max_size, idx); + ctx->buffer->i = idx; + ctx->frag_ctx.curmsg = NULL; + ctx->frag_ctx.curarg = NULL; + ctx->frag_ctx.curoff = 0; + ctx->frag_ctx.flags = SPOE_FRM_FL_FIN; return 1; - skip: - return 0; + too_big: + // FIXME: if fragmentation not supported => + // ctx->status_code = SPOE_CTX_ERR_TOO_BIG; + // return -1; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - encode fragmented messages - spoe_appctx=%p - curmsg=%p - curarg=%p - curoff=%u" + " - max_size=%lu - idx=%u\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx, + ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff, + max_size, idx); + + ctx->buffer->i = idx; + ctx->flags |= SPOE_CTX_FL_FRAGMENTED; + ctx->frag_ctx.flags &= ~SPOE_FRM_FL_FIN; + return 1; } + +/*************************************************************************** + * Functions that handle SPOE actions + **************************************************************************/ /* Helper function to set a variable */ static void set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len, @@ -2411,34 +2800,37 @@ process_spoe_actions(struct stream *s, struct spoe_context *ctx, return 0; } -static int +/*************************************************************************** + * Functions that process SPOE events + **************************************************************************/ +static inline int start_event_processing(struct spoe_context *ctx, int dir) { /* If a process is already started for this SPOE context, retry * later. */ if (ctx->flags & SPOE_CTX_FL_PROCESS) - goto wait; - - if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait)) - goto wait; + return 0; /* Set the right flag to prevent request and response processing * in same time. */ ctx->flags |= ((dir == SMP_OPT_DIR_REQ) ? SPOE_CTX_FL_REQ_PROCESS : SPOE_CTX_FL_RSP_PROCESS); - return 1; - - wait: - return 0; } -static void +static inline void stop_event_processing(struct spoe_context *ctx) { + struct spoe_appctx *sa = ctx->frag_ctx.spoe_appctx; + + if (sa) { + sa->frag_ctx.ctx = NULL; + wakeup_spoe_appctx(sa->owner); + } + /* Reset the flag to allow next processing */ - ctx->flags &= ~SPOE_CTX_FL_PROCESS; + ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED); ctx->status_code = 0; @@ -2447,6 +2839,12 @@ stop_event_processing(struct spoe_context *ctx) release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait); + ctx->frag_ctx.spoe_appctx = NULL; + ctx->frag_ctx.curmsg = NULL; + ctx->frag_ctx.curarg = NULL; + ctx->frag_ctx.curoff = 0; + ctx->frag_ctx.flags = 0; + if (!LIST_ISEMPTY(&ctx->list)) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); @@ -2472,7 +2870,6 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state], spoe_event_str[ev]); - dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES); if (LIST_ISEMPTY(&(ctx->messages[ev]))) @@ -2509,21 +2906,31 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, ret = start_event_processing(ctx, dir); if (!ret) goto out; - ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir); - if (!ret) - goto skip; - if (!queue_spoe_context(ctx)) { - ctx->status_code = SPOE_CTX_ERR_RES; + if (queue_spoe_context(ctx) < 0) goto error; - } - ctx->state = SPOE_CTX_ST_SENDING_MSGS; + ctx->state = SPOE_CTX_ST_ENCODING_MSGS; /* fall through */ } - if (ctx->state == SPOE_CTX_ST_SENDING_MSGS || - ctx->state == SPOE_CTX_ST_WAITING_ACK) { + if (ctx->state == SPOE_CTX_ST_ENCODING_MSGS) { + if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait)) + goto out; + ret = encode_spoe_messages(s, ctx, &(ctx->messages[ev]), dir); + if (ret < 0) + goto error; + ctx->state = SPOE_CTX_ST_SENDING_MSGS; + } + + if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) { + if (ctx->frag_ctx.spoe_appctx) + wakeup_spoe_appctx(ctx->frag_ctx.spoe_appctx->owner); + ret = 0; + goto out; + } + + if (ctx->state == SPOE_CTX_ST_WAITING_ACK) { ret = 0; goto out; } @@ -2547,7 +2954,6 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, if (agent->var_on_error) { struct sample smp; - // FIXME: Get the error code here memset(&smp, 0, sizeof(smp)); smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); smp.data.u.sint = ctx->status_code; @@ -2556,6 +2962,11 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, set_spoe_var(ctx, "txn", agent->var_on_error, strlen(agent->var_on_error), &smp); } + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - failed to create process event '%s': code=%u\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm, spoe_event_str[ev], + ctx->status_code); send_log(ctx->strm->be, LOG_WARNING, "SPOE: [%s] failed to process event '%s': code=%u\n", agent->id, spoe_event_str[ev], ctx->status_code); @@ -3322,6 +3733,7 @@ cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm) curmsg->event = SPOE_EV_NONE; curmsg->conf.file = strdup(file); curmsg->conf.line = linenum; + curmsg->nargs = 0; LIST_INIT(&curmsg->args); LIST_ADDQ(&curmsgs, &curmsg->list); } @@ -3361,6 +3773,7 @@ cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm) free(arg); goto out; } + curmsg->nargs++; LIST_ADDQ(&curmsg->args, &arg->list); cur_arg++; }