diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c index e2459844a..7c6d50424 100644 --- a/contrib/spoa_example/spoa.c +++ b/contrib/spoa_example/spoa.c @@ -401,6 +401,42 @@ check_engine_id(struct spoe_frame *frame, char **buf, char *end) return ret; } +static int +acc_payload(struct spoe_frame *frame) +{ + struct client *client = frame->client; + char *buf; + size_t len = frame->len - frame->offset; + int ret = frame->offset; + + /* No need to accumulation payload */ + if (frame->fragmented == false) + return ret; + + buf = realloc(frame->frag_buf, frame->frag_len + len); + if (buf == NULL) { + client->status_code = SPOE_FRM_ERR_RES; + return -1; + } + memcpy(buf + frame->frag_len, frame->buf + frame->offset, len); + frame->frag_buf = buf; + frame->frag_len += len; + + if (!(frame->flags & SPOE_FRM_FL_FIN)) { + /* Wait for next parts */ + frame->buf = (char *)(frame->data); + frame->offset = 0; + frame->len = 0; + frame->flags = 0; + return 1; + } + + frame->buf = frame->frag_buf; + frame->len = frame->frag_len; + frame->offset = 0; + return ret; +} + /* Check disconnect status code. It returns -1 if an error occurred, the number * of read bytes otherwise. */ static int @@ -454,6 +490,8 @@ check_discon_message(struct spoe_frame *frame, char **buf, char *end) return ret; } + + /* Decode a HELLO frame received from HAProxy. It returns -1 if an error * occurred, otherwise the number of read bytes. HELLO frame cannot be * ignored and having another frame than a HELLO frame is an error. */ @@ -664,7 +702,7 @@ handle_hanotify(struct spoe_frame *frame) memcpy((char *)&(frame->flags), p, 4); p += 4; - /* Fragmentation is not supported for DISCONNECT frame */ + /* Fragmentation is not supported */ if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) { client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; goto error; @@ -676,44 +714,87 @@ handle_hanotify(struct spoe_frame *frame) if (spoe_decode_varint(&p, end, &frame_id) == -1) goto ignore; - if (frame->fragmented == true) { - if (frame->stream_id != (unsigned int)stream_id || - frame->frame_id != (unsigned int)frame_id) { - client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES; - goto error; - } + frame->stream_id = (unsigned int)stream_id; + frame->frame_id = (unsigned int)frame_id; - if (frame->flags & SPOE_FRM_FL_ABRT) { - DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" - " - Abort processing of a fragmented frame" - " - frag_len=%u - len=%u - offset=%ld", - client->id, frame->stream_id, frame->frame_id, - frame->frag_len, frame->len, p - frame->buf); - goto ignore; - } + DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" + " - %s frame received" + " - frag_len=%u - len=%u - offset=%ld", + client->id, frame->stream_id, frame->frame_id, + (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented", + frame->frag_len, frame->len, p - frame->buf); + + frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN); + frame->offset = (p - frame->buf); + return acc_payload(frame); + + ignore: + return 0; + + error: + return -1; +} + +/* Decode next part of a fragmented frame received from HAProxy. It returns -1 + * if an error occurred, 0 if it must be must be ignored, otherwise the number + * of read bytes. */ +static int +handle_hafrag(struct spoe_frame *frame) +{ + struct client *client = frame->client; + char *p, *end; + uint64_t stream_id, frame_id; + + p = frame->buf; + end = frame->buf + frame->len; + + /* Check frame type */ + if (*p++ != SPOE_FRM_T_UNSET) + goto ignore; + + DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id); + + /* Fragmentation is not supported */ + if (fragmentation == false) { + client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + goto error; + } + + /* Retrieve flags */ + memcpy((char *)&(frame->flags), p, 4); + p+= 4; + + /* Read the stream-id and frame-id */ + if (spoe_decode_varint(&p, end, &stream_id) == -1) + goto ignore; + if (spoe_decode_varint(&p, end, &frame_id) == -1) + goto ignore; + + if (frame->fragmented == false || + frame->stream_id != (unsigned int)stream_id || + frame->frame_id != (unsigned int)frame_id) { + client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES; + goto error; + } + + if (frame->flags & SPOE_FRM_FL_ABRT) { DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" - " - %s fragment of a fragmented frame received" + " - Abort processing of a fragmented frame" " - frag_len=%u - len=%u - offset=%ld", client->id, frame->stream_id, frame->frame_id, - (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next", frame->frag_len, frame->len, p - frame->buf); + goto ignore; } - else { - frame->stream_id = (unsigned int)stream_id; - frame->frame_id = (unsigned int)frame_id; - DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" - " - %s frame received" - " - frag_len=%u - len=%u - offset=%ld", - client->id, frame->stream_id, frame->frame_id, - (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented", - frame->frag_len, frame->len, p - frame->buf); - - frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN); - } + DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" + " - %s fragment of a fragmented frame received" + " - frag_len=%u - len=%u - offset=%ld", + client->id, frame->stream_id, frame->frame_id, + (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next", + frame->frag_len, frame->len, p - frame->buf); frame->offset = (p - frame->buf); - return frame->offset; + return acc_payload(frame); ignore: return 0; @@ -1356,7 +1437,11 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg) client->state = SPOA_ST_DISCONNECTING; goto disconnecting; } - n = handle_hanotify(frame); + if (frame->buf[0] == SPOE_FRM_T_UNSET) + n = handle_hafrag(frame); + else + n = handle_hanotify(frame); + if (n < 0) { LOG(client->worker, "Failed to decode frame: %s", spoe_frm_err_reasons[client->status_code]); @@ -1366,6 +1451,8 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg) LOG(client->worker, "Ignore invalid/unknown/aborted frame"); goto ignore_frame; } + else if (n == 1) + goto noop; else goto process_frame; @@ -1382,39 +1469,14 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg) goto disconnect; } + noop: + return; + ignore_frame: reset_frame(frame); return; process_frame: - if (frame->fragmented == true) { - char *buf; - size_t len = frame->len - frame->offset; - - buf = realloc(frame->frag_buf, frame->frag_len + len); - if (buf == NULL) { - client->status_code = SPOE_FRM_ERR_RES; - goto disconnect; - } - memcpy(buf + frame->frag_len, frame->buf + frame->offset, len); - frame->frag_buf = buf; - frame->frag_len += len; - - if (!(frame->flags & SPOE_FRM_FL_FIN)) { - /* Wait for next fragments */ - frame->buf = (char *)(frame->data); - frame->offset = 0; - frame->len = 0; - frame->flags = 0; - return; - } - - frame->buf = frame->frag_buf; - frame->len = frame->frag_len; - frame->offset = 0; - /* fall through */ - } - process_incoming_frame(frame); client->incoming_frame = NULL; return; diff --git a/include/types/spoe.h b/include/types/spoe.h index a6b0ffc27..b65bc7a91 100644 --- a/include/types/spoe.h +++ b/include/types/spoe.h @@ -294,6 +294,8 @@ struct spoe_appctx { /* Frame Types sent by HAProxy and by agents */ enum spoe_frame_type { + SPOE_FRM_T_UNSET = 0, + /* Frames sent by HAProxy */ SPOE_FRM_T_HAPROXY_HELLO = 1, SPOE_FRM_T_HAPROXY_DISCON, diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 8156287c6..ddfb67ba2 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -495,26 +495,16 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, p = frame; end = frame+size; - /* is null when the stream has aborted the processing of a - * fragmented frame. In this case, we must notify the corresponding - * agent using ids stored in . */ - if (ctx == NULL) { - flags |= SPOE_FRM_FL_ABRT; - stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid; - frame_id = SPOE_APPCTX(appctx)->frag_ctx.curfid; - } - else { - stream_id = ctx->stream_id; - frame_id = ctx->frame_id; + stream_id = ctx->stream_id; + frame_id = ctx->frame_id; - if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) { - /* The fragmentation is not supported by the applet */ - if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) { - SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; - return -1; - } - flags = ctx->frag_ctx.flags; + if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) { + /* The fragmentation is not supported by the applet */ + if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) { + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; + return -1; } + flags = ctx->frag_ctx.flags; } /* Set Frame type */ @@ -531,10 +521,10 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, goto too_big; /* Copy encoded messages, if possible */ - sz = SPOE_APPCTX(appctx)->buffer->i; + sz = ctx->buffer->i; if (p + sz >= end) goto too_big; - memcpy(p, SPOE_APPCTX(appctx)->buffer->p, sz); + memcpy(p, ctx->buffer->p, sz); p += sz; return (p - frame); @@ -544,6 +534,66 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, return 0; } +/* Encode next part of a fragmented frame sent by HAProxy to an agent. It + * returns the number of encoded bytes in the frame on success, 0 if an encoding + * error occurred and -1 if a fatal error occurred. */ +static int +spoe_prepare_hafrag_frame(struct appctx *appctx, struct spoe_context *ctx, + char *frame, size_t size) +{ + char *p, *end; + unsigned int stream_id, frame_id; + unsigned int flags; + size_t sz; + + p = frame; + end = frame+size; + + /* is null when the stream has aborted the processing of a + * fragmented frame. In this case, we must notify the corresponding + * agent using ids stored in . */ + if (ctx == NULL) { + flags = (SPOE_FRM_FL_FIN|SPOE_FRM_FL_ABRT); + stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid; + frame_id = SPOE_APPCTX(appctx)->frag_ctx.curfid; + } + else { + flags = ctx->frag_ctx.flags; + stream_id = ctx->stream_id; + frame_id = ctx->frame_id; + } + + /* Set Frame type */ + *p++ = SPOE_FRM_T_UNSET; + + /* Set flags */ + memcpy(p, (char *)&flags, 4); + p += 4; + + /* Set stream-id and frame-id */ + if (spoe_encode_varint(stream_id, &p, end) == -1) + goto too_big; + if (spoe_encode_varint(frame_id, &p, end) == -1) + goto too_big; + + if (ctx == NULL) + goto end; + + /* Copy encoded messages, if possible */ + sz = ctx->buffer->i; + if (p + sz >= end) + goto too_big; + memcpy(p, ctx->buffer->p, sz); + p += sz; + + end: + return (p - frame); + + too_big: + SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG; + return 0; +} + /* Decode and process the HELLO frame sent by an agent. It returns the number of * read bytes on success, 0 if a decoding error occurred, and -1 if a fatal * error occurred. */ @@ -1150,12 +1200,13 @@ spoe_release_appctx(struct appctx *appctx) if (appctx->st0 == SPOE_APPCTX_ST_IDLE) agent->applets_idle--; - si_shutw(si); - si_shutr(si); - si_ic(si)->flags |= CF_READ_NULL; appctx->st0 = SPOE_APPCTX_ST_END; if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE) spoe_appctx->status_code = SPOE_FRM_ERR_IO; + + si_shutw(si); + si_shutr(si); + si_ic(si)->flags |= CF_READ_NULL; } /* Destroy the task attached to this applet */ @@ -1351,19 +1402,36 @@ spoe_handle_connecting_appctx(struct appctx *appctx) return 0; } + static int -spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx, - int *skip) +spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) { - struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; + struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; + struct spoe_context *ctx = NULL; char *frame, *buf; int ret; /* 4 bytes are reserved at the beginning of to store the frame * length. */ buf = trash.str; frame = buf+4; - ret = spoe_prepare_hanotify_frame(appctx, ctx, frame, - SPOE_APPCTX(appctx)->max_frame_size); + + if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) { + ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx; + ret = spoe_prepare_hafrag_frame(appctx, ctx, frame, + SPOE_APPCTX(appctx)->max_frame_size); + } + else if (LIST_ISEMPTY(&agent->sending_queue)) { + *skip = 1; + ret = 1; + goto end; + } + else { + ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list); + ret = spoe_prepare_hanotify_frame(appctx, ctx, frame, + SPOE_APPCTX(appctx)->max_frame_size); + + } + if (ret > 1) ret = spoe_send_frame(appctx, buf, ret); @@ -1376,6 +1444,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx if (ctx == NULL) goto abort_frag_frame; + spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait); LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); ctx->state = SPOE_CTX_ST_ERROR; @@ -1391,6 +1460,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx if (ctx == NULL) goto abort_frag_frame; + spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait); LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) || @@ -1506,7 +1576,6 @@ spoe_handle_processing_appctx(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; - struct spoe_context *ctx = NULL; unsigned int fpa = 0; int ret, skip_sending = 0, skip_receiving = 0; @@ -1531,39 +1600,21 @@ spoe_handle_processing_appctx(struct appctx *appctx) skip_sending, skip_receiving, spoe_appctx_state_str[appctx->st0]); - if (fpa > agent->max_fpa || (skip_sending && skip_receiving)) + if (fpa > agent->max_fpa) goto stop; - else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) { + else if (skip_sending || appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) { if (skip_receiving) goto stop; goto recv_frame; } - else if (skip_sending) - goto recv_frame; - else if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) { - ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx; - goto send_frame; - } - else if (LIST_ISEMPTY(&agent->sending_queue)) { - skip_sending = 1; - goto recv_frame; - } - ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list); - send_frame: - /* Transfer the buffer ownership to the SPOE appctx */ - if (ctx) { - SPOE_APPCTX(appctx)->buffer = ctx->buffer; - ctx->buffer = &buf_empty; - } - ret = spoe_handle_sending_frame_appctx(appctx, ctx, &skip_sending); + /* send_frame */ + ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending); switch (ret) { case -1: /* error */ goto next; case 0: /* ignore */ - spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer, - &SPOE_APPCTX(appctx)->buffer_wait); agent->sending_rate++; fpa++; break; @@ -1572,8 +1623,6 @@ spoe_handle_processing_appctx(struct appctx *appctx) break; default: - spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer, - &SPOE_APPCTX(appctx)->buffer_wait); agent->sending_rate++; fpa++; break; @@ -2571,7 +2620,7 @@ static void spoe_reset_context(struct spoe_context *ctx) { ctx->state = SPOE_CTX_ST_READY; - ctx->flags &= ~SPOE_CTX_FL_PROCESS; + ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED); }