diff --git a/include/proto/checks.h b/include/proto/checks.h index 3c6eb7ff9..eb26c9a27 100644 --- a/include/proto/checks.h +++ b/include/proto/checks.h @@ -52,8 +52,8 @@ void send_email_alert(struct server *s, int priority, const char *format, ...) int srv_check_healthcheck_port(struct check *chk); /* Declared here, but the definitions are in flt_spoe.c */ -int prepare_spoe_healthcheck_request(char **req, int *len); -int handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen); +int spoe_prepare_healthcheck_request(char **req, int *len); +int spoe_handle_healthcheck_response(char *frame, size_t size, char *err, int errlen); #endif /* _PROTO_CHECKS_H */ diff --git a/src/cfgparse.c b/src/cfgparse.c index 54869608d..d4021c58b 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -4921,7 +4921,7 @@ int cfg_parse_listen(const char *file, int linenum, char **args, int kwm) curproxy->options2 &= ~PR_O2_CHK_ANY; curproxy->options2 |= PR_O2_SPOP_CHK; - if (prepare_spoe_healthcheck_request(&curproxy->check_req, &curproxy->check_len)) { + if (spoe_prepare_healthcheck_request(&curproxy->check_req, &curproxy->check_len)) { Alert("parsing [%s:%d] : failed to prepare SPOP healthcheck request.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto out; diff --git a/src/checks.c b/src/checks.c index 49bd886bf..0668a7645 100644 --- a/src/checks.c +++ b/src/checks.c @@ -1328,7 +1328,7 @@ static void event_srv_chk_r(struct connection *conn) if (!done && check->bi->i < (4+framesz)) goto wait_more_data; - if (!handle_spoe_healthcheck_response(check->bi->data+4, framesz, err, HCHK_DESC_LEN-1)) + if (!spoe_handle_healthcheck_response(check->bi->data+4, framesz, err, HCHK_DESC_LEN-1)) set_server_check_status(check, HCHK_STATUS_L7OKD, "SPOA server is ok"); else set_server_check_status(check, HCHK_STATUS_L7STS, err); diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 8e2c7a7ac..a3ddf85b1 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -229,7 +229,7 @@ struct spoe_agent { /* Config info */ char *engine_id; /* engine-id string */ char *var_pfx; /* Prefix used for vars set by the agent */ - char *var_on_error; /* Variable to set when an error occured, in the TXN scope */ + char *var_on_error; /* Variable to set when an error occurred, in the TXN scope */ unsigned int flags; /* SPOE_FL_* */ unsigned int cps_max; /* Maximum # of connections per second */ unsigned int eps_max; /* Maximum # of errors per second */ @@ -301,6 +301,11 @@ struct spoe_appctx { unsigned int flags; /* SPOE_APPCTX_FL_* */ unsigned int status_code; /* SPOE_FRM_ERR_* */ +#if defined(DEBUG_SPOE) || defined(DEBUG_FULL) + char *reason; /* Error message, used for debugging only */ + int rlen; /* reason length */ +#endif + struct buffer *buffer; /* Buffer used to store a encoded messages */ struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */ struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */ @@ -343,21 +348,17 @@ struct list curmps; static struct pool_head *pool2_spoe_ctx = NULL; static struct pool_head *pool2_spoe_appctx = NULL; -/* Temporary variables used to ease error processing */ -int spoe_status_code = SPOE_FRM_ERR_NONE; -char spoe_reason[256]; - struct flt_ops spoe_ops; -static int queue_spoe_context(struct spoe_context *ctx); -static int acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait); -static void release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait); +static int spoe_queue_context(struct spoe_context *ctx); +static int spoe_acquire_buffer(struct buffer **buf, struct buffer_wait *buffer_wait); +static void spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait); /******************************************************************** * helper functions/globals ********************************************************************/ static void -release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp) +spoe_release_msg_placeholder(struct spoe_msg_placeholder *mp) { if (!mp) return; @@ -367,7 +368,7 @@ release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp) static void -release_spoe_message(struct spoe_message *msg) +spoe_release_message(struct spoe_message *msg) { struct spoe_arg *arg, *back; @@ -385,7 +386,7 @@ release_spoe_message(struct spoe_message *msg) } static void -release_spoe_agent(struct spoe_agent *agent) +spoe_release_agent(struct spoe_agent *agent) { struct spoe_message *msg, *back; int i; @@ -400,7 +401,7 @@ release_spoe_agent(struct spoe_agent *agent) for (i = 0; i < SPOE_EV_EVENTS; ++i) { list_for_each_entry_safe(msg, back, &agent->messages[i], list) { LIST_DEL(&msg->list); - release_spoe_message(msg); + spoe_release_message(msg); } } free(agent); @@ -463,6 +464,9 @@ static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = { #endif +/* Used to generates a unique id for an engine. On success, it returns a + * allocated string. So it is the caller's reponsibility to release it. If the + * allocation failed, it returns NULL. */ static char * generate_pseudo_uuid() { @@ -493,15 +497,22 @@ generate_pseudo_uuid() return uuid; } +/* Returns the minimum number of appets alive at a time. This function is used + * to know if more applets should be created for an engine. */ static inline unsigned int min_applets_act(struct spoe_agent *agent) { unsigned int nbsrv; + /* TODO: Add a config parameter to customize this value. Always 0 for + * now */ if (agent->min_applets) return agent->min_applets; - nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck); + /* Get the number of active servers for the backend */ + nbsrv = (agent->b.be->srv_act + ? agent->b.be->srv_act + : agent->b.be->srv_bck); return 2*nbsrv; } @@ -573,407 +584,604 @@ static struct spoe_version supported_versions[] = { #define SUPPORTED_VERSIONS_VAL "1.0" /* Comma-separated list of supported capabilities (none for now) */ -//#define CAPABILITIES_VAL "" #define CAPABILITIES_VAL "pipelining,async" +/* Convert a string to a SPOE version value. The string must follow the format + * "MAJOR.MINOR". It will be concerted into the integer (1000 * MAJOR + MINOR). + * If an error occurred, -1 is returned. */ static int -decode_spoe_version(const char *str, size_t len) +spoe_str_to_vsn(const char *str, size_t len) { - char tmp[len+1], *start, *end; - double d; - int vsn = -1; + const char *p, *end; + int maj, min, vsn; - memcpy(tmp, str, len); - tmp[len] = 0; + p = str; + end = str+len; + maj = min = 0; + vsn = -1; - start = tmp; - while (isspace(*start)) - start++; + /* skip leading spaces */ + while (p < end && isspace(*p)) + p++; - d = strtod(start, &end); - if (d == 0 || start == end) + /* parse Major number, until the '.' */ + while (*p != '.') { + if (p >= end || *p < '0' || *p > '9') + goto out; + maj *= 10; + maj += (*p - '0'); + p++; + } + + /* check Major version */ + if (!maj) goto out; - if (*end) { - while (isspace(*end)) - end++; - if (*end) - goto out; + p++; /* skip the '.' */ + if (p >= end || *p < '0' || *p > '9') /* Minor number is missing */ + goto out; + + /* Parse Minor number */ + while (p < end) { + if (*p < '0' || *p > '9') + break; + min *= 10; + min += (*p - '0'); + p++; } - vsn = (int)(d * 1000); + + /* check Minor number */ + if (min > 999) + goto out; + + /* skip trailing spaces */ + while (p < end && isspace(*p)) + p++; + if (p != end) + goto out; + + vsn = maj * 1000 + min; out: return vsn; } -/* Encode a variable-length integer. This function never fails and returns the - * number of written bytes. */ +/* Encode the integer into a varint (variable-length integer). The encoded + * value is copied in <*buf>. Here is the encoding format: + * + * 0 <= X < 240 : 1 byte (7.875 bits) [ XXXX XXXX ] + * 240 <= X < 2288 : 2 bytes (11 bits) [ 1111 XXXX ] [ 0XXX XXXX ] + * 2288 <= X < 264432 : 3 bytes (18 bits) [ 1111 XXXX ] [ 1XXX XXXX ] [ 0XXX XXXX ] + * 264432 <= X < 33818864 : 4 bytes (25 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*2 [ 0XXX XXXX ] + * 33818864 <= X < 4328786160 : 5 bytes (32 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*3 [ 0XXX XXXX ] + * ... + * + * On success, it returns the number of written bytes and <*buf> is moved after + * the encoded value. Otherwise, it returns -1. */ static int -encode_spoe_varint(uint64_t i, char *buf) +spoe_encode_varint(uint64_t i, char **buf, char *end) { - int idx; + unsigned char *p = (unsigned char *)*buf; + int r; - if (i < 240) { - buf[0] = (unsigned char)i; - return 1; - } - - buf[0] = (unsigned char)i | 240; - i = (i - 240) >> 4; - for (idx = 1; i >= 128; ++idx) { - buf[idx] = (unsigned char)i | 128; - i = (i - 128) >> 7; - } - buf[idx++] = (unsigned char)i; - return idx; -} - -/* Decode a varable-length integer. If the decoding fails, -1 is returned. This - * happens when the buffer's end in reached. On success, the number of read - * bytes is returned. */ -static int -decode_spoe_varint(const char *buf, const char *end, uint64_t *i) -{ - unsigned char *msg = (unsigned char *)buf; - int idx = 0; - - if (msg >= (unsigned char *)end) + if (p >= (unsigned char *)end) return -1; - if (msg[0] < 240) { - *i = msg[0]; + if (i < 240) { + *p++ = i; + *buf = (char *)p; return 1; } - *i = msg[0]; - do { - ++idx; - if (msg+idx >= (unsigned char *)end) + + *p++ = (unsigned char)i | 240; + i = (i - 240) >> 4; + while (i >= 128) { + if (p >= (unsigned char *)end) return -1; - *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1)); - } while (msg[idx] >= 128); - return (idx + 1); + *p++ = (unsigned char)i | 128; + i = (i - 128) >> 7; + } + + if (p >= (unsigned char *)end) + return -1; + *p++ = (unsigned char)i; + + r = ((char *)p - *buf); + *buf = (char *)p; + return r; } -/* Encode a string. The string will be prefix by its length, encoded as a - * variable-length integer. This function never fails and returns the number of - * written bytes. */ +/* Decode a varint from <*buf> and save the decoded value in <*i>. See + * 'spoe_encode_varint' for details about varint. + * On success, it returns the number of read bytes and <*buf> is moved after the + * varint. Otherwise, it returns -1. */ static int -encode_spoe_string(const char *str, size_t len, char *dst) +spoe_decode_varint(char **buf, char *end, uint64_t *i) { - int idx = 0; + unsigned char *p = (unsigned char *)*buf; + int r; + + if (p >= (unsigned char *)end) + return -1; + + *i = *p++; + if (*i < 240) { + *buf = (char *)p; + return 1; + } + + r = 4; + do { + if (p >= (unsigned char *)end) + return -1; + *i += (uint64_t)*p << r; + r += 7; + } while (*p++ >= 128); + + r = ((char *)p - *buf); + *buf = (char *)p; + return r; +} + +/* Encode a buffer. Its length is encoded as a varint, followed by a copy + * of . It must have enough space in <*buf> to encode the buffer, else an + * error is triggered. + * On success, it returns and <*buf> is moved after the encoded value. If + * an error occurred, it returns -1. */ +static int +spoe_encode_buffer(const char *str, size_t len, char **buf, char *end) +{ + char *p = *buf; + int ret; + + if (p >= end) + return -1; if (!len) { - dst[0] = 0; - return 1; + *p++ = 0; + *buf = p; + return 0; } - idx += encode_spoe_varint(len, dst); - memcpy(dst+idx, str, len); - return (idx + len); + ret = spoe_encode_varint(len, &p, end); + if (ret == -1 || p + len > end) + return -1; + + memcpy(p, str, len); + *buf = p + len; + return len; } -/* Encode first part of a fragmented string. The string will be prefix by its - * length, encoded as a variable-length integer. This function never fails and - * returns the number of written bytes. */ +/* Encode a buffer, possibly partially. It does the same thing than + * 'spoe_encode_buffer', but if there is not enough space, it does not fail. + * On success, it returns the number of copied bytes and <*buf> is moved after + * the encoded value. If an error occured, it returns -1. */ static int -encode_frag_spoe_string(const char *str, size_t sz, size_t len, char *dst) +spoe_encode_frag_buffer(const char *str, size_t len, char **buf, char *end) { - int idx = 0; + char *p = *buf; + int ret; - if (!sz) { - dst[0] = 0; - return 1; + if (p >= end) + return -1; + + if (!len) { + *p++ = 0; + *buf = p; + return 0; } - idx += encode_spoe_varint(sz, dst); - memcpy(dst+idx, str, len); - return (idx + len); + ret = spoe_encode_varint(len, &p, end); + if (ret == -1 || p >= end) + return -1; + + ret = (p+len < end) ? len : (end - p); + memcpy(p, str, ret); + *buf = p + ret; + return ret; } -/* Decode a string. Its length is decoded first as a variable-length integer. If - * it succeeds, and if the string length is valid, the begin of the string is - * saved in <*str>, its length is saved in <*len> and the total numbre of bytes - * read is returned. If an error occurred, -1 is returned and <*str> remains - * NULL. */ +/* Decode a buffer. The buffer length is decoded and saved in <*len>. <*str> + * points on the first byte of the buffer. + * On success, it returns the buffer length and <*buf> is moved after the + * encoded buffer. Otherwise, it returns -1. */ static int -decode_spoe_string(char *buf, char *end, char **str, uint64_t *len) +spoe_decode_buffer(char **buf, char *end, char **str, size_t *len) { - int i, idx = 0; + char *p = *buf; + uint64_t sz; + int ret; *str = NULL; *len = 0; - if ((i = decode_spoe_varint(buf, end, len)) == -1) - goto error; - idx += i; - if (buf + idx + *len > end) - goto error; + ret = spoe_decode_varint(&p, end, &sz); + if (ret == -1 || p + sz > end) + return -1; - *str = buf+idx; - return (idx + *len); + *str = p; + *len = sz; + *buf = p + sz; + return sz; +} - error: - return -1; +/* Encode a typed data using value in . On success, it returns the number + * of copied bytes and <*buf> is moved after the encoded value. If an error + * occured, it returns -1. + * + * If the value is too big to be encoded, depending on its type, then encoding + * failed or the value is partially encoded. Only strings and binaries can be + * partially encoded. In this case, the offset <*off> is updated to known how + * many bytes has been encoded. If <*off> is zero at the end, it means that all + * data has been encoded. */ +static int +spoe_encode_data(struct sample *smp, unsigned int *off, char **buf, char *end) +{ + char *p = *buf; + int ret; + + if (p >= end) + return -1; + + if (smp == NULL) { + *p++ = SPOE_DATA_T_NULL; + goto end; + } + + switch (smp->data.type) { + case SMP_T_BOOL: + *p = SPOE_DATA_T_BOOL; + *p++ |= ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE); + break; + + case SMP_T_SINT: + *p++ = SPOE_DATA_T_INT64; + if (spoe_encode_varint(smp->data.u.sint, &p, end) == -1) + return -1; + break; + + case SMP_T_IPV4: + if (p + 5 > end) + return -1; + *p++ = SPOE_DATA_T_IPV4; + memcpy(p, &smp->data.u.ipv4, 4); + p += 4; + break; + + case SMP_T_IPV6: + if (p + 17 > end) + return -1; + *p++ = SPOE_DATA_T_IPV6; + memcpy(p, &smp->data.u.ipv6, 16); + p += 16; + break; + + case SMP_T_STR: + case SMP_T_BIN: { + struct chunk *chk = &smp->data.u.str; + + /* Here, we need to know if the sample has already been + * partially encoded. If yes, we only need to encode the + * remaining, <*off> reprensenting the number of bytes + * already encoded. */ + if (!*off) { + /* First evaluation of the sample : encode the + * type (string or binary), the buffer length + * (as a varint) and at least 1 byte of the + * buffer. */ + struct chunk *chk = &smp->data.u.str; + + *p++ = (smp->data.type == SMP_T_STR) + ? SPOE_DATA_T_STR + : SPOE_DATA_T_BIN; + ret = spoe_encode_frag_buffer(chk->str, chk->len, &p, end); + if (ret == -1) + return -1; + } + else { + /* The sample has been fragmented, encode remaining data */ + ret = MIN(chk->len - *off, end - p); + memcpy(p, chk->str + *off, ret); + p += ret; + } + /* Now update <*off> */ + if (ret + *off != chk->len) + *off += ret; + else + *off = 0; + break; + } + + case SMP_T_METH: + *p++ = SPOE_DATA_T_STR; + if (smp->data.u.meth.meth != HTTP_METH_OTHER) { + const struct http_method_name *meth = + &http_known_methods[smp->data.u.meth.meth]; + + if (spoe_encode_buffer(meth->name, meth->len, &p, end) == -1) + return -1; + } + else { + struct chunk *meth = &smp->data.u.meth.str; + + if (spoe_encode_buffer(meth->str, meth->len, &p, end) == -1) + return -1; + } + break; + + default: + *p++ = SPOE_DATA_T_NULL; + break; + } + + end: + ret = (p - *buf); + *buf = p; + return ret; } /* Skip a typed data. If an error occurred, -1 is returned, otherwise the number - * of bytes read is returned. A types data is composed of a type (1 byte) and - * corresponding data: + * of skipped bytes is returned and the <*buf> is moved after skipped data. + * + * A types data is composed of a type (1 byte) and corresponding data: * - boolean: non additional data (0 bytes) - * - integers: a variable-length integer (see decode_spoe_varint) + * - integers: a variable-length integer (see spoe_decode_varint) * - ipv4: 4 bytes * - ipv6: 16 bytes * - binary and string: a buffer prefixed by its size, a variable-length - * integer (see decode_spoe_string) */ + * integer (see spoe_decode_buffer) */ static int -skip_spoe_data(char *frame, char *end) +spoe_skip_data(char **buf, char *end) { - uint64_t sz = 0; - int i, idx = 0; + char *str, *p = *buf; + int type, ret; + size_t sz; + uint64_t v; - if (frame > end) + if (p >= end) return -1; - switch (frame[idx++] & SPOE_DATA_T_MASK) { + type = *p++; + switch (type & SPOE_DATA_T_MASK) { case SPOE_DATA_T_BOOL: break; case SPOE_DATA_T_INT32: case SPOE_DATA_T_INT64: case SPOE_DATA_T_UINT32: case SPOE_DATA_T_UINT64: - if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1) + if (spoe_decode_varint(&p, end, &v) == -1) return -1; - idx += i; break; case SPOE_DATA_T_IPV4: - idx += 4; + if (p+4 > end) + return -1; + p += 4; break; case SPOE_DATA_T_IPV6: - idx += 16; + if (p+16 > end) + return -1; + p += 16; break; case SPOE_DATA_T_STR: case SPOE_DATA_T_BIN: - if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1) + /* All the buffer must be skipped */ + if (spoe_decode_buffer(&p, end, &str, &sz) == -1) return -1; - idx += i + sz; break; } - if (frame+idx > end) - return -1; - return idx; + ret = (p - *buf); + *buf = p; + return ret; } -/* Decode a typed data. If an error occurred, -1 is returned, otherwise the - * number of read bytes is returned. See skip_spoe_data for details. */ +/* Decode a typed data and fill . If an error occurred, -1 is returned, + * otherwise the number of read bytes is returned and <*buf> is moved after the + * decoded data. See spoe_skip_data for details. */ static int -decode_spoe_data(char *frame, char *end, struct sample *smp) +spoe_decode_data(char **buf, char *end, struct sample *smp) { - uint64_t sz = 0; - int type, i, idx = 0; + char *str, *p = *buf; + int type, r = 0; + size_t sz; - if (frame > end) + if (p >= end) return -1; - type = frame[idx++]; + type = *p++; switch (type & SPOE_DATA_T_MASK) { case SPOE_DATA_T_BOOL: - smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE); + smp->data.u.sint = ((type & SPOE_DATA_FL_MASK) == SPOE_DATA_FL_TRUE); smp->data.type = SMP_T_BOOL; break; case SPOE_DATA_T_INT32: case SPOE_DATA_T_INT64: case SPOE_DATA_T_UINT32: case SPOE_DATA_T_UINT64: - if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1) + if (spoe_decode_varint(&p, end, (uint64_t *)&smp->data.u.sint) == -1) return -1; - idx += i; smp->data.type = SMP_T_SINT; break; case SPOE_DATA_T_IPV4: - if (frame+idx+4 > end) + if (p+4 > end) return -1; - memcpy(&smp->data.u.ipv4, frame+idx, 4); smp->data.type = SMP_T_IPV4; - idx += 4; + memcpy(&smp->data.u.ipv4, p, 4); + p += 4; break; case SPOE_DATA_T_IPV6: - if (frame+idx+16 > end) + if (p+16 > end) return -1; - memcpy(&smp->data.u.ipv6, frame+idx, 16); + memcpy(&smp->data.u.ipv6, p, 16); smp->data.type = SMP_T_IPV6; - idx += 16; + p += 16; break; case SPOE_DATA_T_STR: - if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1) - return -1; - idx += i; - if (frame+idx+sz > end) - return -1; - smp->data.u.str.str = frame+idx; - smp->data.u.str.len = sz; - smp->data.type = SMP_T_STR; - idx += sz; - break; case SPOE_DATA_T_BIN: - if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1) + /* All the buffer must be decoded */ + if (spoe_decode_buffer(&p, end, &str, &sz) == -1) return -1; - idx += i; - if (frame+idx+sz > end) - return -1; - smp->data.u.str.str = frame+idx; + smp->data.u.str.str = str; smp->data.u.str.len = sz; - smp->data.type = SMP_T_BIN; - idx += sz; + smp->data.type = (type == SPOE_DATA_T_STR) ? SMP_T_STR : SMP_T_BIN; break; } - if (frame+idx > end) - return -1; - return idx; + r = (p - *buf); + *buf = p; + return r; } -/* Skip an action in a frame received from an agent. If an error occurred, -1 is - * returned, otherwise the number of read bytes is returned. An action is - * composed of the action type followed by a typed data. */ +/* Encode the HELLO frame sent by HAProxy to an agent. It returns the number of + * encoded bytes in the frame on success, 0 if an encoding error occured and -1 + * if a fatal error occurred. */ static int -skip_spoe_action(char *frame, char *end) -{ - int n, i, idx = 0; - - if (frame+2 > end) - return -1; - - idx++; /* Skip the action type */ - n = frame[idx++]; - while (n-- > 0) { - if ((i = skip_spoe_data(frame+idx, end)) == -1) - return -1; - idx += i; - } - - if (frame+idx > end) - return -1; - return idx; -} - -/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on - * success, 0 if the frame can be ignored and -1 if an error occurred. */ -static int -prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size) +spoe_prepare_hahello_frame(struct appctx *appctx, char *frame, size_t size) { struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - unsigned int flags = SPOE_FRM_FL_FIN; - int idx = 0; - size_t max = (7 /* TYPE + METADATA */ - + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL) - + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4 - + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL) - + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36); + char *p, *end; + unsigned int flags = SPOE_FRM_FL_FIN; + size_t sz; - if (size < max) { - spoe_status_code = SPOE_FRM_ERR_TOO_BIG; - return -1; - } + p = frame; + end = frame+size; - /* Frame type */ - frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO; + /* Set Frame type */ + *p++ = SPOE_FRM_T_HAPROXY_HELLO; /* Set flags */ - //flags = htonl(flags); - memcpy(frame+idx, (char *)&flags, 4); - idx += 4; + memcpy(p, (char *)&flags, 4); + p += 4; /* No stream-id and frame-id for HELLO frames */ - frame[idx++] = 0; - frame[idx++] = 0; + *p++ = 0; *p++ = 0; /* There are 3 mandatory items: "supported-versions", "max-frame-size" * and "capabilities" */ /* "supported-versions" K/V item */ - idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx); - frame[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx); + sz = SLEN(SUPPORTED_VERSIONS_KEY); + if (spoe_encode_buffer(SUPPORTED_VERSIONS_KEY, sz, &p, end) == -1) + goto too_big; + + *p++ = SPOE_DATA_T_STR; + sz = SLEN(SUPPORTED_VERSIONS_VAL); + if (spoe_encode_buffer(SUPPORTED_VERSIONS_VAL, sz, &p, end) == -1) + goto too_big; /* "max-fram-size" K/V item */ - idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx); - frame[idx++] = SPOE_DATA_T_UINT32; - idx += encode_spoe_varint(SPOE_APPCTX(appctx)->max_frame_size, frame+idx); + sz = SLEN(MAX_FRAME_SIZE_KEY); + if (spoe_encode_buffer(MAX_FRAME_SIZE_KEY, sz, &p, end) == -1) + goto too_big; + + *p++ = SPOE_DATA_T_UINT32; + if (spoe_encode_varint(SPOE_APPCTX(appctx)->max_frame_size, &p, end) == -1) + goto too_big; /* "capabilities" K/V item */ - idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx); - frame[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx); + sz = SLEN(CAPABILITIES_KEY); + if (spoe_encode_buffer(CAPABILITIES_KEY, sz, &p, end) == -1) + goto too_big; - /* "engine-id" K/V item */ + *p++ = SPOE_DATA_T_STR; + sz = SLEN(CAPABILITIES_VAL); + if (spoe_encode_buffer(CAPABILITIES_VAL, sz, &p, end) == -1) + goto too_big; + + /* (optionnal) "engine-id" K/V item, if present */ if (agent != NULL && agent->engine_id != NULL) { - idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx); - frame[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx); + sz = SLEN(ENGINE_ID_KEY); + if (spoe_encode_buffer(ENGINE_ID_KEY, sz, &p, end) == -1) + goto too_big; + + *p++ = SPOE_DATA_T_STR; + sz = strlen(agent->engine_id); + if (spoe_encode_buffer(agent->engine_id, sz, &p, end) == -1) + goto too_big; } - return idx; + return (p - frame); + + too_big: + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG; + return 0; } -/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame - * size on success, 0 if the frame can be ignored and -1 if an error - * occurred. */ +/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the number of + * encoded bytes in the frame on success, 0 if an encoding error occurred and -1 + * if a fatal error occurred. */ static int -prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size) +spoe_prepare_hadiscon_frame(struct appctx *appctx, char *frame, size_t size) { - const char *reason; + const char *reason; + char *p, *end; unsigned int flags = SPOE_FRM_FL_FIN; - int rlen, idx = 0; - size_t max = (7 /* TYPE + METADATA */ - + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2 - + 1 + SLEN(MSG_KEY) + 1 + 2 + 255); + size_t sz; - if (size < max) - return -1; + p = frame; + end = frame+size; - /* Get the message corresponding to the status code */ - if (spoe_status_code >= SPOE_FRM_ERRS) - spoe_status_code = SPOE_FRM_ERR_UNKNOWN; - reason = spoe_frm_err_reasons[spoe_status_code]; - rlen = strlen(reason); - - /* Frame type */ - frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON; + /* Set Frame type */ + *p++ = SPOE_FRM_T_HAPROXY_DISCON; /* Set flags */ - memcpy(frame+idx, (char *)&flags, 4); - idx += 4; + memcpy(p, (char *)&flags, 4); + p += 4; /* No stream-id and frame-id for DISCONNECT frames */ - frame[idx++] = 0; - frame[idx++] = 0; + *p++ = 0; *p++ = 0; + + if (SPOE_APPCTX(appctx)->status_code >= SPOE_FRM_ERRS) + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_UNKNOWN; /* There are 2 mandatory items: "status-code" and "message" */ /* "status-code" K/V item */ - idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx); - frame[idx++] = SPOE_DATA_T_UINT32; - idx += encode_spoe_varint(spoe_status_code, frame+idx); + sz = SLEN(STATUS_CODE_KEY); + if (spoe_encode_buffer(STATUS_CODE_KEY, sz, &p, end) == -1) + goto too_big; + + *p++ = SPOE_DATA_T_UINT32; + if (spoe_encode_varint(SPOE_APPCTX(appctx)->status_code, &p, end) == -1) + goto too_big; /* "message" K/V item */ - idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx); - frame[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(reason, rlen, frame+idx); + sz = SLEN(MSG_KEY); + if (spoe_encode_buffer(MSG_KEY, sz, &p, end) == -1) + goto too_big; - return idx; + /*Get the message corresponding to the status code */ + reason = spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code]; + + *p++ = SPOE_DATA_T_STR; + sz = strlen(reason); + if (spoe_encode_buffer(reason, sz, &p, end) == -1) + goto too_big; + + return (p - frame); + + too_big: + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG; + return 0; } -/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on - * success, 0 if the frame can be ignored and -1 if an error occurred. */ +/* Encode the NOTIFY frame sent by HAProxy to an agent. It returns the number of + * encoded bytes in the frame on success, 0 if an encoding error occurred and -1 + * if a fatal error occurred. */ static int -prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, +spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, char *frame, size_t size) { - int idx = 0; - unsigned int stream_id, frame_id, flags = SPOE_FRM_FL_FIN; + char *p, *end; + unsigned int stream_id, frame_id; + unsigned int flags = SPOE_FRM_FL_FIN; + size_t sz; - frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY; + p = frame; + end = frame+size; + /* is null when the stream has aborted the processing of a + * fragmented frame. In this case, we must notify the corresponding + * agent using ids stored in . */ if (ctx == NULL) { flags |= SPOE_FRM_FL_ABRT; stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid; @@ -984,103 +1192,117 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, frame_id = ctx->frame_id; if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) { + /* The fragmentation is not supported by the applet */ if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) { - spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; - return 0; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + return -1; } flags = ctx->frag_ctx.flags; } } + /* Set Frame type */ + *p++ = SPOE_FRM_T_HAPROXY_NOTIFY; + /* Set flags */ - memcpy(frame+idx, (char *)&flags, 4); - idx += 4; + memcpy(p, (char *)&flags, 4); + p += 4; /* Set stream-id and frame-id */ - idx += encode_spoe_varint(stream_id, frame+idx); - idx += encode_spoe_varint(frame_id, frame+idx); + if (spoe_encode_varint(stream_id, &p, end) == -1) + goto too_big; + if (spoe_encode_varint(frame_id, &p, end) == -1) + goto too_big; - /* check the buffer size */ - if (idx + SPOE_APPCTX(appctx)->buffer->i > size) { - spoe_status_code = SPOE_FRM_ERR_TOO_BIG; + /* Copy encoded messages, if possible */ + sz = SPOE_APPCTX(appctx)->buffer->i; + if (p + sz >= end) + goto too_big; + memcpy(p, SPOE_APPCTX(appctx)->buffer->p, sz); + p += sz; + + return (p - frame); + + too_big: + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG; + return 0; +} + +/* Decode and process the HELLO frame sent by an agent. It returns the number of + * read bytes on success, 0 if a decoding error occurred, and -1 if a fatal + * error occurred. */ +static int +spoe_handle_agenthello_frame(struct appctx *appctx, char *frame, size_t size) +{ + char *p, *end; + int vsn, max_frame_size; + unsigned int flags; + + p = frame; + end = frame + size; + + /* Check frame type */ + if (*p++ != SPOE_FRM_T_AGENT_HELLO) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; return 0; } - /* Copy encoded messages */ - memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i); - idx += SPOE_APPCTX(appctx)->buffer->i; - return idx; -} - -/* Decode HELLO frame sent by an agent. It returns the number of by read bytes - * on success, 0 if the frame can be ignored and -1 if an error occurred. */ -static int -handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) -{ - int vsn, max_frame_size, i, idx = 0; - unsigned int flags; - size_t min_size = (7 /* TYPE + METADATA */ - + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3 - + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1 - + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0); - - /* Check frame type */ - if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO) + if (size < 7 /* TYPE + METADATA */) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; return 0; - - if (size < min_size) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; } /* Retrieve flags */ - memcpy((char *)&flags, frame+idx, 4); - idx += 4; + memcpy((char *)&flags, p, 4); + p += 4; /* Fragmentation is not supported for HELLO frame */ if (!(flags & SPOE_FRM_FL_FIN)) { - spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; return -1; } /* stream-id and frame-id must be cleared */ - if (frame[idx] != 0 || frame[idx+1] != 0) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (*p != 0 || *(p+1) != 0) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += 2; + p += 2; /* There are 3 mandatory items: "version", "max-frame-size" and * "capabilities" */ /* Loop on K/V items */ vsn = max_frame_size = flags = 0; - while (idx < size) { - char *str; - uint64_t sz; + while (p < end) { + char *str; + size_t sz; + int ret; /* Decode the item key */ - idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); - if (str == NULL) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + ret = spoe_decode_buffer(&p, end, &str, &sz); + if (ret == -1 || !sz) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } + /* Check "version" K/V item */ if (!memcmp(str, VERSION_KEY, sz)) { + int i, type = *p++; + /* The value must be a string */ - if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); - if (str == NULL) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (spoe_decode_buffer(&p, end, &str, &sz) == -1) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - vsn = decode_spoe_version(str, sz); + vsn = spoe_str_to_vsn(str, sz); if (vsn == -1) { - spoe_status_code = SPOE_FRM_ERR_BAD_VSN; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_VSN; return -1; } for (i = 0; supported_versions[i].str != NULL; ++i) { @@ -1089,242 +1311,257 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) break; } if (supported_versions[i].str == NULL) { - spoe_status_code = SPOE_FRM_ERR_BAD_VSN; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_VSN; return -1; } } /* Check "max-frame-size" K/V item */ else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) { - int type; + int type = *p++; /* The value must be integer */ - type = frame[idx++]; if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 && (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 && (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 && (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (spoe_decode_varint(&p, end, &sz) == -1) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += i; - if (sz < MIN_FRAME_SIZE || sz > SPOE_APPCTX(appctx)->max_frame_size) { - spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE; + if (sz < MIN_FRAME_SIZE || + sz > SPOE_APPCTX(appctx)->max_frame_size) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE; return -1; } max_frame_size = sz; } /* Check "capabilities" K/V item */ else if (!memcmp(str, CAPABILITIES_KEY, sz)) { - int i; + int type = *p++; /* The value must be a string */ - if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; + } + if (spoe_decode_buffer(&p, end, &str, &sz) == -1) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); - if (str == NULL) - continue; - i = 0; - while (i < sz) { + while (sz) { char *delim; /* Skip leading spaces */ - for (; isspace(str[i]) && i < sz; i++); + for (; isspace(*str) && sz; str++, sz--); - if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) { - i += 10; - if (sz == i || isspace(str[i]) || str[i] == ',') + if (sz >= 10 && !strncmp(str, "pipelining", 10)) { + str += 10; sz -= 10; + if (!sz || isspace(*str) || *str == ',') flags |= SPOE_APPCTX_FL_PIPELINING; } - else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) { - i += 5; - if (sz == i || isspace(str[i]) || str[i] == ',') + else if (sz >= 5 && !strncmp(str, "async", 5)) { + str += 5; sz -= 5; + if (!sz || isspace(*str) || *str == ',') flags |= SPOE_APPCTX_FL_ASYNC; } - else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) { - i += 13; - if (sz == i || isspace(str[i]) || str[i] == ',') + else if (sz >= 13 && !strncmp(str, "fragmentation", 13)) { + str += 13; sz -= 13; + if (!sz || isspace(*str) || *str == ',') flags |= SPOE_APPCTX_FL_FRAGMENTATION; } - if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL) + /* Get the next comma or break */ + if (!sz || (delim = memchr(str, ',', sz)) == NULL) break; - i = (delim - str) + 1; + delim++; + sz -= (delim - str); + str = delim; } } else { /* Silently ignore unknown item */ - if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (spoe_skip_data(&p, end) == -1) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += i; } } /* Final checks */ if (!vsn) { - spoe_status_code = SPOE_FRM_ERR_NO_VSN; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NO_VSN; return -1; } if (!max_frame_size) { - spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NO_FRAME_SIZE; return -1; } SPOE_APPCTX(appctx)->version = (unsigned int)vsn; SPOE_APPCTX(appctx)->max_frame_size = (unsigned int)max_frame_size; SPOE_APPCTX(appctx)->flags |= flags; - return idx; + + return (p - frame); } /* Decode DISCONNECT frame sent by an agent. It returns the number of by read * bytes on success, 0 if the frame can be ignored and -1 if an error * occurred. */ static int -handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size) +spoe_handle_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size) { - int i, idx = 0; + char *p, *end; unsigned int flags; - size_t min_size = (7 /* TYPE + METADATA */ - + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1 - + 1 + SLEN(MSG_KEY) + 1 + 1); + + p = frame; + end = frame + size; /* Check frame type */ - if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON) + if (*p++ != SPOE_FRM_T_AGENT_DISCON) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; return 0; + } - if (size < min_size) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (size < 7 /* TYPE + METADATA */) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } /* Retrieve flags */ - memcpy((char *)&flags, frame+idx, 4); - idx += 4; + memcpy((char *)&flags, p, 4); + p += 4; /* Fragmentation is not supported for DISCONNECT frame */ if (!(flags & SPOE_FRM_FL_FIN)) { - spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; return -1; } /* stream-id and frame-id must be cleared */ - if (frame[idx] != 0 || frame[idx+1] != 0) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (*p != 0 || *(p+1) != 0) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += 2; + p += 2; /* There are 2 mandatory items: "status-code" and "message" */ /* Loop on K/V items */ - while (idx < size) { - char *str; - uint64_t sz; + while (p < end) { + char *str; + size_t sz; + int ret; /* Decode the item key */ - idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); - if (str == NULL) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + ret = spoe_decode_buffer(&p, end, &str, &sz); + if (ret == -1 || !sz) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } /* Check "status-code" K/V item */ if (!memcmp(str, STATUS_CODE_KEY, sz)) { - int type; + int type = *p++; /* The value must be an integer */ - type = frame[idx++]; if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 && (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 && (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 && (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (spoe_decode_varint(&p, end, &sz) == -1) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += i; - spoe_status_code = sz; + SPOE_APPCTX(appctx)->status_code = sz; } /* Check "message" K/V item */ - else if (sz && !memcmp(str, MSG_KEY, sz)) { + else if (!memcmp(str, MSG_KEY, sz)) { + int type = *p++; + /* The value must be a string */ - if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); - if (str == NULL || sz > 255) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + ret = spoe_decode_buffer(&p, end, &str, &sz); + if (ret == -1 || sz > 255) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - memcpy(spoe_reason, str, sz); - spoe_reason[sz] = 0; +#if defined(DEBUG_SPOE) || defined(DEBUG_FULL) + SPOE_APPCTX(appctx)->reason = str; + SPOE_APPCTX(appctx)->rlen = sz; +#endif } else { /* Silently ignore unknown item */ - if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (spoe_skip_data(&p, end) == -1) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } - idx += i; } } - return idx; + return (p - frame); } /* Decode ACK frame sent by an agent. It returns the number of read bytes on * success, 0 if the frame can be ignored and -1 if an error occurred. */ static int -handle_spoe_agentack_frame(struct appctx *appctx, struct spoe_context **ctx, +spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx, char *frame, size_t size) { struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; + char *p, *end; uint64_t stream_id, frame_id; - int i, idx = 0; + int len; unsigned int flags; - size_t min_size = (7 /* TYPE + METADATA */); + + p = frame; + end = frame + size; + *ctx = NULL; /* Check frame type */ - if (frame[idx++] != SPOE_FRM_T_AGENT_ACK) + if (*p++ != SPOE_FRM_T_AGENT_ACK) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; return 0; + } - if (size < min_size) { - spoe_status_code = SPOE_FRM_ERR_INVALID; - return -1; + if (size < 7 /* TYPE + METADATA */) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; + return 0; } /* Retrieve flags */ - memcpy((char *)&flags, frame+idx, 4); - idx += 4; + memcpy((char *)&flags, p, 4); + p += 4; /* Fragmentation is not supported for now */ if (!(flags & SPOE_FRM_FL_FIN)) { - spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; return -1; } /* Get the stream-id and the frame-id */ - if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1) + if (spoe_decode_varint(&p, end, &stream_id) == -1) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; return 0; - idx += i; - if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1) + } + if (spoe_decode_varint(&p, end, &frame_id) == -1) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID; return 0; - idx += i; + } + /* Try to find the corresponding SPOE context */ if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { list_for_each_entry((*ctx), &agent->waiting_queue, list) { if ((*ctx)->stream_id == (unsigned int)stream_id && @@ -1335,56 +1572,62 @@ handle_spoe_agentack_frame(struct appctx *appctx, struct spoe_context **ctx, else { list_for_each_entry((*ctx), &SPOE_APPCTX(appctx)->waiting_queue, list) { if ((*ctx)->stream_id == (unsigned int)stream_id && - (*ctx)->frame_id == (unsigned int)frame_id) + (*ctx)->frame_id == (unsigned int)frame_id) goto found; } } - /* FIXME: check if ABRT bit is set for a unfinished fragmented frame */ /* No Stream found, ignore the frame */ - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Ignore ACK frame" + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - Ignore ACK frame" " - stream-id=%u - frame-id=%u\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx, (unsigned int)stream_id, (unsigned int)frame_id); - *ctx = NULL; + /* FIXME: Define a proper error for this case (SPOE_FRM_ERR_FRAMEID_NOTFOUND ?) */ return 0; found: - if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait)) { + if (!spoe_acquire_buffer(&SPOE_APPCTX(appctx)->buffer, + &SPOE_APPCTX(appctx)->buffer_wait)) { *ctx = NULL; return 1; /* Retry later */ } /* Copy encoded actions */ - memcpy(SPOE_APPCTX(appctx)->buffer->p, frame+idx, size-idx); - SPOE_APPCTX(appctx)->buffer->i = size-idx; + len = (end - p); + memcpy(SPOE_APPCTX(appctx)->buffer->p, p, len); + SPOE_APPCTX(appctx)->buffer->i = len; + p += len; /* Transfer the buffer ownership to the SPOE context */ (*ctx)->buffer = SPOE_APPCTX(appctx)->buffer; SPOE_APPCTX(appctx)->buffer = &buf_empty; + (*ctx)->state = SPOE_CTX_ST_DONE; + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - ACK frame received - ctx=%p - stream-id=%u - frame-id=%u\n", + " - ACK frame received" + " - ctx=%p - stream-id=%u - frame-id=%u - flags=0x%08x\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, appctx, - *ctx, (*ctx)->stream_id, (*ctx)->frame_id); - return idx; + __FUNCTION__, appctx, *ctx, (*ctx)->stream_id, + (*ctx)->frame_id, flags); + return (p - frame); } /* This function is used in cfgparse.c and declared in proto/checks.h. It * prepare the request to send to agents during a healthcheck. It returns 0 on * success and -1 if an error occurred. */ int -prepare_spoe_healthcheck_request(char **req, int *len) +spoe_prepare_healthcheck_request(char **req, int *len) { - struct appctx appctx; - struct spoe_appctx spoe_appctx; - char *frame, buf[MAX_FRAME_SIZE+4]; - unsigned int framesz; - int idx; + struct appctx appctx; + struct spoe_appctx spoe_appctx; + char *frame, *end, buf[MAX_FRAME_SIZE+4]; + size_t sz; + int ret; memset(&appctx, 0, sizeof(appctx)); memset(&spoe_appctx, 0, sizeof(spoe_appctx)); @@ -1393,24 +1636,27 @@ prepare_spoe_healthcheck_request(char **req, int *len) appctx.ctx.spoe.ptr = &spoe_appctx; SPOE_APPCTX(&appctx)->max_frame_size = MAX_FRAME_SIZE; - frame = buf+4; - idx = prepare_spoe_hahello_frame(&appctx, frame, MAX_FRAME_SIZE); - if (idx <= 0) - return -1; - if (idx + SLEN(HEALTHCHECK_KEY) + 1 > MAX_FRAME_SIZE) - return -1; + frame = buf+4; /* Reserved the 4 first bytes for the frame size */ + end = frame + MAX_FRAME_SIZE; - /* "healthcheck" K/V item */ - idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx); - frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE); - - framesz = htonl(idx); - memcpy(buf, (char *)&framesz, 4); - - if ((*req = malloc(idx+4)) == NULL) + ret = spoe_prepare_hahello_frame(&appctx, frame, MAX_FRAME_SIZE); + if (ret <= 0) return -1; - memcpy(*req, buf, idx+4); - *len = idx+4; + frame += ret; + + /* Add "healthcheck" K/V item */ + sz = SLEN(HEALTHCHECK_KEY); + if (spoe_encode_buffer(HEALTHCHECK_KEY, sz, &frame, end) == -1) + return -1; + *frame++ = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE); + + *len = frame - buf; + sz = htonl(*len - 4); + memcpy(buf, (char *)&sz, 4); + + if ((*req = malloc(*len)) == NULL) + return -1; + memcpy(*req, buf, *len); return 0; } @@ -1418,11 +1664,10 @@ prepare_spoe_healthcheck_request(char **req, int *len) * the response received from an agent during a healthcheck. It returns 0 on * success and -1 if an error occurred. */ int -handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen) +spoe_handle_healthcheck_response(char *frame, size_t size, char *err, int errlen) { struct appctx appctx; struct spoe_appctx spoe_appctx; - int r; memset(&appctx, 0, sizeof(appctx)); memset(&spoe_appctx, 0, sizeof(spoe_appctx)); @@ -1430,20 +1675,19 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen appctx.ctx.spoe.ptr = &spoe_appctx; SPOE_APPCTX(&appctx)->max_frame_size = MAX_FRAME_SIZE; - if (handle_spoe_agentdiscon_frame(&appctx, frame, size) != 0) - goto error; - if ((r = handle_spoe_agenthello_frame(&appctx, frame, size)) <= 0) { - if (r == 0) - spoe_status_code = SPOE_FRM_ERR_INVALID; + if (*frame == SPOE_FRM_T_AGENT_DISCON) { + spoe_handle_agentdiscon_frame(&appctx, frame, size); goto error; } + if (spoe_handle_agenthello_frame(&appctx, frame, size) <= 0) + goto error; return 0; error: - if (spoe_status_code >= SPOE_FRM_ERRS) - spoe_status_code = SPOE_FRM_ERR_UNKNOWN; - strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen); + if (SPOE_APPCTX(&appctx)->status_code >= SPOE_FRM_ERRS) + SPOE_APPCTX(&appctx)->status_code = SPOE_FRM_ERR_UNKNOWN; + strncpy(err, spoe_frm_err_reasons[SPOE_APPCTX(&appctx)->status_code], errlen); return -1; } @@ -1451,23 +1695,28 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen * the frame can be ignored, 1 to retry later, and the frame legnth on * success. */ static int -send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) +spoe_send_frame(struct appctx *appctx, char *buf, size_t framesz) { struct stream_interface *si = appctx->owner; - int ret; - uint32_t netint; + int ret; + uint32_t netint; if (si_ic(si)->buf == &buf_empty) - return 1; + goto retry; + /* 4 bytes are reserved at the beginning of to store the frame + * length. */ netint = htonl(framesz); memcpy(buf, (char *)&netint, 4); ret = bi_putblk(si_ic(si), buf, framesz+4); if (ret <= 0) { - if (ret == -1) + if (ret == -1) { + retry: + si_applet_cant_put(si); return 1; /* retry */ - spoe_status_code = SPOE_FRM_ERR_IO; + } + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO; return -1; /* error */ } return framesz; @@ -1477,28 +1726,30 @@ send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) * when the frame can be ignored, 1 to retry later and the frame length on * success. */ static int -recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) +spoe_recv_frame(struct appctx *appctx, char *buf, size_t framesz) { struct stream_interface *si = appctx->owner; - int ret; - uint32_t netint; + int ret; + uint32_t netint; if (si_oc(si)->buf == &buf_empty) - return 1; + goto retry; ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0); if (ret > 0) { framesz = ntohl(netint); if (framesz > SPOE_APPCTX(appctx)->max_frame_size) { - spoe_status_code = SPOE_FRM_ERR_TOO_BIG; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG; return -1; } - ret = bo_getblk(si_oc(si), trash.str, framesz, 4); + ret = bo_getblk(si_oc(si), buf, framesz, 4); } if (ret <= 0) { - if (ret == 0) + if (ret == 0) { + retry: return 1; /* retry */ - spoe_status_code = SPOE_FRM_ERR_IO; + } + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO; return -1; /* error */ } return framesz; @@ -1508,7 +1759,7 @@ recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) * Functions that manage the SPOE applet ********************************************************************/ static int -wakeup_spoe_appctx(struct appctx *appctx) +spoe_wakeup_appctx(struct appctx *appctx) { si_applet_want_get(appctx->owner); si_applet_want_put(appctx->owner); @@ -1519,39 +1770,47 @@ wakeup_spoe_appctx(struct appctx *appctx) /* Callback function that catches applet timeouts. If a timeout occurred, we set * st1> flag and the SPOE applet is woken up. */ static struct task * -process_spoe_applet(struct task * task) +spoe_process_appctx(struct task * task) { struct appctx *appctx = task->context; appctx->st1 = SPOE_APPCTX_ERR_NONE; if (tick_is_expired(task->expire, now_ms)) { task->expire = TICK_ETERNITY; - appctx->st1 = SPOE_APPCTX_ERR_TOUT; + appctx->st1 = SPOE_APPCTX_ERR_TOUT; } - wakeup_spoe_appctx(appctx); + spoe_wakeup_appctx(appctx); return task; } /* Callback function that releases a SPOE applet. This happens when the * connection with the agent is closed. */ static void -release_spoe_applet(struct appctx *appctx) +spoe_release_appctx(struct appctx *appctx) { - struct stream_interface *si = appctx->owner; - struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; + struct stream_interface *si = appctx->owner; + struct spoe_appctx *spoe_appctx = SPOE_APPCTX(appctx); + struct spoe_agent *agent; struct spoe_context *ctx, *back; - struct spoe_appctx *spoe_appctx; + + if (spoe_appctx == NULL) + return; + + appctx->ctx.spoe.ptr = NULL; + agent = spoe_appctx->agent; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx); + /* Remove applet from the list of running applets */ agent->applets_act--; - if (!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->list)) { - LIST_DEL(&SPOE_APPCTX(appctx)->list); - LIST_INIT(&SPOE_APPCTX(appctx)->list); + if (!LIST_ISEMPTY(&spoe_appctx->list)) { + LIST_DEL(&spoe_appctx->list); + LIST_INIT(&spoe_appctx->list); } + /* Shutdown the server connection, if needed */ if (appctx->st0 != SPOE_APPCTX_ST_END) { if (appctx->st0 == SPOE_APPCTX_ST_IDLE) agent->applets_idle--; @@ -1560,50 +1819,56 @@ release_spoe_applet(struct appctx *appctx) si_shutr(si); si_ic(si)->flags |= CF_READ_NULL; appctx->st0 = SPOE_APPCTX_ST_END; - if (SPOE_APPCTX(appctx)->status_code == SPOE_FRM_ERR_NONE) - SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO; + if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE) + spoe_appctx->status_code = SPOE_FRM_ERR_IO; } - if (SPOE_APPCTX(appctx)->task) { - task_delete(SPOE_APPCTX(appctx)->task); - task_free(SPOE_APPCTX(appctx)->task); + /* Destroy the task attached to this applet */ + if (spoe_appctx->task) { + task_delete(spoe_appctx->task); + task_free(spoe_appctx->task); } - list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) { + /* Notify all waiting streams */ + list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); ctx->state = SPOE_CTX_ST_ERROR; - ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); + ctx->status_code = (spoe_appctx->status_code + 0x100); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } - if (SPOE_APPCTX(appctx)->frag_ctx.ctx) { - ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx; + /* If the applet was processing a fragmented frame, notify the + * corresponding stream. */ + if (spoe_appctx->frag_ctx.ctx) { + ctx = spoe_appctx->frag_ctx.ctx; ctx->frag_ctx.spoe_appctx = NULL; ctx->state = SPOE_CTX_ST_ERROR; - ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); + ctx->status_code = (spoe_appctx->status_code + 0x100); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } - release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); - pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx)); + /* Release allocated memory */ + spoe_release_buffer(&spoe_appctx->buffer, + &spoe_appctx->buffer_wait); + pool_free2(pool2_spoe_appctx, spoe_appctx); if (!LIST_ISEMPTY(&agent->applets)) goto end; + /* If this was the last running applet, notify all waiting streams */ list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); ctx->state = SPOE_CTX_ST_ERROR; - ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); + ctx->status_code = (spoe_appctx->status_code + 0x100); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } - list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); ctx->state = SPOE_CTX_ST_ERROR; - ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); + ctx->status_code = (spoe_appctx->status_code + 0x100); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } @@ -1611,15 +1876,16 @@ release_spoe_applet(struct appctx *appctx) /* Update runtinme agent info */ agent->frame_size = agent->max_frame_size; list_for_each_entry(spoe_appctx, &agent->applets, list) - agent->frame_size = MIN(spoe_appctx->max_frame_size, agent->frame_size); + agent->frame_size = MIN(spoe_appctx->max_frame_size, + agent->frame_size); } static int -handle_connect_spoe_applet(struct appctx *appctx) +spoe_handle_connect_appctx(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - char *frame = trash.str; + char *frame, *buf; int ret; if (si->state <= SI_ST_CON) { @@ -1628,36 +1894,42 @@ handle_connect_spoe_applet(struct appctx *appctx) goto stop; } if (si->state != SI_ST_EST) { - spoe_status_code = SPOE_FRM_ERR_IO; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO; goto exit; } if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx); - spoe_status_code = SPOE_FRM_ERR_TOUT; + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - Connection timed out\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx); + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT; goto exit; } if (SPOE_APPCTX(appctx)->task->expire == TICK_ETERNITY) - SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.hello); + SPOE_APPCTX(appctx)->task->expire = + tick_add_ifset(now_ms, agent->timeout.hello); - ret = prepare_spoe_hahello_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size); + /* 4 bytes are reserved at the beginning of to store the frame + * length. */ + buf = trash.str; frame = buf+4; + ret = spoe_prepare_hahello_frame(appctx, frame, + SPOE_APPCTX(appctx)->max_frame_size); if (ret > 1) - ret = send_spoe_frame(appctx, frame, ret); + ret = spoe_send_frame(appctx, buf, ret); switch (ret) { case -1: /* error */ - goto exit; - case 0: /* ignore => an error, cannot be ignored */ goto exit; case 1: /* retry later */ - si_applet_cant_put(si); goto stop; - default: /* CONNECT frame successfully sent */ + default: + /* HELLO frame successfully sent, now wait for the + * reply. */ appctx->st0 = SPOE_APPCTX_ST_CONNECTING; goto next; } @@ -1667,52 +1939,48 @@ handle_connect_spoe_applet(struct appctx *appctx) stop: return 1; exit: - SPOE_APPCTX(appctx)->status_code = spoe_status_code; appctx->st0 = SPOE_APPCTX_ST_EXIT; return 0; } static int -handle_connecting_spoe_applet(struct appctx *appctx) +spoe_handle_connecting_appctx(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - char *frame = trash.str; - int ret, framesz = 0; + char *frame; + int ret; if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - spoe_status_code = SPOE_FRM_ERR_IO; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO; goto exit; } if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx); - spoe_status_code = SPOE_FRM_ERR_TOUT; + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - Connection timed out\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx); + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT; goto exit; } - ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size); + frame = trash.str; trash.len = 0; + ret = spoe_recv_frame(appctx, frame, + SPOE_APPCTX(appctx)->max_frame_size); if (ret > 1) { if (*frame == SPOE_FRM_T_AGENT_DISCON) { appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; goto next; } - framesz = ret; - ret = handle_spoe_agenthello_frame(appctx, frame, framesz); + trash.len = ret + 4; + ret = spoe_handle_agenthello_frame(appctx, frame, ret); } switch (ret) { case -1: /* error */ - if (framesz) - bo_skip(si_oc(si), framesz+4); - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - goto next; - - case 0: /* ignore */ - if (framesz) - bo_skip(si_oc(si), framesz+4); + case 0: /* ignore => an error, cannot be ignored */ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; goto next; @@ -1720,51 +1988,192 @@ handle_connecting_spoe_applet(struct appctx *appctx) goto stop; default: - /* hello handshake is finished, set the idle timeout, - * Add the appctx in the agent cache, decrease the - * number of new applets and wake up waiting streams. */ - if (framesz) - bo_skip(si_oc(si), framesz+4); + /* HELLO handshake is finished, set the idle timeout and + * add the applet in the list of running applets. */ agent->applets_idle++; appctx->st0 = SPOE_APPCTX_ST_IDLE; LIST_DEL(&SPOE_APPCTX(appctx)->list); LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list); /* Update runtinme agent info */ - agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size, agent->frame_size); + agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size, + agent->frame_size); goto next; } next: - SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + /* Do not forget to remove processed frame from the output buffer */ + if (trash.len) + bo_skip(si_oc(si), trash.len); + + SPOE_APPCTX(appctx)->task->expire = + tick_add_ifset(now_ms, agent->timeout.idle); return 0; stop: return 1; exit: - SPOE_APPCTX(appctx)->status_code = spoe_status_code; appctx->st0 = SPOE_APPCTX_ST_EXIT; return 0; } static int -handle_processing_spoe_applet(struct appctx *appctx) +spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx, + int *skip) +{ + struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; + char *frame, *buf; + int ret; + + /* 4 bytes are reserved at the beginning of to store the frame + * length. */ + buf = trash.str; frame = buf+4; + ret = spoe_prepare_hanotify_frame(appctx, ctx, frame, + SPOE_APPCTX(appctx)->max_frame_size); + if (ret > 1) + ret = spoe_send_frame(appctx, buf, ret); + + switch (ret) { + case -1: /* error */ + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto end; + + case 0: /* ignore */ + if (ctx == NULL) + goto abort_frag_frame; + + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_ERROR; + ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + break; + + case 1: /* retry */ + *skip = 1; + break; + + default: + if (ctx == NULL) + goto abort_frag_frame; + + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) || + (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN)) + goto no_frag_frame_sent; + else { + *skip = 1; + goto frag_frame_sent; + } + } + goto end; + + frag_frame_sent: + appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY; + SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx; + SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id; + SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id; + + ctx->frag_ctx.spoe_appctx = SPOE_APPCTX(appctx); + ctx->state = SPOE_CTX_ST_ENCODING_MSGS; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + goto end; + + no_frag_frame_sent: + if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + LIST_ADDQ(&agent->waiting_queue, &ctx->list); + } + else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) { + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); + } + else { + appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK; + LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); + } + SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; + SPOE_APPCTX(appctx)->frag_ctx.cursid = 0; + SPOE_APPCTX(appctx)->frag_ctx.curfid = 0; + + ctx->frag_ctx.spoe_appctx = NULL; + ctx->state = SPOE_CTX_ST_WAITING_ACK; + goto end; + + abort_frag_frame: + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; + SPOE_APPCTX(appctx)->frag_ctx.cursid = 0; + SPOE_APPCTX(appctx)->frag_ctx.curfid = 0; + goto end; + + end: + return ret; +} + +static int +spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip) +{ + struct spoe_context *ctx = NULL; + char *frame; + int ret; + + frame = trash.str; trash.len = 0; + ret = spoe_recv_frame(appctx, frame, + SPOE_APPCTX(appctx)->max_frame_size); + if (ret > 1) { + if (*frame == SPOE_FRM_T_AGENT_DISCON) { + appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + goto end; + } + trash.len = ret + 4; + ret = spoe_handle_agentack_frame(appctx, &ctx, frame, ret); + } + switch (ret) { + case -1: /* error */ + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + break; + + case 0: /* ignore */ + break; + + case 1: /* retry */ + *skip = 1; + break; + + default: + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + break; + } + + /* Do not forget to remove processed frame from the output buffer */ + if (trash.len) + bo_skip(si_oc(appctx->owner), trash.len); + end: + return ret; +} + +static int +spoe_handle_processing_appctx(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - struct spoe_context *ctx = NULL; - char *frame = trash.str; + struct spoe_context *ctx = NULL; unsigned int fpa = 0; - int ret, framesz = 0, skip_sending = 0, skip_receiving = 0; + int ret, skip_sending = 0, skip_receiving = 0; if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - spoe_status_code = SPOE_FRM_ERR_IO; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO; goto exit; } if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - spoe_status_code = SPOE_FRM_ERR_TOUT; - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - appctx->st1 = SPOE_APPCTX_ERR_NONE; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT; + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + appctx->st1 = SPOE_APPCTX_ERR_NONE; goto next; } @@ -1774,7 +2183,8 @@ handle_processing_spoe_applet(struct appctx *appctx) " - appctx-state=%s\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx, fpa, agent->max_fpa, - skip_sending, skip_receiving, spoe_appctx_state_str[appctx->st0]); + skip_sending, skip_receiving, + spoe_appctx_state_str[appctx->st0]); if (fpa > agent->max_fpa || (skip_sending && skip_receiving)) goto stop; @@ -1801,151 +2211,55 @@ handle_processing_spoe_applet(struct appctx *appctx) SPOE_APPCTX(appctx)->buffer = ctx->buffer; ctx->buffer = &buf_empty; } - - ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size); - if (ret > 1) - ret = send_spoe_frame(appctx, frame, ret); - + ret = spoe_handle_sending_frame_appctx(appctx, ctx, &skip_sending); switch (ret) { case -1: /* error */ - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; goto next; case 0: /* ignore */ - release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); + spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer, + &SPOE_APPCTX(appctx)->buffer_wait); agent->sending_rate++; fpa++; - - LIST_DEL(&ctx->list); - LIST_INIT(&ctx->list); - ctx->state = SPOE_CTX_ST_ERROR; - ctx->status_code = (spoe_status_code + 0x100); - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); break; case 1: /* retry */ - si_applet_cant_put(si); - skip_sending = 1; break; default: - release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); + spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer, + &SPOE_APPCTX(appctx)->buffer_wait); agent->sending_rate++; fpa++; - - if (ctx == NULL) { - appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; - SPOE_APPCTX(appctx)->frag_ctx.cursid = 0; - SPOE_APPCTX(appctx)->frag_ctx.curfid = 0; - break; - } - LIST_DEL(&ctx->list); - LIST_INIT(&ctx->list); - - if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) { - if (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN) { - if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { - appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - LIST_ADDQ(&agent->waiting_queue, &ctx->list); - } - else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) { - appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); - } - else { - appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK; - LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); - } - SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; - SPOE_APPCTX(appctx)->frag_ctx.cursid = 0; - SPOE_APPCTX(appctx)->frag_ctx.curfid = 0; - - ctx->frag_ctx.spoe_appctx = NULL; - ctx->state = SPOE_CTX_ST_WAITING_ACK; - } - else { - appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY; - SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx; - SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id; - SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id; - - ctx->frag_ctx.spoe_appctx = SPOE_APPCTX(appctx); - ctx->state = SPOE_CTX_ST_ENCODING_MSGS; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - skip_sending = 1; - } - } - else { - if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { - appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - LIST_ADDQ(&agent->waiting_queue, &ctx->list); - } - else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) { - appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); - } - else { - appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK; - LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); - } - - ctx->state = SPOE_CTX_ST_WAITING_ACK; - } + break; } - if (fpa > agent->max_fpa) goto stop; recv_frame: if (skip_receiving) goto process; - - framesz = 0; - ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size); - if (ret > 1) { - if (*frame == SPOE_FRM_T_AGENT_DISCON) { - appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; - goto next; - } - framesz = ret; - ret = handle_spoe_agentack_frame(appctx, &ctx, frame, framesz); - } + ret = spoe_handle_receiving_frame_appctx(appctx, &skip_receiving); switch (ret) { case -1: /* error */ - if (framesz) - bo_skip(si_oc(si), framesz+4); - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; goto next; case 0: /* ignore */ - if (framesz) - bo_skip(si_oc(si), framesz+4); fpa++; break; case 1: /* retry */ - skip_receiving = 1; break; default: - if (framesz) - bo_skip(si_oc(si), framesz+4); fpa++; - - LIST_DEL(&ctx->list); - LIST_INIT(&ctx->list); - - if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) - appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - - ctx->state = SPOE_CTX_ST_DONE; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + break; } goto process; next: - SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + SPOE_APPCTX(appctx)->task->expire = + tick_add_ifset(now_ms, agent->timeout.idle); return 0; stop: if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) { @@ -1956,25 +2270,23 @@ handle_processing_spoe_applet(struct appctx *appctx) LIST_DEL(&SPOE_APPCTX(appctx)->list); LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list); if (fpa) - SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + SPOE_APPCTX(appctx)->task->expire = + tick_add_ifset(now_ms, agent->timeout.idle); } return 1; exit: - SPOE_APPCTX(appctx)->status_code = spoe_status_code; appctx->st0 = SPOE_APPCTX_ST_EXIT; return 0; } static int -handle_disconnect_spoe_applet(struct appctx *appctx) +spoe_handle_disconnect_appctx(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - char *frame = trash.str; - int ret; - - SPOE_APPCTX(appctx)->status_code = spoe_status_code; + char *frame, *buf; + int ret; if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) goto exit; @@ -1982,34 +2294,37 @@ handle_disconnect_spoe_applet(struct appctx *appctx) if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) goto exit; - ret = prepare_spoe_hadiscon_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size); + /* 4 bytes are reserved at the beginning of to store the frame + * length. */ + buf = trash.str; frame = buf+4; + ret = spoe_prepare_hadiscon_frame(appctx, frame, + SPOE_APPCTX(appctx)->max_frame_size); if (ret > 1) - ret = send_spoe_frame(appctx, frame, ret); + ret = spoe_send_frame(appctx, buf, ret); switch (ret) { case -1: /* error */ - goto exit; - - case 0: /* ignore */ + case 0: /* ignore => an error, cannot be ignored */ goto exit; case 1: /* retry */ - si_applet_cant_put(si); goto stop; default: SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" " - disconnected by HAProxy (%d): %s\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, appctx, spoe_status_code, - spoe_frm_err_reasons[spoe_status_code]); + __FUNCTION__, appctx, + SPOE_APPCTX(appctx)->status_code, + spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code]); appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; goto next; } next: - SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + SPOE_APPCTX(appctx)->task->expire = + tick_add_ifset(now_ms, agent->timeout.idle); return 0; stop: return 1; @@ -2019,80 +2334,81 @@ handle_disconnect_spoe_applet(struct appctx *appctx) } static int -handle_disconnecting_spoe_applet(struct appctx *appctx) +spoe_handle_disconnecting_appctx(struct appctx *appctx) { struct stream_interface *si = appctx->owner; - char *frame = trash.str; - int ret, framesz = 0; + char *frame; + int ret; if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - spoe_status_code = SPOE_FRM_ERR_IO; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO; goto exit; } if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - spoe_status_code = SPOE_FRM_ERR_TOUT; + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT; goto exit; } - framesz = 0; - ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size); + frame = trash.str; trash.len = 0; + ret = spoe_recv_frame(appctx, frame, + SPOE_APPCTX(appctx)->max_frame_size); if (ret > 1) { - framesz = ret; - ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz); + trash.len = ret + 4; + ret = spoe_handle_agentdiscon_frame(appctx, frame, ret); } switch (ret) { case -1: /* error */ - if (framesz) - bo_skip(si_oc(si), framesz+4); SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" " - error on frame (%s)\n", (int)now.tv_sec, (int)now.tv_usec, ((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id, __FUNCTION__, appctx, - spoe_frm_err_reasons[spoe_status_code]); + spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code]); goto exit; case 0: /* ignore */ - if (framesz) - bo_skip(si_oc(si), framesz+4); goto next; case 1: /* retry */ goto stop; default: - if (framesz) - bo_skip(si_oc(si), framesz+4); SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - disconnected by peer (%d): %s\n", + " - disconnected by peer (%d): %.*s\n", (int)now.tv_sec, (int)now.tv_usec, ((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id, - __FUNCTION__, appctx, spoe_status_code, - spoe_reason); + __FUNCTION__, appctx, SPOE_APPCTX(appctx)->status_code, + SPOE_APPCTX(appctx)->rlen, SPOE_APPCTX(appctx)->reason); goto exit; } next: + /* Do not forget to remove processed frame from the output buffer */ + if (trash.len) + bo_skip(si_oc(appctx->owner), trash.len); + return 0; stop: return 1; exit: - if (SPOE_APPCTX(appctx)->status_code == SPOE_FRM_ERR_NONE) - SPOE_APPCTX(appctx)->status_code = spoe_status_code; appctx->st0 = SPOE_APPCTX_ST_EXIT; return 0; } /* I/O Handler processing messages exchanged with the agent */ static void -handle_spoe_applet(struct appctx *appctx) +spoe_handle_appctx(struct appctx *appctx) { - struct stream_interface *si = appctx->owner; - struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent; - spoe_status_code = SPOE_FRM_ERR_NONE; + if (SPOE_APPCTX(appctx) == NULL) + return; + + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE; + agent = SPOE_APPCTX(appctx)->agent; switchstate: SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" @@ -2102,12 +2418,12 @@ handle_spoe_applet(struct appctx *appctx) switch (appctx->st0) { case SPOE_APPCTX_ST_CONNECT: - if (handle_connect_spoe_applet(appctx)) + if (spoe_handle_connect_appctx(appctx)) goto out; goto switchstate; case SPOE_APPCTX_ST_CONNECTING: - if (handle_connecting_spoe_applet(appctx)) + if (spoe_handle_connecting_appctx(appctx)) goto out; goto switchstate; @@ -2115,7 +2431,8 @@ handle_spoe_applet(struct appctx *appctx) if (stopping && LIST_ISEMPTY(&agent->sending_queue) && LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) { - SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + SPOE_APPCTX(appctx)->task->expire = + tick_add_ifset(now_ms, agent->timeout.idle); appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; goto switchstate; } @@ -2126,26 +2443,27 @@ handle_spoe_applet(struct appctx *appctx) case SPOE_APPCTX_ST_PROCESSING: case SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY: case SPOE_APPCTX_ST_WAITING_SYNC_ACK: - if (handle_processing_spoe_applet(appctx)) + if (spoe_handle_processing_appctx(appctx)) goto out; goto switchstate; case SPOE_APPCTX_ST_DISCONNECT: - if (handle_disconnect_spoe_applet(appctx)) + if (spoe_handle_disconnect_appctx(appctx)) goto out; goto switchstate; case SPOE_APPCTX_ST_DISCONNECTING: - if (handle_disconnecting_spoe_applet(appctx)) + if (spoe_handle_disconnecting_appctx(appctx)) goto out; goto switchstate; case SPOE_APPCTX_ST_EXIT: + appctx->st0 = SPOE_APPCTX_ST_END; + SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY; + si_shutw(si); si_shutr(si); si_ic(si)->flags |= CF_READ_NULL; - appctx->st0 = SPOE_APPCTX_ST_END; - SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY; /* fall through */ case SPOE_APPCTX_ST_END: @@ -2161,14 +2479,14 @@ handle_spoe_applet(struct appctx *appctx) struct applet spoe_applet = { .obj_type = OBJ_TYPE_APPLET, .name = "", /* used for logging */ - .fct = handle_spoe_applet, - .release = release_spoe_applet, + .fct = spoe_handle_appctx, + .release = spoe_release_appctx, }; /* Create a SPOE applet. On success, the created applet is returned, else * NULL. */ static struct appctx * -create_spoe_appctx(struct spoe_config *conf) +spoe_create_appctx(struct spoe_config *conf) { struct appctx *appctx; struct session *sess; @@ -2188,7 +2506,7 @@ create_spoe_appctx(struct spoe_config *conf) goto out_free_spoe_appctx; SPOE_APPCTX(appctx)->owner = appctx; - SPOE_APPCTX(appctx)->task->process = process_spoe_applet; + SPOE_APPCTX(appctx)->task->process = spoe_process_appctx; SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY; SPOE_APPCTX(appctx)->task->context = appctx; SPOE_APPCTX(appctx)->agent = conf->agent; @@ -2200,7 +2518,7 @@ create_spoe_appctx(struct spoe_config *conf) LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list); SPOE_APPCTX(appctx)->buffer_wait.target = appctx; - SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_appctx; + SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_appctx; LIST_INIT(&SPOE_APPCTX(appctx)->list); LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue); @@ -2249,7 +2567,7 @@ create_spoe_appctx(struct spoe_config *conf) } static int -queue_spoe_context(struct spoe_context *ctx) +spoe_queue_context(struct spoe_context *ctx) { struct spoe_config *conf = FLT_CONF(ctx->filter); struct spoe_agent *agent = conf->agent; @@ -2260,7 +2578,9 @@ queue_spoe_context(struct spoe_context *ctx) min_applets = min_applets_act(agent); /* Check if we need to create a new SPOE applet or not. */ - if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate) + if (agent->applets_act >= min_applets && + agent->applets_idle && + agent->sending_rate) goto end; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" @@ -2290,7 +2610,7 @@ queue_spoe_context(struct spoe_context *ctx) } } - appctx = create_spoe_appctx(conf); + appctx = spoe_create_appctx(conf); if (appctx == NULL) { SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - failed to create SPOE appctx\n", @@ -2323,17 +2643,18 @@ queue_spoe_context(struct spoe_context *ctx) agent->sending_rate--; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - Add stream in sending queue - applets_act=%u - applets_idle=%u" - " - sending_rate=%u\n", + " - Add stream in sending queue" + " - applets_act=%u - applets_idle=%u - sending_rate=%u\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, - ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate); + ctx->strm, agent->applets_act, agent->applets_idle, + agent->sending_rate); /* Finally try to wakeup the first IDLE applet found and move it at the * end of the list. */ list_for_each_entry(spoe_appctx, &agent->applets, list) { appctx = spoe_appctx->owner; if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { - wakeup_spoe_appctx(appctx); + spoe_wakeup_appctx(appctx); LIST_DEL(&spoe_appctx->list); LIST_ADDQ(&agent->applets, &spoe_appctx->list); break; @@ -2345,157 +2666,11 @@ queue_spoe_context(struct spoe_context *ctx) /*************************************************************************** * Functions that encode SPOE messages **************************************************************************/ -static inline int -encode_spoe_arg_string(struct spoe_context *ctx, struct sample *smp, - char *p, size_t max_size) -{ - struct chunk *chk = &smp->data.u.str; - int idx = 0; - - /* Here, we need to know if the sample has already been partially - * encoded. If yes, we only need to encode the remaining, - * reprensenting the number of bytes already encoded in previous - * frames. Else, == 0 */ - - if (!ctx->frag_ctx.curoff) { - /* First evaluation of the sample : encode the type (string or - * binary) and check its size against */ - - /* the string/binary length must not exceed 4 Gb. So 5 bytes is - * reserved to encode its size. */ - if (max_size < 6) - return 0; - - p[idx++] = (smp->data.type == SMP_T_STR) ? SPOE_DATA_T_STR : SPOE_DATA_T_BIN; - max_size -= (idx + 5); - - if (chk->len > max_size) { - /* The sample is too big, we will fragment it. - * will be updated accordingly. */ - idx += encode_frag_spoe_string(chk->str, chk->len, max_size, p+idx); - ctx->frag_ctx.curoff = max_size; - } - else { - /* No fragmentation needed, all the sample is encoded - * and remains 0 */ - idx += encode_spoe_string(chk->str, chk->len, p+idx); - } - } - else { - /* Continue the sample fragmentation, the type was already set - * in a previous frame. So just do a copy of data. */ - - idx = chk->len - ctx->frag_ctx.curoff; /* Remaining data */ - if (idx > max_size) { - /* The sample still needs to be fragmented. - * will be incremented accordingly. */ - memcpy(p, chk->str + ctx->frag_ctx.curoff, max_size); - idx = max_size; - ctx->frag_ctx.curoff += max_size; - } - else { - /* Finish the fragmentation. will be reset. */ - memcpy(p, chk->str + ctx->frag_ctx.curoff, idx); - ctx->frag_ctx.curoff = 0; - } - } - return idx; -} - -static inline int -encode_spoe_arg_method(struct spoe_context *ctx, struct sample *smp, - char *p, size_t max_size) -{ - int idx = 0; - - /* method length must not exceed 2288 bytes. So 3 bytes is reserved to - * encode its size. */ - - if (smp->data.u.meth.meth != HTTP_METH_OTHER) { - const struct http_method_name *meth = - &http_known_methods[smp->data.u.meth.meth]; - - if (meth->len + 3 > max_size) - return 0; - p[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(meth->name, meth->len, p+idx); - } - else { - struct chunk *meth = &smp->data.u.meth.str; - - if (meth->len + 3 > max_size) - return 0; - p[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(meth->str, meth->len, p+idx); - } - return idx; -} - -static inline int -encode_spoe_arg_ipv6(struct spoe_context *ctx, struct sample *smp, - char *p, size_t max_size) -{ - int idx = 0; - - if (max_size < 17) - return 0; - p[idx++] = SPOE_DATA_T_IPV6; - memcpy(p+idx, &smp->data.u.ipv6, 16); - idx += 16; - return idx; -} - - -static inline int -encode_spoe_arg_ipv4(struct spoe_context *ctx, struct sample *smp, - char *p, size_t max_size) -{ - int idx = 0; - - if (max_size < 5) - return 0; - p[idx++] = SPOE_DATA_T_IPV4; - memcpy(p+idx, &smp->data.u.ipv6, 4); - idx += 4; - return idx; -} - -static inline int -encode_spoe_arg_sint(struct spoe_context *ctx, struct sample *smp, - char *p, size_t max_size) -{ - int idx = 0; - - if (max_size < 9) - return 0; - p[idx++] = SPOE_DATA_T_INT64; - idx += encode_spoe_varint(smp->data.u.sint, p+idx); - - return idx; -} - -static inline int -encode_spoe_arg_bool(struct spoe_context *ctx, struct sample *smp, - char *p, size_t max_size) -{ - int flag, idx = 0; - - if (max_size < 1) - return 0; - flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE); - p[idx++] = (SPOE_DATA_T_BOOL | flag); - - return idx; -} - -/* Encode SPOE messages for a specific event. - * - * - * It returns 0 if During the processing, it returns - * 0 and it returns 1 when the processing is finished. If an error occurred, -1 - * is returned. */ +/* Encode SPOE messages for a specific event. Info in frag_ctx>, if any, + * are used to handle fragmented content. On success it returns 1. If an error + * occurred, -1 is returned. */ static int -encode_spoe_messages(struct stream *s, struct spoe_context *ctx, +spoe_encode_messages(struct stream *s, struct spoe_context *ctx, struct list *messages, int dir) { struct spoe_config *conf = FLT_CONF(ctx->filter); @@ -2503,13 +2678,11 @@ encode_spoe_messages(struct stream *s, struct spoe_context *ctx, struct spoe_message *msg; struct sample *smp; struct spoe_arg *arg; - char *p; - size_t max_size; - int r, idx = 0; + char *p, *end; + int ret; - max_size = agent->frame_size - FRAME_HDR_SIZE; - - p = ctx->buffer->p; + p = ctx->buffer->p; + end = p + agent->frame_size - FRAME_HDR_SIZE; /* Resume encoding of a SPOE message */ if (ctx->frag_ctx.curmsg != NULL) { @@ -2533,16 +2706,18 @@ encode_spoe_messages(struct stream *s, struct spoe_context *ctx, if (ctx->frag_ctx.curoff != UINT_MAX) goto encode_msg_payload; - /* + + . - * Implies is encoded on 2 bytes, at most (< 2288). */ - if (idx + 3 + msg->id_len + 1 > max_size) + /* Check if there is enough space for the message name and the + * number of arguments. It implies id_len> is encoded on 2 + * bytes, at most (< 2288). */ + if (p + 2 + msg->id_len + 1 > end) goto too_big; - /* Set the message name */ - idx += encode_spoe_string(msg->id, msg->id_len, p+idx); + /* Encode the message name */ + if (spoe_encode_buffer(msg->id, msg->id_len, &p, end) == -1) + goto too_big; - /* Store the number of arguments for this message */ - p[idx++] = msg->nargs; + /* Set the number of arguments for this message */ + *p++ = msg->nargs; ctx->frag_ctx.curoff = 0; encode_msg_payload: @@ -2556,81 +2731,32 @@ encode_spoe_messages(struct stream *s, struct spoe_context *ctx, if (ctx->frag_ctx.curoff != UINT_MAX) goto encode_arg_value; - /* + . - * Implies is encoded on 2 bytes, at most (< 2288). */ - if (idx + 3 + arg->name_len > max_size) - goto too_big; - /* Encode the arguement name as a string. It can by NULL */ - idx += encode_spoe_string(arg->name, arg->name_len, p+idx); + if (spoe_encode_buffer(arg->name, arg->name_len, &p, end) == -1) + goto too_big; ctx->frag_ctx.curoff = 0; encode_arg_value: - if (idx + 1 > max_size) - goto too_big; - /* Fetch the arguement value */ - smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL); - if (!smp) { - /* If no value is available, set it to NULL */ - p[idx++] = SPOE_DATA_T_NULL; - continue; - } - - /* Else, encode the arguement value */ - switch (smp->data.type) { - case SMP_T_BOOL: - if (!(r = encode_spoe_arg_bool(ctx, smp, p+idx, max_size-idx))) - goto too_big; - idx += r; - break; - - case SMP_T_SINT: - if (!(r = encode_spoe_arg_sint(ctx, smp, p+idx, max_size-idx))) - goto too_big; - idx += r; - break; - - case SMP_T_IPV4: - if (!(r = encode_spoe_arg_ipv4(ctx, smp, p+idx, max_size-idx))) - goto too_big; - idx += r; - break; - - case SMP_T_IPV6: - if (!(r = encode_spoe_arg_ipv6(ctx, smp, p+idx, max_size-idx))) - goto too_big; - idx += r; - break; - - case SMP_T_STR: - case SMP_T_BIN: - idx += encode_spoe_arg_string(ctx, smp, p+idx, max_size-idx); - if (ctx->frag_ctx.curoff) - goto too_big; - break; - - case SMP_T_METH: - if (!(r = encode_spoe_arg_method(ctx, smp, p+idx, max_size-idx))) - goto too_big; - idx += r; - break; - - default: - p[idx++] = SPOE_DATA_T_NULL; - } + smp = sample_process(s->be, s->sess, s, + dir|SMP_OPT_FINAL, arg->expr, NULL); + ret = spoe_encode_data(smp, &ctx->frag_ctx.curoff, &p, end); + if (ret == -1 || ctx->frag_ctx.curoff) + goto too_big; } } SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - encode %s messages - spoe_appctx=%p - max_size=%lu - idx=%u\n", + " - encode %s messages - spoe_appctx=%p" + "- max_size=%u - encoded=%ld\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, s, ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"), - ctx->frag_ctx.spoe_appctx, max_size, idx); + ctx->frag_ctx.spoe_appctx, (agent->frame_size - FRAME_HDR_SIZE), + p - ctx->buffer->p); - ctx->buffer->i = idx; + ctx->buffer->i = p - ctx->buffer->p; ctx->frag_ctx.curmsg = NULL; ctx->frag_ctx.curarg = NULL; ctx->frag_ctx.curoff = 0; @@ -2643,14 +2769,15 @@ encode_spoe_messages(struct stream *s, struct spoe_context *ctx, // return -1; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - encode fragmented messages - spoe_appctx=%p - curmsg=%p - curarg=%p - curoff=%u" - " - max_size=%lu - idx=%u\n", + " - encode fragmented messages - spoe_appctx=%p" + " - curmsg=%p - curarg=%p - curoff=%u" + " - max_size=%u - encoded=%ld\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx, ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff, - max_size, idx); + (agent->frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p); - ctx->buffer->i = idx; + ctx->buffer->i = p - ctx->buffer->p; ctx->flags |= SPOE_CTX_FL_FRAGMENTED; ctx->frag_ctx.flags &= ~SPOE_FRM_FL_FIN; return 1; @@ -2662,7 +2789,7 @@ encode_spoe_messages(struct stream *s, struct spoe_context *ctx, **************************************************************************/ /* Helper function to set a variable */ static void -set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len, +spoe_set_var(struct spoe_context *ctx, char *scope, char *name, int len, struct sample *smp) { struct spoe_config *conf = FLT_CONF(ctx->filter); @@ -2677,7 +2804,7 @@ set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len, /* Helper function to unset a variable */ static void -unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len, +spoe_unset_var(struct spoe_context *ctx, char *scope, char *name, int len, struct sample *smp) { struct spoe_config *conf = FLT_CONF(ctx->filter); @@ -2691,107 +2818,134 @@ unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len, } -/* Process SPOE actions for a specific event. During the processing, it returns - * 0 and it returns 1 when the processing is finished. If an error occurred, -1 - * is returned. */ +static inline int +spoe_decode_action_set_var(struct stream *s, struct spoe_context *ctx, + char **buf, char *end, int dir) +{ + char *str, *scope, *p = *buf; + struct sample smp; + uint64_t sz; + int ret; + + if (p + 2 >= end) + goto skip; + + /* SET-VAR requires 3 arguments */ + if (*p++ != 3) + goto skip; + + switch (*p++) { + case SPOE_SCOPE_PROC: scope = "proc"; break; + case SPOE_SCOPE_SESS: scope = "sess"; break; + case SPOE_SCOPE_TXN : scope = "txn"; break; + case SPOE_SCOPE_REQ : scope = "req"; break; + case SPOE_SCOPE_RES : scope = "res"; break; + default: goto skip; + } + + if (spoe_decode_buffer(&p, end, &str, &sz) == -1) + goto skip; + memset(&smp, 0, sizeof(smp)); + smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); + + if (spoe_decode_data(&p, end, &smp) == -1) + goto skip; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - set-var '%s.%s.%.*s'\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, + __FUNCTION__, s, scope, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx, + (int)sz, str); + + spoe_set_var(ctx, scope, str, sz, &smp); + + ret = (p - *buf); + *buf = p; + return ret; + skip: + return 0; +} + +static inline int +spoe_decode_action_unset_var(struct stream *s, struct spoe_context *ctx, + char **buf, char *end, int dir) +{ + char *str, *scope, *p = *buf; + struct sample smp; + uint64_t sz; + int ret; + + if (p + 2 >= end) + goto skip; + + /* UNSET-VAR requires 2 arguments */ + if (*p++ != 2) + goto skip; + + switch (*p++) { + case SPOE_SCOPE_PROC: scope = "proc"; break; + case SPOE_SCOPE_SESS: scope = "sess"; break; + case SPOE_SCOPE_TXN : scope = "txn"; break; + case SPOE_SCOPE_REQ : scope = "req"; break; + case SPOE_SCOPE_RES : scope = "res"; break; + default: goto skip; + } + + if (spoe_decode_buffer(&p, end, &str, &sz) == -1) + goto skip; + memset(&smp, 0, sizeof(smp)); + smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - unset-var '%s.%s.%.*s'\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, + __FUNCTION__, s, scope, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx, + (int)sz, str); + + spoe_unset_var(ctx, scope, str, sz, &smp); + + ret = (p - *buf); + *buf = p; + return ret; + skip: + return 0; +} + +/* Process SPOE actions for a specific event. It returns 1 on success. If an + * error occurred, 0 is returned. */ static int -process_spoe_actions(struct stream *s, struct spoe_context *ctx, +spoe_process_actions(struct stream *s, struct spoe_context *ctx, enum spoe_event ev, int dir) { - char *p; - size_t size; - int off, i, idx = 0; + char *p, *end; + int ret; - p = ctx->buffer->p; - size = ctx->buffer->i; + p = ctx->buffer->p; + end = p + ctx->buffer->i; - while (idx < size) { - char *str; - uint64_t sz; - struct sample smp; + while (p < end) { enum spoe_action_type type; - off = idx; - if (idx+2 > size) - goto skip; - - type = p[idx++]; + type = *p++; switch (type) { - case SPOE_ACT_T_SET_VAR: { - char *scope; - - if (p[idx++] != 3) - goto skip_action; - - switch (p[idx++]) { - case SPOE_SCOPE_PROC: scope = "proc"; break; - case SPOE_SCOPE_SESS: scope = "sess"; break; - case SPOE_SCOPE_TXN : scope = "txn"; break; - case SPOE_SCOPE_REQ : scope = "req"; break; - case SPOE_SCOPE_RES : scope = "res"; break; - default: goto skip; - } - - idx += decode_spoe_string(p+idx, p+size, &str, &sz); - if (str == NULL) + case SPOE_ACT_T_SET_VAR: + ret = spoe_decode_action_set_var(s, ctx, &p, end, dir); + if (!ret) goto skip; - memset(&smp, 0, sizeof(smp)); - smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); - - if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1) - goto skip; - idx += i; - - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - set-var '%s.%s.%.*s'\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, - __FUNCTION__, s, scope, - ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx, - (int)sz, str); - - set_spoe_var(ctx, scope, str, sz, &smp); break; - } - case SPOE_ACT_T_UNSET_VAR: { - char *scope; - - if (p[idx++] != 2) - goto skip_action; - - switch (p[idx++]) { - case SPOE_SCOPE_PROC: scope = "proc"; break; - case SPOE_SCOPE_SESS: scope = "sess"; break; - case SPOE_SCOPE_TXN : scope = "txn"; break; - case SPOE_SCOPE_REQ : scope = "req"; break; - case SPOE_SCOPE_RES : scope = "res"; break; - default: goto skip; - } - - idx += decode_spoe_string(p+idx, p+size, &str, &sz); - if (str == NULL) + case SPOE_ACT_T_UNSET_VAR: + ret = spoe_decode_action_unset_var(s, ctx, &p, end, dir); + if (!ret) goto skip; - memset(&smp, 0, sizeof(smp)); - smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); - - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - unset-var '%s.%s.%.*s'\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, - __FUNCTION__, s, scope, - ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx, - (int)sz, str); - - unset_spoe_var(ctx, scope, str, sz, &smp); break; - } default: - skip_action: - if ((i = skip_spoe_action(p+off, p+size)) == -1) - goto skip; - idx += i; + goto skip; } } @@ -2804,7 +2958,7 @@ process_spoe_actions(struct stream *s, struct spoe_context *ctx, * Functions that process SPOE events **************************************************************************/ static inline int -start_event_processing(struct spoe_context *ctx, int dir) +spoe_start_event_processing(struct spoe_context *ctx, int dir) { /* If a process is already started for this SPOE context, retry * later. */ @@ -2820,13 +2974,13 @@ start_event_processing(struct spoe_context *ctx, int dir) } static inline void -stop_event_processing(struct spoe_context *ctx) +spoe_stop_event_processing(struct spoe_context *ctx) { struct spoe_appctx *sa = ctx->frag_ctx.spoe_appctx; if (sa) { sa->frag_ctx.ctx = NULL; - wakeup_spoe_appctx(sa->owner); + spoe_wakeup_appctx(sa->owner); } /* Reset the flag to allow next processing */ @@ -2837,7 +2991,7 @@ stop_event_processing(struct spoe_context *ctx) /* Reset processing timer */ ctx->process_exp = TICK_ETERNITY; - release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait); + spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait); ctx->frag_ctx.spoe_appctx = NULL; ctx->frag_ctx.curmsg = NULL; @@ -2857,7 +3011,7 @@ stop_event_processing(struct spoe_context *ctx) * returns 0 and it returns 1 when the processing is finished. If an error * occurred, -1 is returned. */ static int -process_spoe_event(struct stream *s, struct spoe_context *ctx, +spoe_process_event(struct stream *s, struct spoe_context *ctx, enum spoe_event ev) { struct spoe_config *conf = FLT_CONF(ctx->filter); @@ -2903,11 +3057,11 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire), ctx->process_exp); } - ret = start_event_processing(ctx, dir); + ret = spoe_start_event_processing(ctx, dir); if (!ret) goto out; - if (queue_spoe_context(ctx) < 0) + if (spoe_queue_context(ctx) < 0) goto error; ctx->state = SPOE_CTX_ST_ENCODING_MSGS; @@ -2915,9 +3069,9 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, } if (ctx->state == SPOE_CTX_ST_ENCODING_MSGS) { - if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait)) + if (!spoe_acquire_buffer(&ctx->buffer, &ctx->buffer_wait)) goto out; - ret = encode_spoe_messages(s, ctx, &(ctx->messages[ev]), dir); + ret = spoe_encode_messages(s, ctx, &(ctx->messages[ev]), dir); if (ret < 0) goto error; ctx->state = SPOE_CTX_ST_SENDING_MSGS; @@ -2925,7 +3079,7 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) { if (ctx->frag_ctx.spoe_appctx) - wakeup_spoe_appctx(ctx->frag_ctx.spoe_appctx->owner); + spoe_wakeup_appctx(ctx->frag_ctx.spoe_appctx->owner); ret = 0; goto out; } @@ -2936,9 +3090,8 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, } if (ctx->state == SPOE_CTX_ST_DONE) { - ret = process_spoe_actions(s, ctx, ev, dir); - if (!ret) - goto skip; + spoe_process_actions(s, ctx, ev, dir); + ret = 1; ctx->frame_id++; ctx->state = SPOE_CTX_ST_READY; goto end; @@ -2959,7 +3112,7 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, smp.data.u.sint = ctx->status_code; smp.data.type = SMP_T_BOOL; - set_spoe_var(ctx, "txn", agent->var_on_error, + spoe_set_var(ctx, "txn", agent->var_on_error, strlen(agent->var_on_error), &smp); } SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" @@ -2982,7 +3135,7 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, ret = 1; end: - stop_event_processing(ctx); + spoe_stop_event_processing(ctx); return ret; } @@ -2990,7 +3143,7 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, * Functions that create/destroy SPOE contexts **************************************************************************/ static int -acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) +spoe_acquire_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) { if (*buf != &buf_empty) return 1; @@ -3008,7 +3161,7 @@ acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) } static void -release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) +spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) { if (!LIST_ISEMPTY(&buffer_wait->list)) { LIST_DEL(&buffer_wait->list); @@ -3024,14 +3177,14 @@ release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) } static int -wakeup_spoe_context(struct spoe_context *ctx) +spoe_wakeup_context(struct spoe_context *ctx) { task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); return 1; } static struct spoe_context * -create_spoe_context(struct filter *filter) +spoe_create_context(struct filter *filter) { struct spoe_config *conf = FLT_CONF(filter); struct spoe_context *ctx; @@ -3049,7 +3202,7 @@ create_spoe_context(struct filter *filter) ctx->buffer = &buf_empty; LIST_INIT(&ctx->buffer_wait.list); ctx->buffer_wait.target = ctx; - ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context; + ctx->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_context; LIST_INIT(&ctx->list); ctx->stream_id = 0; @@ -3060,20 +3213,17 @@ create_spoe_context(struct filter *filter) } static void -destroy_spoe_context(struct spoe_context *ctx) +spoe_destroy_context(struct spoe_context *ctx) { if (!ctx) return; - if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) - LIST_DEL(&ctx->buffer_wait.list); - if (!LIST_ISEMPTY(&ctx->list)) - LIST_DEL(&ctx->list); + spoe_stop_event_processing(ctx); pool_free2(pool2_spoe_ctx, ctx); } static void -reset_spoe_context(struct spoe_context *ctx) +spoe_reset_context(struct spoe_context *ctx) { ctx->state = SPOE_CTX_ST_READY; ctx->flags &= ~SPOE_CTX_FL_PROCESS; @@ -3085,7 +3235,7 @@ reset_spoe_context(struct spoe_context *ctx) **************************************************************************/ /* Signal handler: Do a soft stop, wakeup SPOE applet */ static void -sig_stop_spoe(struct sig_handler *sh) +spoe_sig_stop(struct sig_handler *sh) { struct proxy *p; @@ -3105,7 +3255,7 @@ sig_stop_spoe(struct sig_handler *sh) agent = conf->agent; list_for_each_entry(spoe_appctx, &agent->applets, list) { - wakeup_spoe_appctx(spoe_appctx->owner); + spoe_wakeup_appctx(spoe_appctx->owner); } } p = p->next; @@ -3136,7 +3286,7 @@ spoe_init(struct proxy *px, struct flt_conf *fconf) conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES; if (!sighandler_registered) { - signal_register_fct(0, sig_stop_spoe, 0); + signal_register_fct(0, spoe_sig_stop, 0); sighandler_registered = 1; } @@ -3151,11 +3301,8 @@ spoe_deinit(struct proxy *px, struct flt_conf *fconf) if (conf) { struct spoe_agent *agent = conf->agent; - struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners, - struct listener *, by_fe); - free(l); - release_spoe_agent(agent); + spoe_release_agent(agent); free(conf); } fconf->conf = NULL; @@ -3207,7 +3354,7 @@ spoe_start(struct stream *s, struct filter *filter) (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, s); - ctx = create_spoe_context(filter); + ctx = spoe_create_context(filter); if (ctx == NULL) { SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - failed to create SPOE context\n", @@ -3253,7 +3400,7 @@ spoe_stop(struct stream *s, struct filter *filter) (int)now.tv_sec, (int)now.tv_usec, ((struct spoe_config *)FLT_CONF(filter))->agent->id, __FUNCTION__, s); - destroy_spoe_context(filter->ctx); + spoe_destroy_context(filter->ctx); } @@ -3267,7 +3414,7 @@ spoe_check_timeouts(struct stream *s, struct filter *filter) if (tick_is_expired(ctx->process_exp, now_ms)) { s->pending_events |= TASK_WOKEN_MSG; - release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait); + spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait); } } @@ -3297,7 +3444,7 @@ spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn) goto out; ctx->stream_id = s->uniq_id; - ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS); + ret = spoe_process_event(s, ctx, SPOE_EV_ON_CLIENT_SESS); if (!ret) goto out; ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED; @@ -3309,7 +3456,7 @@ spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn) if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED) goto out; - ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS); + ret = spoe_process_event(s, ctx, SPOE_EV_ON_SERVER_SESS); if (!ret) { channel_dont_read(chn); channel_dont_close(chn); @@ -3342,22 +3489,22 @@ spoe_chn_pre_analyze(struct stream *s, struct filter *filter, switch (an_bit) { case AN_REQ_INSPECT_FE: - ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE); + ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE); break; case AN_REQ_INSPECT_BE: - ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE); + ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE); break; case AN_RES_INSPECT: - ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP); + ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_RSP); break; case AN_REQ_HTTP_PROCESS_FE: - ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE); + ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE); break; case AN_REQ_HTTP_PROCESS_BE: - ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE); + ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE); break; case AN_RES_HTTP_PROCESS_FE: - ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP); + ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_RSP); break; } @@ -3382,7 +3529,7 @@ spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn) __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags); if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) { - reset_spoe_context(ctx); + spoe_reset_context(ctx); } return 1; @@ -3531,7 +3678,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) unsigned timeout; if (!*args[1]) { - Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n", + Alert("parsing [%s:%d] : 'timeout' expects 'hello', 'idle' and 'processing'.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto out; @@ -3543,7 +3690,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) else if (!strcmp(args[1], "processing")) tv = &curagent->timeout.processing; else { - Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n", + Alert("parsing [%s:%d] : 'timeout' supports 'hello', 'idle' or 'processing' (got %s).\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto out; @@ -4044,13 +4191,13 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px, conf->agent = curagent; list_for_each_entry_safe(mp, mpback, &curmps, list) { LIST_DEL(&mp->list); - release_spoe_msg_placeholder(mp); + spoe_release_msg_placeholder(mp); } list_for_each_entry_safe(msg, msgback, &curmsgs, list) { Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n", px->id, msg->id, msg->conf.file, msg->conf.line); LIST_DEL(&msg->list); - release_spoe_message(msg); + spoe_release_message(msg); } *cur_arg = pos; @@ -4060,14 +4207,14 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px, return 0; error: - release_spoe_agent(curagent); + spoe_release_agent(curagent); list_for_each_entry_safe(mp, mpback, &curmps, list) { LIST_DEL(&mp->list); - release_spoe_msg_placeholder(mp); + spoe_release_msg_placeholder(mp); } list_for_each_entry_safe(msg, msgback, &curmsgs, list) { LIST_DEL(&msg->list); - release_spoe_message(msg); + spoe_release_message(msg); } free(conf); return -1;