diff --git a/include/proto/filters.h b/include/proto/filters.h index d86042456..2e2157784 100644 --- a/include/proto/filters.h +++ b/include/proto/filters.h @@ -43,6 +43,9 @@ int flt_check(struct proxy *p); int flt_stream_start(struct stream *s); void flt_stream_stop(struct stream *s); +int flt_set_stream_backend(struct stream *s, struct proxy *be); +int flt_stream_init(struct stream *s); +void flt_stream_release(struct stream *s, int only_backend); int flt_http_headers(struct stream *s, struct http_msg *msg); int flt_http_start_chunk(struct stream *s, struct http_msg *msg); diff --git a/include/proto/flt_http_comp.h b/include/proto/flt_http_comp.h index 587db0d09..877cc9d4d 100644 --- a/include/proto/flt_http_comp.h +++ b/include/proto/flt_http_comp.h @@ -21,18 +21,9 @@ #ifndef _PROTO_FLT_HTTP_COMP_H #define _PROTO_FLT_HTTP_COMP_H -/* NOTE: This is a temporary header file. It will be removed when the - * compression filter will added */ +#include -#include -#include - -int select_compression_request_header(struct stream *s, struct buffer *req); -int select_compression_response_header(struct stream *s, struct buffer *res); - -int http_compression_buffer_init(struct stream *s, struct buffer *in, struct buffer *out); -int http_compression_buffer_add_data(struct stream *s, struct buffer *in, struct buffer *out); -int http_compression_buffer_end(struct stream *s, struct buffer **in, struct buffer **out, int end); +int check_legacy_http_comp_flt(struct proxy *proxy); -#endif /* _PROTO_FLT_HTTP_COMP_H */ +#endif // _PROTO_FLT_HTTP_COMP_H diff --git a/include/types/compression.h b/include/types/compression.h index b79d7704b..9a0cc7897 100644 --- a/include/types/compression.h +++ b/include/types/compression.h @@ -32,6 +32,8 @@ #include #endif +#include + struct comp { struct comp_algo *algos; struct comp_type *types; diff --git a/include/types/stream.h b/include/types/stream.h index 292e36a4e..d6e05e4f7 100644 --- a/include/types/stream.h +++ b/include/types/stream.h @@ -32,7 +32,6 @@ #include #include -#include #include #include #include @@ -90,8 +89,7 @@ #define SF_IGNORE_PRST 0x00080000 /* ignore persistence */ -#define SF_COMP_READY 0x00100000 /* the compression is initialized */ -#define SF_SRV_REUSED 0x00200000 /* the server-side connection was reused */ +#define SF_SRV_REUSED 0x00100000 /* the server-side connection was reused */ /* some external definitions */ struct strm_logs { @@ -158,8 +156,7 @@ struct stream { void (*do_log)(struct stream *s); /* the function to call in order to log (or NULL) */ void (*srv_error)(struct stream *s, /* the function to call upon unrecoverable server errors (or NULL) */ struct stream_interface *si); - struct comp_ctx *comp_ctx; /* HTTP compression context */ - struct comp_algo *comp_algo; /* HTTP compression algorithm if not NULL */ + char *unique_id; /* custom unique ID */ /* These two pointers are used to resume the execution of the rule lists. */ diff --git a/src/filters.c b/src/filters.c index b4af33be7..974c742f4 100644 --- a/src/filters.c +++ b/src/filters.c @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -259,6 +260,7 @@ flt_check(struct proxy *proxy) if (filter->ops->check) err += filter->ops->check(proxy, filter); } + err += check_legacy_http_comp_flt(proxy); return err; } @@ -279,6 +281,60 @@ flt_deinit(struct proxy *proxy) } } +/* Attaches a filter to a stream. Returns -1 if an error occurs, 0 otherwise. */ +static int +flt_stream_add_filter(struct stream *s, struct filter *filter, + int is_backend) +{ + struct filter *f = pool_alloc2(pool2_filter); + if (!f) /* not enough memory */ + return -1; + memset(f, 0, sizeof(*f)); + f->id = filter->id; + f->ops = filter->ops; + f->conf = filter->conf; + f->is_backend_filter = is_backend; + LIST_ADDQ(&s->strm_flt.filters, &f->list); + return 0; +} + +/* + * Called when a stream is created. It attaches all frontend filters to the + * stream. Returns -1 if an error occurs, 0 otherwise. + */ +int +flt_stream_init(struct stream *s) +{ + struct filter *filter; + + LIST_INIT(&s->strm_flt.filters); + memset(s->strm_flt.current, 0, sizeof(s->strm_flt.current)); + list_for_each_entry(filter, &strm_fe(s)->filters, list) { + if (flt_stream_add_filter(s, filter, 0) < 0) + return -1; + } + return 0; +} + +/* + * Called when a stream is closed or when analyze ends (For an HTTP stream, this + * happens after each request/response exchange). When analyze ends, backend + * filters are removed. When the stream is closed, all filters attached to the + * stream are removed. + */ +void +flt_stream_release(struct stream *s, int only_backend) +{ + struct filter *filter, *back; + + list_for_each_entry_safe(filter, back, &s->strm_flt.filters, list) { + if (!only_backend || filter->is_backend_filter) { + LIST_DEL(&filter->list); + pool_free2(pool2_filter, filter); + } + } +} + /* * Calls 'stream_start' for all filters attached to a stream. This happens when * the stream is created, just after calling flt_stream_init @@ -311,6 +367,26 @@ flt_stream_stop(struct stream *s) } } +/* + * Called when a backend is set for a stream. If the frontend and the backend + * are the same, this function does nothing. Else it attaches all backend + * filters to the stream. Returns -1 if an error occurs, 0 otherwise. + */ +int +flt_set_stream_backend(struct stream *s, struct proxy *be) +{ + struct filter *filter; + + if (strm_fe(s) == be) + return 0; + + list_for_each_entry(filter, &be->filters, list) { + if (flt_stream_add_filter(s, filter, 1) < 0) + return -1; + } + return 0; +} + int flt_http_headers(struct stream *s, struct http_msg *msg) { @@ -691,8 +767,6 @@ flt_end_analyze(struct stream *s, struct channel *chn, unsigned int an_bit) /* Check if 'channel_end_analyze' callback has been called for the * request and the response. */ if (!(s->req.analysers & AN_FLT_END) && !(s->res.analysers & AN_FLT_END)) { - struct filter *filter, *back; - /* When we are waiting for a new request, so we must reset * stream analyzers. The input must not be closed the request * channel, else it is useless to wait. */ @@ -701,12 +775,8 @@ flt_end_analyze(struct stream *s, struct channel *chn, unsigned int an_bit) s->res.analysers = 0; } - list_for_each_entry_safe(filter, back, &s->strm_flt.filters, list) { - if (filter->is_backend_filter) { - LIST_DEL(&filter->list); - pool_free2(pool2_filter, filter); - } - } + /* Remove backend filters from the list */ + flt_stream_release(s, 1); } else if (ret) { /* Analyzer ends only for one channel. So wake up the stream to diff --git a/src/flt_http_comp.c b/src/flt_http_comp.c index 5eaf0c9ee..ddc607d4f 100644 --- a/src/flt_http_comp.c +++ b/src/flt_http_comp.c @@ -22,21 +22,406 @@ #include #include +#include #include #include #include #include +static const char *http_comp_flt_id = "compression filter"; + +struct flt_ops comp_ops; + +static struct buffer *tmpbuf = &buf_empty; + +struct comp_chunk { + unsigned int start; /* start of the chunk relative to FLT_FWD offset */ + unsigned int end; /* end of the chunk relative to FLT_FWD offset */ + int skip; /* if set to 1, the chunk is skipped. Otherwise it is compressed */ + int is_last; /* if set, this is the last chunk. Data after this + * chunk will be forwarded as it is. */ + struct list list; +}; + +struct comp_state { + struct comp_ctx *comp_ctx; /* compression context */ + struct comp_algo *comp_algo; /* compression algorithm if not NULL */ + struct list comp_chunks; /* data chunks that should be compressed or skipped */ + unsigned int first; /* offset of the first chunk. Data before + * this offset will be forwarded as it + * is. */ +}; + +static int add_comp_chunk(struct comp_state *st, unsigned int start, + unsigned int len, int skip, int is_last); +static int skip_input_data(struct filter *filter, struct http_msg *msg, + unsigned int consumed); + +static int select_compression_request_header(struct comp_state *st, + struct stream *s, + struct http_msg *msg); +static int select_compression_response_header(struct comp_state *st, + struct stream *s, + struct http_msg *msg); + +static int http_compression_buffer_init(struct buffer *in, struct buffer *out); +static int http_compression_buffer_add_data(struct comp_state *st, + struct buffer *in, + struct buffer *out, int sz); +static int http_compression_buffer_end(struct comp_state *st, struct stream *s, + struct buffer **in, struct buffer **out, + unsigned int consumed, int end); + +/***********************************************************************/ +static int +comp_flt_init(struct proxy *px, struct filter *filter) +{ + + /* We need a compression buffer in the DATA state to put the output of + * compressed data, and in CRLF state to let the TRAILERS state finish + * the job of removing the trailing CRLF. + */ + if (!tmpbuf->size) { + if (b_alloc(&tmpbuf) == NULL) + return -1; + } + return 0; +} + +static void +comp_flt_deinit(struct proxy *px, struct filter *filter) +{ + if (tmpbuf->size) + b_free(&tmpbuf); +} + +static int +comp_start_analyze(struct stream *s, struct filter *filter, struct channel *chn) +{ + if (filter->ctx == NULL) { + struct comp_state *st; + + if (!(st = malloc(sizeof(*st)))) + return -1; + + LIST_INIT(&st->comp_chunks); + st->comp_algo = NULL; + st->comp_ctx = NULL; + st->first = 0; + filter->ctx = st; + } + return 1; +} + +static int +comp_analyze(struct stream *s, struct filter *filter, struct channel *chn, + unsigned int an_bit) +{ + struct comp_state *st = filter->ctx; + + if (!strm_fe(s)->comp && !s->be->comp) + goto end; + + switch (an_bit) { + case AN_RES_HTTP_PROCESS_BE: + select_compression_response_header(st, s, &s->txn->rsp); + break; + } + end: + return 1; +} + +static int +comp_end_analyze(struct stream *s, struct filter *filter, struct channel *chn) +{ + struct comp_state *st = filter->ctx; + struct comp_chunk *cc, *back; + + if (!st || !(chn->flags & CF_ISRESP)) + goto end; + + list_for_each_entry_safe(cc, back, &st->comp_chunks, list) { + LIST_DEL(&cc->list); + free(cc); + } + + if (!st->comp_algo || !s->txn->status) + goto release_ctx; + + if (strm_fe(s)->mode == PR_MODE_HTTP) + strm_fe(s)->fe_counters.p.http.comp_rsp++; + if ((s->flags & SF_BE_ASSIGNED) && (s->be->mode == PR_MODE_HTTP)) + s->be->be_counters.p.http.comp_rsp++; + + /* release any possible compression context */ + st->comp_algo->end(&st->comp_ctx); + + release_ctx: + free(st); + filter->ctx = NULL; + end: + return 1; +} + +static int +comp_http_headers(struct stream *s, struct filter *filter, + struct http_msg *msg) +{ + struct comp_state *st = filter->ctx; + + if (strm_fe(s)->comp || s->be->comp) { + if (!(msg->chn->flags & CF_ISRESP)) + select_compression_request_header(st, s, msg); + } + return 1; +} + +static int +comp_skip_http_chunk_envelope(struct stream *s, struct filter *filter, + struct http_msg *msg) +{ + struct comp_state *st = filter->ctx; + unsigned int start; + int ret; + + if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) { + flt_set_forward_data(filter, msg->chn); + return 1; + } + + start = FLT_NXT(filter, msg->chn) - FLT_FWD(filter, msg->chn); + /* If this is the last chunk, we flag it */ + if (msg->chunk_len == 0 && msg->msg_state == HTTP_MSG_CHUNK_SIZE) + ret = add_comp_chunk(st, start, 0, 1, 1); + else + ret = add_comp_chunk(st, start, msg->sol, 1, 0); + + return !ret ? 1 : -1; +} + +static int +comp_http_data(struct stream *s, struct filter *filter, + struct http_msg *msg) +{ + struct comp_state *st = filter->ctx; + unsigned int start; + int is_last, ret; + + ret = MIN(msg->chunk_len + msg->next, msg->chn->buf->i) - FLT_NXT(filter, msg->chn); + if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) { + flt_set_forward_data(filter, msg->chn); + goto end; + } + if (!ret) + goto end; + + start = FLT_NXT(filter, msg->chn) - FLT_FWD(filter, msg->chn); + is_last = (!(msg->flags & HTTP_MSGF_TE_CHNK) && + (msg->chunk_len == ret - msg->next + FLT_NXT(filter, msg->chn))); + + if (add_comp_chunk(st, start, ret, 0, is_last) == -1) + ret = -1; + end: + return ret; +} + +static int +comp_http_forward_data(struct stream *s, struct filter *filter, + struct http_msg *msg, unsigned int len) +{ + struct comp_state *st = filter->ctx; + struct comp_chunk *cc, *back; + unsigned int sz, consumed = 0, compressed = 0; + int is_last = 0, ret = len; + + if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) { + flt_set_forward_data(filter, msg->chn); + goto end; + } + + /* no data to forward or no chunk or the first chunk is too far */ + if (!len || LIST_ISEMPTY(&st->comp_chunks)) + goto end; + if (st->first > len) { + consumed = len; + goto update_chunks; + } + + /* initialize the buffer used to write compressed data */ + b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first); + ret = http_compression_buffer_init(msg->chn->buf, tmpbuf); + b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first); + if (ret < 0) { + msg->chn->flags |= CF_WAKE_WRITE; + return 0; + } + + /* Loop on all chunks */ + list_for_each_entry_safe(cc, back, &st->comp_chunks, list) { + /* current chunk must not be handled yet */ + if (len <= cc->start) { + consumed = len; + break; + } + + /* Get the number of bytes that must be handled in the current + * chunk */ + sz = MIN(len, cc->end) - cc->start; + + if (cc->skip) { + /* No compression for this chunk, data must be + * skipped. This happens when the HTTP response is + * chunked, the chunk envelope is skipped. */ + ret = sz; + } + else { + /* Compress the chunk */ + b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + cc->start); + ret = http_compression_buffer_add_data(st, msg->chn->buf, tmpbuf, sz); + b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + cc->start); + if (ret < 0) + goto end; + compressed += ret; + } + + /* Update the chunk by removing consumed bytes. If all bytes are + * consumed, the chunk is removed from the list and we + * loop. Otherwise, we stop here. */ + cc->start += ret; + consumed = cc->start; + if (cc->start != cc->end) + break; + + /* Remember if this is the last chunk */ + is_last = cc->is_last; + LIST_DEL(&cc->list); + free(cc); + } + + if (compressed) { + /* Some data was compressed so we can switch buffers to replace + * uncompressed data by compressed ones. */ + b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first); + ret = http_compression_buffer_end(st, s, &msg->chn->buf, &tmpbuf, + consumed - st->first, is_last); + b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first); + } + else { + /* Here some data was consumed but no compression was + * preformed. This means that all consumed data must be + * skipped. + */ + ret = skip_input_data(filter, msg, consumed); + } + + if (is_last && !(msg->flags & HTTP_MSGF_TE_CHNK)) { + /* At the end of data, if the original response was not + * chunked-encoded, we must write the empty chunk 0, and + * terminate the (empty) trailers section with a last . If + * we're forwarding a chunked-encoded response, these parts are + * preserved and not rewritten. + */ + char *p = bi_end(msg->chn->buf); + memcpy(p, "0\r\n\r\n", 5); + msg->chn->buf->i += 5; + ret += 5; + } + + /* Then, the last step. We need to update state of other filters. */ + if (ret >= 0) { + flt_change_forward_size(filter, msg->chn, -(consumed - st->first - ret)); + msg->next -= (consumed - st->first - ret); + ret += st->first; + } + + update_chunks: + /* Now, we need to update all remaining chunks to keep them synchronized + * with the next position of buf->p. If the chunk list is empty, we + * forward remaining data, if any. */ + st->first -= MIN(st->first, consumed); + if (LIST_ISEMPTY(&st->comp_chunks)) + ret += len - consumed; + else { + list_for_each_entry(cc, &st->comp_chunks, list) { + cc->start -= consumed; + cc->end -= consumed; + } + } + + end: + return ret; +} + +/***********************************************************************/ +static int +add_comp_chunk(struct comp_state *st, unsigned int start, unsigned int len, + int skip, int is_last) +{ + struct comp_chunk *cc; + + if (!(cc = malloc(sizeof(*cc)))) + return -1; + cc->start = start; + cc->end = start + len; + cc->skip = skip; + cc->is_last = is_last; + + if (LIST_ISEMPTY(&st->comp_chunks)) + st->first = cc->start; + + LIST_ADDQ(&st->comp_chunks, &cc->list); + return 0; +} + +/* This function might be moved in a filter function, probably with others to + * add/remove/move/replace buffer data */ +static int +skip_input_data(struct filter *filter, struct http_msg *msg, + unsigned int consumed) +{ + struct comp_state *st = filter->ctx; + int block1, block2; + + /* 1. Copy input data, skipping consumed ones. */ + b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first + consumed); + block1 = msg->chn->buf->i; + if (block1 > bi_contig_data(msg->chn->buf)) + block1 = bi_contig_data(msg->chn->buf); + block2 = msg->chn->buf->i - block1; + + memcpy(trash.str, bi_ptr(msg->chn->buf), block1); + if (block2 > 0) + memcpy(trash.str + block1, msg->chn->buf->data, block2); + trash.len = block1 + block2; + b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first + consumed); + + /* 2. Then write back these data at the right place in the buffer */ + b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first); + block1 = trash.len; + if (block1 > bi_contig_data(msg->chn->buf)) + block1 = bi_contig_data(msg->chn->buf); + block2 = trash.len - block1; + + memcpy(bi_ptr(msg->chn->buf), trash.str, block1); + if (block2 > 0) + memcpy(msg->chn->buf->data, trash.str + block1, block2); + b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first); + + /* Then adjut the input size */ + msg->chn->buf->i -= consumed; + return 0; +} /***********************************************************************/ /* * Selects a compression algorithm depending on the client request. */ int -select_compression_request_header(struct stream *s, struct buffer *req) +select_compression_request_header(struct comp_state *st, struct stream *s, + struct http_msg *msg) { struct http_txn *txn = s->txn; - struct http_msg *msg = &txn->req; + struct buffer *req = msg->chn->buf; struct hdr_ctx ctx; struct comp_algo *comp_algo = NULL; struct comp_algo *comp_algo_back = NULL; @@ -54,12 +439,13 @@ select_compression_request_header(struct stream *s, struct buffer *req) ctx.line[ctx.val + 30] < '6' || (ctx.line[ctx.val + 30] == '6' && (ctx.vlen < 54 || memcmp(ctx.line + 51, "SV1", 3) != 0)))) { - s->comp_algo = NULL; + st->comp_algo = NULL; return 0; } /* search for the algo in the backend in priority or the frontend */ - if ((s->be->comp && (comp_algo_back = s->be->comp->algos)) || (strm_fe(s)->comp && (comp_algo_back = strm_fe(s)->comp->algos))) { + if ((s->be->comp && (comp_algo_back = s->be->comp->algos)) || + (strm_fe(s)->comp && (comp_algo_back = strm_fe(s)->comp->algos))) { int best_q = 0; ctx.idx = 0; @@ -107,7 +493,7 @@ select_compression_request_header(struct stream *s, struct buffer *req) for (comp_algo = comp_algo_back; comp_algo; comp_algo = comp_algo->next) { if (*(ctx.line + ctx.val) == '*' || word_match(ctx.line + ctx.val, toklen, comp_algo->ua_name, comp_algo->ua_name_len)) { - s->comp_algo = comp_algo; + st->comp_algo = comp_algo; best_q = q; break; } @@ -116,8 +502,9 @@ select_compression_request_header(struct stream *s, struct buffer *req) } /* remove all occurrences of the header when "compression offload" is set */ - if (s->comp_algo) { - if ((s->be->comp && s->be->comp->offload) || (strm_fe(s)->comp && strm_fe(s)->comp->offload)) { + if (st->comp_algo) { + if ((s->be->comp && s->be->comp->offload) || + (strm_fe(s)->comp && strm_fe(s)->comp->offload)) { http_remove_header2(msg, &txn->hdr_idx, &ctx); ctx.idx = 0; while (http_find_header2("Accept-Encoding", 15, req->p, &txn->hdr_idx, &ctx)) { @@ -128,38 +515,43 @@ select_compression_request_header(struct stream *s, struct buffer *req) } /* identity is implicit does not require headers */ - if ((s->be->comp && (comp_algo_back = s->be->comp->algos)) || (strm_fe(s)->comp && (comp_algo_back = strm_fe(s)->comp->algos))) { + if ((s->be->comp && (comp_algo_back = s->be->comp->algos)) || + (strm_fe(s)->comp && (comp_algo_back = strm_fe(s)->comp->algos))) { for (comp_algo = comp_algo_back; comp_algo; comp_algo = comp_algo->next) { if (comp_algo->cfg_name_len == 8 && memcmp(comp_algo->cfg_name, "identity", 8) == 0) { - s->comp_algo = comp_algo; + st->comp_algo = comp_algo; return 1; } } } - s->comp_algo = NULL; + st->comp_algo = NULL; return 0; } + /* * Selects a comression algorithm depending of the server response. */ -int -select_compression_response_header(struct stream *s, struct buffer *res) +static int +select_compression_response_header(struct comp_state *st, struct stream *s, struct http_msg *msg) { struct http_txn *txn = s->txn; - struct http_msg *msg = &txn->rsp; + struct buffer *res = msg->chn->buf; struct hdr_ctx ctx; struct comp_type *comp_type; /* no common compression algorithm was found in request header */ - if (s->comp_algo == NULL) + if (st->comp_algo == NULL) goto fail; /* HTTP < 1.1 should not be compressed */ if (!(msg->flags & HTTP_MSGF_VER_11) || !(txn->req.flags & HTTP_MSGF_VER_11)) goto fail; + if (txn->meth == HTTP_METH_HEAD) + goto fail; + /* compress 200,201,202,203 responses only */ if ((txn->status != 200) && (txn->status != 201) && @@ -210,7 +602,8 @@ select_compression_response_header(struct stream *s, struct buffer *res) } } else { /* no content-type header */ - if ((s->be->comp && s->be->comp->types) || (strm_fe(s)->comp && strm_fe(s)->comp->types)) + if ((s->be->comp && s->be->comp->types) || + (strm_fe(s)->comp && strm_fe(s)->comp->types)) goto fail; /* a content-type was required */ } @@ -224,11 +617,9 @@ select_compression_response_header(struct stream *s, struct buffer *res) goto fail; /* initialize compression */ - if (s->comp_algo->init(&s->comp_ctx, global.tune.comp_maxlevel) < 0) + if (st->comp_algo->init(&st->comp_ctx, global.tune.comp_maxlevel) < 0) goto fail; - s->flags |= SF_COMP_READY; - /* remove Content-Length header */ ctx.idx = 0; if ((msg->flags & HTTP_MSGF_CNT_LEN) && http_find_header2("Content-Length", 14, res->p, &txn->hdr_idx, &ctx)) @@ -244,18 +635,19 @@ select_compression_response_header(struct stream *s, struct buffer *res) * Accept-Encoding header, and SHOULD NOT be used in the Content-Encoding * header. */ - if (s->comp_algo->cfg_name_len != 8 || memcmp(s->comp_algo->cfg_name, "identity", 8) != 0) { + if (st->comp_algo->cfg_name_len != 8 || memcmp(st->comp_algo->cfg_name, "identity", 8) != 0) { trash.len = 18; memcpy(trash.str, "Content-Encoding: ", trash.len); - memcpy(trash.str + trash.len, s->comp_algo->ua_name, s->comp_algo->ua_name_len); - trash.len += s->comp_algo->ua_name_len; + memcpy(trash.str + trash.len, st->comp_algo->ua_name, st->comp_algo->ua_name_len); + trash.len += st->comp_algo->ua_name_len; trash.str[trash.len] = '\0'; http_header_add_tail2(&txn->rsp, &txn->hdr_idx, trash.str, trash.len); } + msg->flags |= HTTP_MSGF_COMPRESSING; return 1; fail: - s->comp_algo = NULL; + st->comp_algo = NULL; return 0; } @@ -282,8 +674,8 @@ http_emit_chunk_size(char *end, unsigned int chksz) /* * Init HTTP compression */ -int -http_compression_buffer_init(struct stream *s, struct buffer *in, struct buffer *out) +static int +http_compression_buffer_init(struct buffer *in, struct buffer *out) { /* output stream requires at least 10 bytes for the gzip header, plus * at least 8 bytes for the gzip trailer (crc+len), plus a possible @@ -307,43 +699,37 @@ http_compression_buffer_init(struct stream *s, struct buffer *in, struct buffer /* * Add data to compress */ -int -http_compression_buffer_add_data(struct stream *s, struct buffer *in, struct buffer *out) +static int +http_compression_buffer_add_data(struct comp_state *st, struct buffer *in, + struct buffer *out, int sz) { - struct http_msg *msg = &s->txn->rsp; int consumed_data = 0; int data_process_len; int block1, block2; - /* - * Temporarily skip already parsed data and chunks to jump to the - * actual data block. It is fixed before leaving. - */ - b_adv(in, msg->next); + if (!sz) + return 0; - /* - * select the smallest size between the announced chunk size, the input + /* select the smallest size between the announced chunk size, the input * data, and the available output buffer size. The compressors are - * assumed to be able to process all the bytes we pass to them at once. - */ - data_process_len = MIN(in->i, msg->chunk_len); + * assumed to be able to process all the bytes we pass to them at + * once. */ + data_process_len = sz; data_process_len = MIN(out->size - buffer_len(out), data_process_len); + block1 = data_process_len; if (block1 > bi_contig_data(in)) block1 = bi_contig_data(in); block2 = data_process_len - block1; /* compressors return < 0 upon error or the amount of bytes read */ - consumed_data = s->comp_algo->add_data(s->comp_ctx, bi_ptr(in), block1, out); + consumed_data = st->comp_algo->add_data(st->comp_ctx, bi_ptr(in), block1, out); if (consumed_data >= 0 && block2 > 0) { - consumed_data = s->comp_algo->add_data(s->comp_ctx, in->data, block2, out); + consumed_data = st->comp_algo->add_data(st->comp_ctx, in->data, block2, out); if (consumed_data >= 0) consumed_data += block1; } - - /* restore original buffer pointer */ - b_rew(in, msg->next); return consumed_data; } @@ -351,24 +737,23 @@ http_compression_buffer_add_data(struct stream *s, struct buffer *in, struct buf * Flush data in process, and write the header and footer of the chunk. Upon * success, in and out buffers are swapped to avoid a copy. */ -int -http_compression_buffer_end(struct stream *s, struct buffer **in, struct buffer **out, int end) +static int +http_compression_buffer_end(struct comp_state *st, struct stream *s, + struct buffer **in, struct buffer **out, + unsigned int consumed, int end) { - int to_forward; - int left; - struct http_msg *msg = &s->txn->rsp; struct buffer *ib = *in, *ob = *out; char *tail; + int to_forward, left; #if defined(USE_SLZ) || defined(USE_ZLIB) int ret; /* flush data here */ - if (end) - ret = s->comp_algo->finish(s->comp_ctx, ob); /* end of data */ + ret = st->comp_algo->finish(st->comp_ctx, ob); /* end of data */ else - ret = s->comp_algo->flush(s->comp_ctx, ob); /* end of buffer */ + ret = st->comp_algo->flush(st->comp_ctx, ob); /* end of buffer */ if (ret < 0) return -1; /* flush failed */ @@ -419,39 +804,21 @@ http_compression_buffer_end(struct stream *s, struct buffer **in, struct buffer *tail++ = '\r'; *tail++ = '\n'; - /* At the end of data, we must write the empty chunk 0, - * and terminate the trailers section with a last . If - * we're forwarding a chunked-encoded response, we'll have a - * trailers section after the empty chunk which needs to be - * forwarded and which will provide the last CRLF. Otherwise - * we write it ourselves. - */ - if (msg->msg_state >= HTTP_MSG_TRAILERS) { - memcpy(tail, "0\r\n", 3); - tail += 3; - if (msg->msg_state >= HTTP_MSG_ENDING) { - memcpy(tail, "\r\n", 2); - tail += 2; - } - } ob->i = tail - ob->p; - to_forward = ob->i; /* update input rate */ - if (s->comp_ctx && s->comp_ctx->cur_lvl > 0) { - update_freq_ctr(&global.comp_bps_in, msg->next); - strm_fe(s)->fe_counters.comp_in += msg->next; - s->be->be_counters.comp_in += msg->next; + if (st->comp_ctx && st->comp_ctx->cur_lvl > 0) { + update_freq_ctr(&global.comp_bps_in, consumed); + strm_fe(s)->fe_counters.comp_in += consumed; + s->be->be_counters.comp_in += consumed; } else { - strm_fe(s)->fe_counters.comp_byp += msg->next; - s->be->be_counters.comp_byp += msg->next; + strm_fe(s)->fe_counters.comp_byp += consumed; + s->be->be_counters.comp_byp += consumed; } /* copy the remaining data in the tmp buffer. */ - b_adv(ib, msg->next); - msg->next = 0; - + b_adv(ib, consumed); if (ib->i > 0) { left = bi_contig_data(ib); memcpy(ob->p + ob->i, bi_ptr(ib), left); @@ -466,26 +833,40 @@ http_compression_buffer_end(struct stream *s, struct buffer **in, struct buffer *in = ob; *out = ib; - if (s->comp_ctx && s->comp_ctx->cur_lvl > 0) { + + if (st->comp_ctx && st->comp_ctx->cur_lvl > 0) { update_freq_ctr(&global.comp_bps_out, to_forward); strm_fe(s)->fe_counters.comp_out += to_forward; s->be->be_counters.comp_out += to_forward; } - /* forward the new chunk without remaining data */ - b_adv(ob, to_forward); - return to_forward; } /***********************************************************************/ +struct flt_ops comp_ops = { + .init = comp_flt_init, + .deinit = comp_flt_deinit, + + .channel_start_analyze = comp_start_analyze, + .channel_analyze = comp_analyze, + .channel_end_analyze = comp_end_analyze, + + .http_headers = comp_http_headers, + .http_start_chunk = comp_skip_http_chunk_envelope, + .http_end_chunk = comp_skip_http_chunk_envelope, + .http_last_chunk = comp_skip_http_chunk_envelope, + .http_data = comp_http_data, + .http_forward_data = comp_http_forward_data, +}; + static int parse_compression_options(char **args, int section, struct proxy *proxy, struct proxy *defpx, const char *file, int line, char **err) { - struct comp *comp; + struct comp *comp; if (proxy->comp == NULL) { comp = calloc(1, sizeof(struct comp)); @@ -544,27 +925,107 @@ parse_compression_options(char **args, int section, struct proxy *proxy, return 0; } -/* boolean, returns true if compression is used (either gzip or deflate) in the response */ static int -smp_fetch_res_comp(const struct arg *args, struct sample *smp, const char *kw, void *private) +parse_http_comp_flt(char **args, int *cur_arg, struct proxy *px, + struct filter *filter, char **err) { + struct filter *flt, *back; + + list_for_each_entry_safe(flt, back, &px->filters, list) { + if (flt->id == http_comp_flt_id) { + memprintf(err, "%s: Proxy supports only one compression filter\n", px->id); + return -1; + } + } + + filter->id = http_comp_flt_id; + filter->conf = NULL; + filter->ops = &comp_ops; + (*cur_arg)++; + + return 0; +} + + +int +check_legacy_http_comp_flt(struct proxy *proxy) +{ + struct filter *filter; + int err = 0; + + if (proxy->comp == NULL) + goto end; + if (!LIST_ISEMPTY(&proxy->filters)) { + list_for_each_entry(filter, &proxy->filters, list) { + if (filter->id == http_comp_flt_id) + goto end; + } + Alert("config: %s '%s': require an explicit filter declaration to use HTTP compression\n", + proxy_type_str(proxy), proxy->id); + err++; + goto end; + } + + filter = pool_alloc2(pool2_filter); + if (!filter) { + Alert("config: %s '%s': out of memory\n", + proxy_type_str(proxy), proxy->id); + err++; + goto end; + } + memset(filter, 0, sizeof(*filter)); + filter->id = http_comp_flt_id; + filter->conf = NULL; + filter->ops = &comp_ops; + LIST_ADDQ(&proxy->filters, &filter->list); + + end: + return err; +} + +/* + * boolean, returns true if compression is used (either gzip or deflate) in the + * response. + */ +static int +smp_fetch_res_comp(const struct arg *args, struct sample *smp, const char *kw, + void *private) +{ + struct http_txn *txn = smp->strm->txn; + smp->data.type = SMP_T_BOOL; - smp->data.u.sint = (smp->strm->comp_algo != NULL); + smp->data.u.sint = (txn && (txn->rsp.flags & HTTP_MSGF_COMPRESSING)); return 1; } -/* string, returns algo */ +/* + * string, returns algo + */ static int -smp_fetch_res_comp_algo(const struct arg *args, struct sample *smp, const char *kw, void *private) +smp_fetch_res_comp_algo(const struct arg *args, struct sample *smp, + const char *kw, void *private) { - if (!smp->strm->comp_algo) + struct http_txn *txn = smp->strm->txn; + struct filter *filter; + struct comp_state *st; + + if (!(txn || !(txn->rsp.flags & HTTP_MSGF_COMPRESSING))) return 0; - smp->data.type = SMP_T_STR; - smp->flags = SMP_F_CONST; - smp->data.u.str.str = smp->strm->comp_algo->cfg_name; - smp->data.u.str.len = smp->strm->comp_algo->cfg_name_len; - return 1; + list_for_each_entry(filter, &smp->strm->strm_flt.filters, list) { + if (filter->id != http_comp_flt_id) + continue; + + if (!(st = filter->ctx)) + break; + + smp->data.type = SMP_T_STR; + smp->flags = SMP_F_CONST; + smp->data.u.str.str = st->comp_algo->cfg_name; + smp->data.u.str.len = st->comp_algo->cfg_name_len; + return 1; + } + return 0; } /* Declare the config parser for "compression" keyword */ @@ -574,16 +1035,26 @@ static struct cfg_kw_list cfg_kws = {ILH, { } }; +/* Declare the filter parser for "compression" keyword */ +static struct flt_kw_list filter_kws = { "COMP", { }, { + { "compression", parse_http_comp_flt }, + { NULL, NULL }, + } +}; + /* Note: must not be declared as its list will be overwritten */ static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, { - { "res.comp", smp_fetch_res_comp, 0, NULL, SMP_T_BOOL, SMP_USE_HRSHP }, - { "res.comp_algo", smp_fetch_res_comp_algo, 0, NULL, SMP_T_STR, SMP_USE_HRSHP }, - { /* END */ }, -}}; + { "res.comp", smp_fetch_res_comp, 0, NULL, SMP_T_BOOL, SMP_USE_HRSHP }, + { "res.comp_algo", smp_fetch_res_comp_algo, 0, NULL, SMP_T_STR, SMP_USE_HRSHP }, + { /* END */ }, + } +}; __attribute__((constructor)) -static void __flt_http_comp_init(void) +static void +__flt_http_comp_init(void) { cfg_register_keywords(&cfg_kws); + flt_register_keywords(&filter_kws); sample_register_fetches(&sample_fetch_keywords); } diff --git a/src/haproxy.c b/src/haproxy.c index 8ffdb677b..8ceabde9a 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -76,6 +76,7 @@ #include #include +#include #include #include #include diff --git a/src/proto_http.c b/src/proto_http.c index 3cb3b412b..48d12f0f6 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -69,8 +69,6 @@ #include #include -#include /* NOTE: temporary include, will be removed very soon */ - const char HTTP_100[] = "HTTP/1.1 100 Continue\r\n\r\n"; @@ -4193,10 +4191,6 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s if (!(s->flags & SF_FINST_MASK)) s->flags |= SF_FINST_R; - /* we may want to compress the stats page */ - if (sess->fe->comp || s->be->comp) - select_compression_request_header(s, req->buf); - /* enable the minimally required analyzers to handle keep-alive and compression on the HTTP response */ req->analysers &= (AN_REQ_HTTP_BODY | AN_FLT_END); req->analysers &= ~AN_FLT_XFER_DATA; @@ -4335,9 +4329,6 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit) req->buf->i, req->analysers); - if (sess->fe->comp || s->be->comp) - select_compression_request_header(s, req->buf); - /* * Right now, we know that we have processed the entire headers * and that unwanted requests have been filtered out. We can do @@ -4942,15 +4933,11 @@ void http_end_txn_clean_session(struct stream *s) if (fe->mode == PR_MODE_HTTP) { fe->fe_counters.p.http.rsp[n]++; - if (s->comp_algo && (s->flags & SF_COMP_READY)) - fe->fe_counters.p.http.comp_rsp++; } if ((s->flags & SF_BE_ASSIGNED) && (be->mode == PR_MODE_HTTP)) { be->be_counters.p.http.rsp[n]++; be->be_counters.p.http.cum_req++; - if (s->comp_algo && (s->flags & SF_COMP_READY)) - be->be_counters.p.http.comp_rsp++; } } @@ -6289,7 +6276,6 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) (txn->status >= 100 && txn->status < 200) || txn->status == 204 || txn->status == 304) { msg->flags |= HTTP_MSGF_XFER_LEN; - s->comp_algo = NULL; goto skip_content_length; } @@ -6339,9 +6325,6 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) msg->body_len = msg->chunk_len = cl; } - if (sess->fe->comp || s->be->comp) - select_compression_response_header(s, rep->buf); - skip_content_length: /* Now we have to check if we need to modify the Connection header. * This is more difficult on the response than it is on the request, @@ -7038,8 +7021,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit if (msg->sov > 0) msg->sov -= ret; - if ((s->comp_algo == NULL || msg->msg_state >= HTTP_MSG_TRAILERS) && - LIST_ISEMPTY(&s->strm_flt.filters)) + if (LIST_ISEMPTY(&s->strm_flt.filters)) msg->chunk_len -= channel_forward(res, msg->chunk_len); if (res->flags & CF_SHUTW) @@ -7073,7 +7055,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit * Similarly, with keep-alive on the client side, we don't want to forward a * close. */ - if ((msg->flags & HTTP_MSGF_TE_CHNK) || s->comp_algo || !msg->body_len || + if ((msg->flags & HTTP_MSGF_TE_CHNK) || !msg->body_len || + (msg->flags & HTTP_MSGF_COMPRESSING) || (txn->flags & TX_CON_WANT_MSK) == TX_CON_WANT_KAL || (txn->flags & TX_CON_WANT_MSK) == TX_CON_WANT_SCL) channel_dont_close(res); @@ -7086,7 +7069,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit * flag with the last block of forwarded data, which would cause an * additional delay to be observed by the receiver. */ - if ((msg->flags & HTTP_MSGF_TE_CHNK) || s->comp_algo) + if ((msg->flags & HTTP_MSGF_TE_CHNK) || (msg->flags & HTTP_MSGF_COMPRESSING)) res->flags |= CF_EXPECT_MORE; /* the stream handler will take care of timeouts and errors */ @@ -8810,12 +8793,6 @@ void http_end_txn(struct stream *s) struct http_txn *txn = s->txn; struct proxy *fe = strm_fe(s); - /* release any possible compression context */ - if (s->flags & SF_COMP_READY) - s->comp_algo->end(&s->comp_ctx); - s->comp_algo = NULL; - s->flags &= ~SF_COMP_READY; - /* these ones will have been dynamically allocated */ pool_free2(pool2_requri, txn->uri); pool_free2(pool2_capture, txn->cli_cookie); diff --git a/src/proxy.c b/src/proxy.c index 2014c739e..f22c7462f 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -1130,8 +1130,6 @@ void resume_proxies(void) */ int stream_set_backend(struct stream *s, struct proxy *be) { - struct filter *filter; - if (s->flags & SF_BE_ASSIGNED) return 1; s->be = be; @@ -1140,19 +1138,8 @@ int stream_set_backend(struct stream *s, struct proxy *be) be->be_counters.conn_max = be->beconn; proxy_inc_be_ctr(be); - if (strm_fe(s) != be) { - list_for_each_entry(filter, &be->filters, list) { - struct filter *f = pool_alloc2(pool2_filter); - if (!f) - return 0; /* not enough memory */ - memset(f, 0, sizeof(*f)); - f->id = filter->id; - f->ops = filter->ops; - f->conf = filter->conf; - f->is_backend_filter = 1; - LIST_ADDQ(&s->strm_flt.filters, &f->list); - } - } + if (flt_set_stream_backend(s, be) < 0) + return 0; /* assign new parameters to the stream from the new backend */ s->si[1].flags &= ~SI_FL_INDEP_STR; diff --git a/src/stream.c b/src/stream.c index a98ecb06d..c5d6d437b 100644 --- a/src/stream.c +++ b/src/stream.c @@ -76,7 +76,6 @@ static struct list service_keywords = LIST_HEAD_INIT(service_keywords); struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin) { struct stream *s; - struct filter *filter, *back; struct connection *conn = objt_conn(origin); struct appctx *appctx = objt_appctx(origin); @@ -147,7 +146,6 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o * when the default backend is assigned. */ s->be = sess->fe; - s->comp_algo = NULL; s->req.buf = s->res.buf = NULL; s->req_cap = NULL; s->res_cap = NULL; @@ -217,19 +215,7 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o HLUA_INIT(&s->hlua); - LIST_INIT(&s->strm_flt.filters); - memset(s->strm_flt.current, 0, sizeof(s->strm_flt.current)); - list_for_each_entry(filter, &sess->fe->filters, list) { - struct filter *f = pool_alloc2(pool2_filter); - if (!f) - goto out_fail_accept; - memset(f, 0, sizeof(*f)); - f->id = filter->id; - f->ops = filter->ops; - f->conf = filter->conf; - LIST_ADDQ(&s->strm_flt.filters, &f->list); - } - if (flt_stream_start(s) < 0) + if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0) goto out_fail_accept; /* finish initialization of the accepted file descriptor */ @@ -250,10 +236,7 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o /* Error unrolling */ out_fail_accept: - list_for_each_entry_safe(filter, back, &s->strm_flt.filters, list) { - LIST_DEL(&filter->list); - pool_free2(pool2_filter, filter); - } + flt_stream_release(s, 0); LIST_DEL(&s->list); pool_free2(pool2_stream, s); return NULL; @@ -268,7 +251,6 @@ static void stream_free(struct stream *s) struct proxy *fe = sess->fe; struct bref *bref, *back; struct connection *cli_conn = objt_conn(sess->origin); - struct filter *filter, *fback; int i; if (s->pend_pos) @@ -330,10 +312,7 @@ static void stream_free(struct stream *s) } flt_stream_stop(s); - list_for_each_entry_safe(filter, fback, &s->strm_flt.filters, list) { - LIST_DEL(&filter->list); - pool_free2(pool2_filter, filter); - } + flt_stream_release(s, 0); if (fe) { pool_free2(fe->rsp_cap_pool, s->res_cap); @@ -2552,15 +2531,11 @@ struct task *process_stream(struct task *t) if (sess->fe->mode == PR_MODE_HTTP) { sess->fe->fe_counters.p.http.rsp[n]++; - if (s->comp_algo && (s->flags & SF_COMP_READY)) - sess->fe->fe_counters.p.http.comp_rsp++; } if ((s->flags & SF_BE_ASSIGNED) && (s->be->mode == PR_MODE_HTTP)) { s->be->be_counters.p.http.rsp[n]++; s->be->be_counters.p.http.cum_req++; - if (s->comp_algo && (s->flags & SF_COMP_READY)) - s->be->be_counters.p.http.comp_rsp++; } }