diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c index 5c3a4538e..8e234b58f 100644 --- a/contrib/spoa_example/spoa.c +++ b/contrib/spoa_example/spoa.c @@ -98,6 +98,9 @@ enum spoe_frame_error { SPOE_FRM_ERR_NO_CAP, SPOE_FRM_ERR_BAD_VSN, SPOE_FRM_ERR_BAD_FRAME_SIZE, + SPOE_FRM_ERR_FRAG_NOT_SUPPORTED, + SPOE_FRM_ERR_INTERLACED_FRAMES, + SPOE_FRM_ERR_RES, SPOE_FRM_ERR_UNKNOWN = 99, SPOE_FRM_ERRS, }; @@ -131,6 +134,10 @@ enum spoa_frame_type { SPOA_FRM_T_AGENT, }; +/* Flags set on the SPOE frame */ +#define SPOE_FRM_FL_FIN 0x00000001 +#define SPOE_FRM_FL_ABRT 0x00000002 + /* Masks to get data type or flags value */ #define SPOE_DATA_T_MASK 0x0F #define SPOE_DATA_FL_MASK 0xF0 @@ -157,8 +164,10 @@ struct spoe_frame { unsigned int stream_id; unsigned int frame_id; - bool hcheck; /* true is the CONNECT frame is a healthcheck */ - int ip_score; /* -1 if unset, else between 0 and 100 */ + unsigned int flags; + bool hcheck; /* true is the CONNECT frame is a healthcheck */ + bool fragmented; /* true if the frame is fragmented */ + int ip_score; /* -1 if unset, else between 0 and 100 */ struct event process_frame_event; struct worker *worker; @@ -166,6 +175,9 @@ struct spoe_frame { struct client *client; struct list list; + char *frag_buf; /* used to accumulate payload of a fragmented frame */ + unsigned int frag_len; + char data[0]; }; @@ -190,6 +202,7 @@ struct client { struct spoe_engine *engine; bool pipelining; bool async; + bool fragmentation; struct worker *worker; struct list by_worker; @@ -244,20 +257,24 @@ struct timeval processing_delay = {0, 0}; static bool debug = false; static bool pipelining = false; static bool async = false; +static bool fragmentation = false; static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = { - [SPOE_FRM_ERR_NONE] = "normal", - [SPOE_FRM_ERR_IO] = "I/O error", - [SPOE_FRM_ERR_TOUT] = "a timeout occurred", - [SPOE_FRM_ERR_TOO_BIG] = "frame is too big", - [SPOE_FRM_ERR_INVALID] = "invalid frame received", - [SPOE_FRM_ERR_NO_VSN] = "version value not found", - [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found", - [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found", - [SPOE_FRM_ERR_BAD_VSN] = "unsupported version", - [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small", - [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred", + [SPOE_FRM_ERR_NONE] = "normal", + [SPOE_FRM_ERR_IO] = "I/O error", + [SPOE_FRM_ERR_TOUT] = "a timeout occurred", + [SPOE_FRM_ERR_TOO_BIG] = "frame is too big", + [SPOE_FRM_ERR_INVALID] = "invalid frame received", + [SPOE_FRM_ERR_NO_VSN] = "version value not found", + [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found", + [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found", + [SPOE_FRM_ERR_BAD_VSN] = "unsupported version", + [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small", + [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported", + [SPOE_FRM_ERR_INTERLACED_FRAMES] = "invalid interlaced frames", + [SPOE_FRM_ERR_RES] = "resource allocation error", + [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred", }; static void signal_cb(evutil_socket_t, short, void *); @@ -640,6 +657,15 @@ check_capabilities(struct spoe_frame *frame, int idx) client->async = true; } } + else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) { + i += 5; + if (sz == i || isspace(str[i]) || str[i] == ',') { + DEBUG(frame->worker, + "<%lu> HAProxy supports fragmented frame", + client->id); + client->fragmentation = true; + } + } if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL) break; @@ -740,9 +766,16 @@ handle_hahello(struct spoe_frame *frame) DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id); - /* Skip flags */ + /* Retrieve flags */ + memcpy((char *)&(frame->flags), buf+idx, 4); idx += 4; + /* Fragmentation is not supported for HELLO frame */ + if (!(frame->flags & SPOE_FRM_FL_FIN)) { + client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + goto error; + } + /* stream-id and frame-id must be cleared */ if (buf[idx] != 0 || buf[idx+1] != 0) { client->status_code = SPOE_FRM_ERR_INVALID; @@ -846,9 +879,16 @@ handle_hadiscon(struct spoe_frame *frame) DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id); - /* Skip flags */ + /* Retrieve flags */ + memcpy((char *)&(frame->flags), buf+idx, 4); idx += 4; + /* Fragmentation is not supported for DISCONNECT frame */ + if (!(frame->flags & SPOE_FRM_FL_FIN)) { + client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + goto error; + } + /* stream-id and frame-id must be cleared */ if (buf[idx] != 0 || buf[idx+1] != 0) { client->status_code = SPOE_FRM_ERR_INVALID; @@ -906,8 +946,8 @@ handle_hadiscon(struct spoe_frame *frame) } /* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error - * occurred or if the frame must be ignored, 0 if the frame must be ack without - * any processing, otherwise the number of read bytes (always > 0). */ + * occurred, 0 if it must be must be ignored, otherwise the number of read + * bytes. */ static int handle_hanotify(struct spoe_frame *frame) { @@ -923,9 +963,16 @@ handle_hanotify(struct spoe_frame *frame) DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id); - /* Skip flags */ + /* Retrieve flags */ + memcpy((char *)&(frame->flags), buf+idx, 4); idx += 4; + /* Fragmentation is not supported for DISCONNECT frame */ + if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) { + client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + goto error; + } + /* Read the stream-id */ if ((i = decode_spoe_varint(buf+idx, end, &stream_id)) == -1) goto ignore; @@ -936,20 +983,49 @@ handle_hanotify(struct spoe_frame *frame) goto ignore; idx += i; - frame->stream_id = (unsigned int)stream_id; - frame->frame_id = (unsigned int)frame_id; + if (frame->fragmented == true) { + if (frame->stream_id != (unsigned int)stream_id || + frame->frame_id != (unsigned int)frame_id) { + client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES; + goto error; + } - DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u", - client->id, frame->stream_id, frame->frame_id); + if (frame->flags & SPOE_FRM_FL_ABRT) { + DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" + " - Abort processing of a fragmented frame" + " - frag_len=%u - len=%u - offset=%u", + client->id, frame->stream_id, frame->frame_id, + frame->frag_len, frame->len, idx); + goto ignore; + } + DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" + " - %s fragment of a fragmented frame received" + " - frag_len=%u - len=%u - offset=%u", + client->id, frame->stream_id, frame->frame_id, + (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next", + frame->frag_len, frame->len, idx); + } + else { + frame->stream_id = (unsigned int)stream_id; + frame->frame_id = (unsigned int)frame_id; - if (buf + idx == end) { - return 0; + DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" + " - %s frame received" + " - frag_len=%u - len=%u - offset=%u", + client->id, frame->stream_id, frame->frame_id, + (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented", + frame->frag_len, frame->len, idx); + + frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN); } frame->offset = idx; return idx; ignore: + return 0; + + error: return -1; } @@ -960,7 +1036,9 @@ prepare_agenthello(struct spoe_frame *frame) { struct client *client = frame->client; char *buf = frame->buf; - int idx = 0; + char capabilities[64]; + int n, idx = 0; + unsigned int flags = SPOE_FRM_FL_FIN; DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id); frame->type = SPOA_FRM_T_AGENT; @@ -968,8 +1046,8 @@ prepare_agenthello(struct spoe_frame *frame) /* Frame Type */ buf[idx++] = SPOE_FRM_T_AGENT_HELLO; - /* No flags for now */ - memset(buf+idx, 0, 4); /* No flags */ + /* Set flags */ + memcpy(buf+idx, (char *)&flags, 4); idx += 4; /* No stream-id and frame-id for HELLO frames */ @@ -994,18 +1072,41 @@ prepare_agenthello(struct spoe_frame *frame) /* "capabilities" K/V item */ idx += encode_spoe_string("capabilities", 12, buf+idx); buf[idx++] = SPOE_DATA_T_STR; - if (client->pipelining == true && client->async == true) - idx += encode_spoe_string("pipelining,async", 16, buf+idx); - else if (client->pipelining == true) - idx += encode_spoe_string("pipelining", 10, buf+idx); - else if (client->async == true) - idx += encode_spoe_string("async", 5, buf+idx); + + memset(capabilities, 0, sizeof(capabilities)); + n = 0; + + /* 1. Fragmentation capability ? */ + if (fragmentation == true) { + memcpy(capabilities, "fragmentation", 13); + n += 13; + } + /* 2. Pipelining capability ? */ + if (client->pipelining == true && n != 0) { + memcpy(capabilities + n, ", pipelining", 12); + n += 12; + } + else if (client->pipelining == true) { + memcpy(capabilities, "pipelining", 10); + n += 10; + } + /* 3. Async capability ? */ + if (client->async == true && n != 0) { + memcpy(capabilities + n, ", async", 7); + n += 7; + } + else if (client->async == true) { + memcpy(capabilities, "async", 5); + n += 5; + } + /* 4. Encode capabilities string */ + if (n != 0) + idx += encode_spoe_string(capabilities, n, buf+idx); else idx += encode_spoe_string(NULL, 0, buf+idx); - DEBUG(frame->worker, "<%lu> Agent capabilities : %s %s", - client->id, (client->pipelining?"pipelining":""), - (client->async?"async":"")); + DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s", + client->id, n, capabilities); frame->len = idx; return idx; @@ -1020,6 +1121,7 @@ prepare_agentdicon(struct spoe_frame *frame) char *buf = frame->buf; const char *reason; int rlen, idx = 0; + unsigned int flags = SPOE_FRM_FL_FIN; DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id); frame->type = SPOA_FRM_T_AGENT; @@ -1032,8 +1134,8 @@ prepare_agentdicon(struct spoe_frame *frame) /* Frame type */ buf[idx++] = SPOE_FRM_T_AGENT_DISCON; - /* No flags for now */ - memset(buf+idx, 0, 4); + /* Set flags */ + memcpy(buf+idx, (char *)&flags, 4); idx += 4; /* No stream-id and frame-id for DISCONNECT frames */ @@ -1065,8 +1167,9 @@ prepare_agentdicon(struct spoe_frame *frame) static int prepare_agentack(struct spoe_frame *frame) { - char *buf = frame->buf; - int idx = 0; + char *buf = frame->buf; + int idx = 0; + unsigned int flags = SPOE_FRM_FL_FIN; /* Be careful here, in async mode, frame->client can be NULL */ @@ -1076,8 +1179,8 @@ prepare_agentack(struct spoe_frame *frame) /* Frame type */ buf[idx++] = SPOE_FRM_T_AGENT_ACK; - /* No flags for now */ - memset(buf+idx, 0, 4); /* No flags */ + /* Set flags */ + memcpy(buf+idx, (char *)&flags, 4); idx += 4; /* Set stream-id and frame-id for ACK frames */ @@ -1140,6 +1243,8 @@ release_frame(struct spoe_frame *frame) worker = frame->worker; LIST_DEL(&frame->list); + if (frame->frag_buf) + free(frame->frag_buf); memset(frame, 0, sizeof(*frame)+max_frame_size+4); LIST_ADDQ(&worker->frames, &frame->list); } @@ -1186,14 +1291,21 @@ reset_frame(struct spoe_frame *frame) if (frame == NULL) return; - frame->type = SPOA_FRM_T_UNKNOWN; - frame->buf = (char *)(frame->data); - frame->offset = 0; - frame->len = 0; - frame->stream_id = 0; - frame->frame_id = 0; - frame->hcheck = false; - frame->ip_score = -1; + if (frame->frag_buf) + free(frame->frag_buf); + + frame->type = SPOA_FRM_T_UNKNOWN; + frame->buf = (char *)(frame->data); + frame->offset = 0; + frame->len = 0; + frame->stream_id = 0; + frame->frame_id = 0; + frame->flags = 0; + frame->hcheck = false; + frame->fragmented = false; + frame->ip_score = -1; + frame->frag_buf = NULL; + frame->frag_len = 0; LIST_INIT(&frame->list); } @@ -1414,8 +1526,8 @@ process_frame_cb(evutil_socket_t fd, short events, void *arg) int idx = frame->offset; DEBUG(frame->worker, - "Process frame messages : STREAM-ID=%u - FRAME-ID=%u", - frame->stream_id, frame->frame_id); + "Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes", + frame->stream_id, frame->frame_id, frame->len - frame->offset); /* Loop on messages */ while (buf+idx < end) { @@ -1471,7 +1583,10 @@ process_frame_cb(evutil_socket_t fd, short events, void *arg) stop_processing: /* Prepare agent ACK frame */ + frame->buf = (char *)(frame->data) + 4; frame->offset = 0; + frame->len = 0; + frame->flags = 0; idx = prepare_agentack(frame); if (frame->ip_score != -1) { @@ -1547,20 +1662,19 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg) goto write_frame; case SPOA_ST_PROCESSING: - n = handle_hanotify(frame); - if (n < 0 && frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) { + if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) { client->state = SPOA_ST_DISCONNECTING; goto disconnecting; } + n = handle_hanotify(frame); if (n < 0) { - LOG(client->worker, "Ignore invalid or unknown frame"); - goto ignore_frame; + LOG(client->worker, "Failed to decode frame: %s", + spoe_frm_err_reasons[client->status_code]); + goto disconnect; } if (n == 0) { - DEBUG(client->worker, "<%lu> No message found, ack it now", - client->id); - prepare_agentack(frame); - goto write_frame; + LOG(client->worker, "Ignore invalid/unknown/aborted frame"); + goto ignore_frame; } else goto process_frame; @@ -1583,6 +1697,34 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg) return; process_frame: + if (frame->fragmented == true) { + char *buf; + size_t len = frame->len - frame->offset; + + buf = realloc(frame->frag_buf, frame->frag_len + len); + if (buf == NULL) { + client->status_code = SPOE_FRM_ERR_RES; + goto disconnect; + } + memcpy(buf + frame->frag_len, frame->buf + frame->offset, len); + frame->frag_buf = buf; + frame->frag_len += len; + + if (!(frame->flags & SPOE_FRM_FL_FIN)) { + /* Wait for next fragments */ + frame->buf = (char *)(frame->data); + frame->offset = 0; + frame->len = 0; + frame->flags = 0; + return; + } + + frame->buf = frame->frag_buf; + frame->len = frame->frag_len; + frame->offset = 0; + /* fall through */ + } + process_incoming_frame(frame); client->incoming_frame = NULL; return; @@ -1829,7 +1971,7 @@ usage(char *prog) " but can be in any other unit if the number is suffixed\n" " by a unit (us, ms, s)\n" "\n" - " Supported capabilities: pipelining, async\n", + " Supported capabilities: fragmentation, pipelining, async\n", prog, MAX_FRAME_SIZE, DEFAULT_PORT, NUM_WORKERS); } @@ -1863,6 +2005,8 @@ main(int argc, char **argv) pipelining = true; else if (!strcmp(optarg, "async")) async = true; + else if (!strcmp(optarg, "fragmentation")) + fragmentation = true; else fprintf(stderr, "WARNING: unsupported capability '%s'\n", optarg); break; @@ -1964,8 +2108,8 @@ main(int argc, char **argv) DEBUG(&null_worker, "Server is ready" - " [pipelining=%s - async=%s - debug=%s - max-frame-size=%u]", - (pipelining?"true":"false"), (async?"true":"false"), + " [fragmentation=%s - pipelining=%s - async=%s - debug=%s - max-frame-size=%u]", + (fragmentation?"true":"false"), (pipelining?"true":"false"), (async?"true":"false"), (debug?"true":"false"), max_frame_size); event_base_dispatch(base);