diff --git a/include/proto/filters.h b/include/proto/filters.h index 4ed81a8e5..dd7490fc0 100644 --- a/include/proto/filters.h +++ b/include/proto/filters.h @@ -35,18 +35,27 @@ #define FLT_NXT(flt, chn) ((flt)->next[CHN_IDX(chn)]) #define FLT_FWD(flt, chn) ((flt)->fwd[CHN_IDX(chn)]) -#define HAS_FILTERS(strm) ((strm)->strm_flt.has_filters) +#define HAS_FILTERS(strm) ((strm)->strm_flt.flags & STRM_FLT_FL_HAS_FILTERS) -#define FLT_STRM_CB_IMPL_0(strm, call) \ +#define HAS_REQ_DATA_FILTERS(strm) ((strm)->strm_flt.nb_req_data_filters != 0) +#define HAS_RSP_DATA_FILTERS(strm) ((strm)->strm_flt.nb_rsp_data_filters != 0) +#define HAS_DATA_FILTERS(strm, chn) ((chn->flags & CF_ISRESP) ? HAS_RSP_DATA_FILTERS(strm) : HAS_REQ_DATA_FILTERS(strm)) + +#define IS_REQ_DATA_FILTER(flt) ((flt)->flags & FLT_FL_IS_REQ_DATA_FILTER) +#define IS_RSP_DATA_FILTER(flt) ((flt)->flags & FLT_FL_IS_RSP_DATA_FILTER) +#define IS_DATA_FILTER(flt, chn) ((chn->flags & CF_ISRESP) ? IS_RSP_DATA_FILTER(flt) : IS_REQ_DATA_FILTER(flt)) + +#define FLT_STRM_CB(strm, call) \ do { \ if (HAS_FILTERS(strm)) { call; } \ } while (0) -#define FLT_STRM_CB_IMPL_1(strm, call, default_ret, ...) \ - (HAS_FILTERS(strm) ? call : default_ret) -#define FLT_STRM_CB_IMPL_2(strm, call, default_ret, on_error) \ + +#define FLT_STRM_DATA_CB_IMPL_1(strm, chn, call, default_ret) \ + (HAS_DATA_FILTERS(strm, chn) ? call : default_ret) +#define FLT_STRM_DATA_CB_IMPL_2(strm, chn, call, default_ret, on_error) \ ({ \ int _ret; \ - if (HAS_FILTERS(strm)) { \ + if (HAS_DATA_FILTERS(strm, chn)) { \ _ret = call; \ if (_ret < 0) { on_error; } \ } \ @@ -54,10 +63,10 @@ _ret = default_ret; \ _ret; \ }) -#define FLT_STRM_CB_IMPL_3(strm, call, default_ret, on_error, on_wait) \ +#define FLT_STRM_DATA_CB_IMPL_3(strm, chn, call, default_ret, on_error, on_wait) \ ({ \ int _ret; \ - if (HAS_FILTERS(strm)) { \ + if (HAS_DATA_FILTERS(strm, chn)) { \ _ret = call; \ if (_ret < 0) { on_error; } \ if (!_ret) { on_wait; } \ @@ -67,14 +76,14 @@ _ret; \ }) -#define FLT_STRM_CB_IMPL_X(strm, call, A, B, C, CB_IMPL, ...) CB_IMPL +#define FLT_STRM_DATA_CB_IMPL_X(strm, chn, call, A, B, C, DATA_CB_IMPL, ...) \ + DATA_CB_IMPL -#define FLT_STRM_CB(strm, call, ...) \ - FLT_STRM_CB_IMPL_X(strm, call, ##__VA_ARGS__, \ - FLT_STRM_CB_IMPL_3(strm, call, ##__VA_ARGS__), \ - FLT_STRM_CB_IMPL_2(strm, call, ##__VA_ARGS__), \ - FLT_STRM_CB_IMPL_1(strm, call, ##__VA_ARGS__), \ - FLT_STRM_CB_IMPL_0(strm, call)) +#define FLT_STRM_DATA_CB(strm, chn, call, ...) \ + FLT_STRM_DATA_CB_IMPL_X(strm, chn, call, ##__VA_ARGS__, \ + FLT_STRM_DATA_CB_IMPL_3(strm, chn, call, ##__VA_ARGS__), \ + FLT_STRM_DATA_CB_IMPL_2(strm, chn, call, ##__VA_ARGS__), \ + FLT_STRM_DATA_CB_IMPL_1(strm, chn, call, ##__VA_ARGS__)) #define CALL_FILTER_ANALYZER(analyzer, strm, chn, bit) \ if (!HAS_FILTERS(strm) || analyzer((strm), (chn), bit)) ; else break @@ -118,25 +127,41 @@ strm_flt(struct stream *s) return &s->strm_flt; } +/* Registers a filter to a channel. If a filter was already registered, this + * function do nothing. Once registered, the filter becomes a "data" filter for + * this channel. */ static inline void -flt_set_forward_data(struct filter *filter, struct channel *chn) +register_data_filter(struct stream *s, struct channel *chn, struct filter *filter) { - filter->flags[CHN_IDX(chn)] |= FILTER_FL_FORWARD_DATA; + if (!IS_DATA_FILTER(filter, chn)) { + if (chn->flags & CF_ISRESP) { + filter->flags |= FLT_FL_IS_RSP_DATA_FILTER; + strm_flt(s)->nb_rsp_data_filters++; + } + else { + filter->flags |= FLT_FL_IS_REQ_DATA_FILTER; + strm_flt(s)->nb_req_data_filters++; + } + } } +/* Unregisters a "data" filter from a channel. */ static inline void -flt_reset_forward_data(struct filter *filter, struct channel *chn) +unregister_data_filter(struct stream *s, struct channel *chn, struct filter *filter) { - filter->flags[CHN_IDX(chn)] &= ~FILTER_FL_FORWARD_DATA; -} + if (IS_DATA_FILTER(filter, chn)) { + if (chn->flags & CF_ISRESP) { + filter->flags &= ~FLT_FL_IS_RSP_DATA_FILTER; + strm_flt(s)->nb_rsp_data_filters--; -static inline int -flt_want_forward_data(struct filter *filter, const struct channel *chn) -{ - return filter->flags[CHN_IDX(chn)] & FILTER_FL_FORWARD_DATA; + } + else { + filter->flags &= ~FLT_FL_IS_REQ_DATA_FILTER; + strm_flt(s)->nb_req_data_filters--; + } + } } - /* This function must be called when a filter alter incoming data. It updates * next offset value of all filter's predecessors. Do not call this function * when a filter change the size of incomding data leads to an undefined diff --git a/include/types/filters.h b/include/types/filters.h index 20b0c9504..38f96d9ca 100644 --- a/include/types/filters.h +++ b/include/types/filters.h @@ -172,6 +172,15 @@ struct flt_ops { unsigned int len); }; +/* Flags set on a filter instance */ +#define FLT_FL_IS_BACKEND_FILTER 0x0001 /* The filter is a backend filter */ +#define FLT_FL_IS_REQ_DATA_FILTER 0x0002 /* The filter will parse data on the request channel */ +#define FLT_FL_IS_RSP_DATA_FILTER 0x0004 /* The filter will parse data on the response channel */ + + +/* Flags set on the stream, common to all filters attached to its stream */ +#define STRM_FLT_FL_HAS_FILTERS 0x0001 /* The stream has at least one filter */ + /* * Structure representing the state of a filter. When attached to a proxy, only * and field (and optionnaly ) are set. All other fields are @@ -188,8 +197,7 @@ struct filter { struct flt_ops *ops; /* The filter callbacks */ void *conf; /* The filter configuration */ void *ctx; /* The filter context (opaque) */ - int is_backend_filter; /* Flag to specify if the filter is a "backend" filter */ - unsigned int flags[2]; /* 0: request, 1: response */ + unsigned short flags; /* FLT_FL_* */ unsigned int next[2]; /* Offset, relative to buf->p, to the next byte to parse for a specific channel * 0: request channel, 1: response channel */ unsigned int fwd[2]; /* Offset, relative to buf->p, to the next byte to forward for a specific channel @@ -197,10 +205,18 @@ struct filter { struct list list; /* Next filter for the same proxy/stream */ }; +/* + * Structure reprensenting the "global" state of filters attached to a stream. + */ struct strm_flt { - struct list filters; - struct filter *current[2]; // 0: request, 1: response - int has_filters; + struct list filters; /* List of filters attached to a stream */ + struct filter *current[2]; /* From which filter resume processing, for a specific channel. + * This is used for resumable callbacks only, + * If NULL, we start from the first filter. + * 0: request channel, 1: response channel */ + unsigned short flags; /* STRM_FL_* */ + unsigned char nb_req_data_filters; /* Number of data filters registerd on the request channel */ + unsigned char nb_rsp_data_filters; /* Number of data filters registerd on the response channel */ }; #endif /* _TYPES_FILTERS_H */ diff --git a/src/filters.c b/src/filters.c index 399f4178e..1ab2b376d 100644 --- a/src/filters.c +++ b/src/filters.c @@ -57,23 +57,23 @@ static int handle_analyzer_result(struct stream *s, struct channel *chn, unsigne do { \ struct filter *filter; \ \ - if ((strm)->strm_flt.current[CHN_IDX(chn)]) { \ - filter = (strm)->strm_flt.current[CHN_IDX(chn)]; \ - (strm)->strm_flt.current[CHN_IDX(chn)] = NULL; \ + if (strm_flt(strm)->current[CHN_IDX(chn)]) { \ + filter = strm_flt(strm)->current[CHN_IDX(chn)]; \ + strm_flt(strm)->current[CHN_IDX(chn)] = NULL; \ goto resume_execution; \ } \ \ list_for_each_entry(filter, &strm_flt(s)->filters, list) { \ - resume_execution: + resume_execution: #define RESUME_FILTER_END \ } \ } while(0) -#define BREAK_EXECUTION(strm, chn, label) \ - do { \ - (strm)->strm_flt.current[CHN_IDX(chn)] = filter; \ - goto label; \ +#define BREAK_EXECUTION(strm, chn, label) \ + do { \ + strm_flt(strm)->current[CHN_IDX(chn)] = filter; \ + goto label; \ } while (0) @@ -283,8 +283,7 @@ flt_deinit(struct proxy *proxy) /* Attaches a filter to a stream. Returns -1 if an error occurs, 0 otherwise. */ static int -flt_stream_add_filter(struct stream *s, struct filter *filter, - int is_backend) +flt_stream_add_filter(struct stream *s, struct filter *filter, unsigned int flags) { struct filter *f = pool_alloc2(pool2_filter); if (!f) /* not enough memory */ @@ -293,9 +292,9 @@ flt_stream_add_filter(struct stream *s, struct filter *filter, f->id = filter->id; f->ops = filter->ops; f->conf = filter->conf; - f->is_backend_filter = is_backend; + f->flags |= flags; LIST_ADDQ(&strm_flt(s)->filters, &f->list); - strm_flt(s)->has_filters = 1; + strm_flt(s)->flags |= STRM_FLT_FL_HAS_FILTERS; return 0; } @@ -329,13 +328,13 @@ flt_stream_release(struct stream *s, int only_backend) struct filter *filter, *back; list_for_each_entry_safe(filter, back, &strm_flt(s)->filters, list) { - if (!only_backend || filter->is_backend_filter) { + if (!only_backend || (filter->flags & FLT_FL_IS_BACKEND_FILTER)) { LIST_DEL(&filter->list); pool_free2(pool2_filter, filter); } } if (LIST_ISEMPTY(&strm_flt(s)->filters)) - strm_flt(s)->has_filters = 0; + strm_flt(s)->flags &= ~STRM_FLT_FL_HAS_FILTERS; } /* @@ -384,7 +383,7 @@ flt_set_stream_backend(struct stream *s, struct proxy *be) return 0; list_for_each_entry(filter, &be->filters, list) { - if (flt_stream_add_filter(s, filter, 1) < 0) + if (flt_stream_add_filter(s, filter, FLT_FL_IS_BACKEND_FILTER) < 0) return -1; } return 0; @@ -410,31 +409,38 @@ flt_http_data(struct stream *s, struct http_msg *msg) /* Save buffer state */ buf_i = buf->i; - buf->i = MIN(msg->chunk_len + msg->next, buf->i); list_for_each_entry(filter, &strm_flt(s)->filters, list) { + unsigned int *nxt; + + /* Call "data" filters only */ + if (!IS_DATA_FILTER(filter, msg->chn)) + continue; + /* If the HTTP parser is ahead, we update the next offset of the * current filter. This happens for chunked messages, at the * begining of a new chunk. */ - if (msg->next > FLT_NXT(filter, msg->chn)) - FLT_NXT(filter, msg->chn) = msg->next; - if (filter->ops->http_data && !flt_want_forward_data(filter, msg->chn)) { + nxt = &FLT_NXT(filter, msg->chn); + if (msg->next > *nxt) + *nxt = msg->next; + + if (filter->ops->http_data) { ret = filter->ops->http_data(s, filter, msg); - if (ret <= 0) + if (ret < 0) break; /* Update the next offset of the current filter */ - FLT_NXT(filter, msg->chn) += ret; + *nxt += ret; /* And set this value as the bound for the next * filter. It will not able to parse more data than this * one. */ - buf->i = FLT_NXT(filter, msg->chn); + buf->i = *nxt; } else { /* Consume all available data and update the next offset * of the current filter. buf->i is untouched here. */ - ret = buf->i - FLT_NXT(filter, msg->chn); - FLT_NXT(filter, msg->chn) = buf->i; + ret = MIN(msg->chunk_len + msg->next, buf->i) - *nxt; + *nxt += ret; } } @@ -456,13 +462,21 @@ int flt_http_chunk_trailers(struct stream *s, struct http_msg *msg) { struct filter *filter; - int ret = 1; + int ret = 1; list_for_each_entry(filter, &strm_flt(s)->filters, list) { + unsigned int *nxt; + + /* Call "data" filters only */ + if (!IS_DATA_FILTER(filter, msg->chn)) + continue; + /* Be sure to set the next offset of the filter at the right * place. This is really useful when the first part of the * trailers was parsed. */ - FLT_NXT(filter, msg->chn) = msg->next; + nxt = &FLT_NXT(filter, msg->chn); + *nxt = msg->next; + if (filter->ops->http_chunk_trailers) { ret = filter->ops->http_chunk_trailers(s, filter, msg); if (ret < 0) @@ -470,7 +484,7 @@ flt_http_chunk_trailers(struct stream *s, struct http_msg *msg) } /* Update the next offset of the current filter. Here all data * are always consumed. */ - FLT_NXT(filter, msg->chn) += msg->sol; + *nxt += msg->sol; } return ret; } @@ -493,7 +507,6 @@ flt_http_end(struct stream *s, struct http_msg *msg) if (ret <= 0) BREAK_EXECUTION(s, msg->chn, end); } - flt_reset_forward_data(filter, msg->chn); } RESUME_FILTER_END; end: return ret; @@ -545,27 +558,35 @@ flt_http_forward_data(struct stream *s, struct http_msg *msg, unsigned int len) int ret = len; list_for_each_entry(filter, &strm_flt(s)->filters, list) { + unsigned int *nxt, *fwd; + + /* Call "data" filters only */ + if (!IS_DATA_FILTER(filter, msg->chn)) + continue; + /* If the HTTP parser is ahead, we update the next offset of the * current filter. This happens for chunked messages, when the * chunk envelope is parsed. */ - if (msg->next > FLT_NXT(filter, msg->chn)) - FLT_NXT(filter, msg->chn) = msg->next; + nxt = &FLT_NXT(filter, msg->chn); + fwd = &FLT_FWD(filter, msg->chn); + if (msg->next > *nxt) + *nxt = msg->next; + if (filter->ops->http_forward_data) { - /* Remove bytes that the current filter considered as - * forwarded */ - ret = filter->ops->http_forward_data(s, filter, msg, - ret - FLT_FWD(filter, msg->chn)); + /* Remove bytes that the current filter considered as + * forwarded */ + ret = filter->ops->http_forward_data(s, filter, msg, ret - *fwd); if (ret < 0) goto end; } /* Adjust bytes that the current filter considers as * forwarded */ - FLT_FWD(filter, msg->chn) += ret; + *fwd += ret; /* And set this value as the bound for the next filter. It will * not able to forward more data than the current one. */ - ret = FLT_FWD(filter, msg->chn); + ret = *fwd; } if (!ret) @@ -574,6 +595,8 @@ flt_http_forward_data(struct stream *s, struct http_msg *msg, unsigned int len) /* Finally, adjust filters offsets by removing data that HAProxy will * forward. */ list_for_each_entry(filter, &strm_flt(s)->filters, list) { + if (!IS_DATA_FILTER(filter, msg->chn)) + continue; FLT_NXT(filter, msg->chn) -= ret; FLT_FWD(filter, msg->chn) -= ret; } @@ -599,7 +622,7 @@ flt_start_analyze(struct stream *s, struct channel *chn, unsigned int an_bit) * so we do not need to check the filter list's emptiness. */ RESUME_FILTER_LOOP(s, chn) { - if (an_bit == AN_FLT_START_BE && !filter->is_backend_filter) + if (an_bit == AN_FLT_START_BE && !(filter->flags & FLT_FL_IS_BACKEND_FILTER)) continue; FLT_NXT(filter, chn) = 0; @@ -649,9 +672,8 @@ flt_analyze(struct stream *s, struct channel *chn, unsigned int an_bit) int flt_analyze_http_headers(struct stream *s, struct channel *chn, unsigned int an_bit) { - struct filter *filter; - struct http_msg *msg; - int ret = 1; + struct filter *filter; + int ret = 1; RESUME_FILTER_LOOP(s, chn) { if (filter->ops->channel_analyze) { @@ -665,9 +687,13 @@ flt_analyze_http_headers(struct stream *s, struct channel *chn, unsigned int an_ * headers because any filter can alter them. So the definitive size of * headers (msg->sov) is only known when all filters have been * called. */ - msg = ((chn->flags & CF_ISRESP) ? &s->txn->rsp : &s->txn->req); list_for_each_entry(filter, &strm_flt(s)->filters, list) { - FLT_NXT(filter, msg->chn) = msg->sov; + /* Handle "data" filters only */ + if (!IS_DATA_FILTER(filter, chn)) + continue; + + FLT_NXT(filter, chn) = ((chn->flags & CF_ISRESP) + ? s->txn->rsp.sov : s->txn->req.sov); } check_result: @@ -686,12 +712,10 @@ flt_end_analyze(struct stream *s, struct channel *chn, unsigned int an_bit) { int ret = 1; - /* If this function is called, this means there is at least one filter, - * so we do not need to check the filter list's emptiness. */ - RESUME_FILTER_LOOP(s, chn) { FLT_NXT(filter, chn) = 0; FLT_FWD(filter, chn) = 0; + unregister_data_filter(s, chn, filter); if (filter->ops->channel_end_analyze) { ret = filter->ops->channel_end_analyze(s, filter, chn); @@ -738,7 +762,7 @@ end: static int flt_data(struct stream *s, struct channel *chn) { - struct filter *filter = NULL; + struct filter *filter; struct buffer *buf = chn->buf; unsigned int buf_i; int ret = 0; @@ -747,27 +771,35 @@ flt_data(struct stream *s, struct channel *chn) buf_i = buf->i; list_for_each_entry(filter, &strm_flt(s)->filters, list) { - if (filter->ops->tcp_data && !flt_want_forward_data(filter, chn)) { + unsigned int *nxt; + + /* Call "data" filters only */ + if (!IS_DATA_FILTER(filter, chn)) + continue; + + nxt = &FLT_NXT(filter, chn); + if (filter->ops->tcp_data) { ret = filter->ops->tcp_data(s, filter, chn); if (ret < 0) break; /* Increase next offset of the current filter */ - FLT_NXT(filter, chn) += ret; + *nxt += ret; /* And set this value as the bound for the next * filter. It will not able to parse more data than the * current one. */ - buf->i = FLT_NXT(filter, chn); + buf->i = *nxt; } else { /* Consume all available data */ - FLT_NXT(filter, chn) = buf->i; + *nxt = buf->i; } /* Update value to be sure to have the last one when we - * exit from the loop. */ - ret = FLT_NXT(filter, chn); + * exit from the loop. This value will be used to know how much + * data are "forwardable" */ + ret = *nxt; } /* Restore the original buffer state */ @@ -787,40 +819,46 @@ flt_data(struct stream *s, struct channel *chn) static int flt_forward_data(struct stream *s, struct channel *chn, unsigned int len) { - struct filter *filter = NULL; + struct filter *filter; int ret = len; list_for_each_entry(filter, &strm_flt(s)->filters, list) { + unsigned int *fwd; + + /* Call "data" filters only */ + if (!IS_DATA_FILTER(filter, chn)) + continue; + + fwd = &FLT_FWD(filter, chn); if (filter->ops->tcp_forward_data) { /* Remove bytes that the current filter considered as * forwarded */ - ret = filter->ops->tcp_forward_data(s, filter, chn, - ret - FLT_FWD(filter, chn)); + ret = filter->ops->tcp_forward_data(s, filter, chn, ret - *fwd); if (ret < 0) goto end; } - /* Adjust bytes taht the current filter considers as + /* Adjust bytes that the current filter considers as * forwarded */ - FLT_FWD(filter, chn) += ret; + *fwd += ret; /* And set this value as the bound for the next filter. It will * not able to forward more data than the current one. */ - ret = FLT_FWD(filter, chn); + ret = *fwd; } if (!ret) goto end; - /* Adjust forward counter and next offset of filters by removing data - * that HAProxy will consider as forwarded. */ + /* Finally, adjust filters offsets by removing data that HAProxy will + * forward. */ list_for_each_entry(filter, &strm_flt(s)->filters, list) { + if (!IS_DATA_FILTER(filter, chn)) + continue; FLT_NXT(filter, chn) -= ret; FLT_FWD(filter, chn) -= ret; } - /* Consume data that all filters consider as forwarded. */ - b_adv(chn->buf, ret); end: return ret; } @@ -838,8 +876,9 @@ flt_xfer_data(struct stream *s, struct channel *chn, unsigned int an_bit) { int ret = 1; - /* If this function is called, this means there is at least one filter, - * so we do not need to check the filter list's emptiness. */ + /* If there is no "data" filters, we do nothing */ + if (!HAS_DATA_FILTERS(s, chn)) + goto end; /* Be sure that the output is still opened. Else we stop the data * filtering. */ @@ -857,6 +896,9 @@ flt_xfer_data(struct stream *s, struct channel *chn, unsigned int an_bit) if (ret < 0) goto end; + /* Consume data that all filters consider as forwarded. */ + b_adv(chn->buf, ret); + /* Stop waiting data if the input in closed and no data is pending or if * the output is closed. */ if ((chn->flags & CF_SHUTW) || diff --git a/src/flt_http_comp.c b/src/flt_http_comp.c index e7d3577da..12d9e3330 100644 --- a/src/flt_http_comp.c +++ b/src/flt_http_comp.c @@ -113,8 +113,10 @@ comp_analyze(struct stream *s, struct filter *filter, struct channel *chn, select_compression_request_header(st, s, &s->txn->req); else { select_compression_response_header(st, s, &s->txn->rsp); - if (st->comp_algo) + if (st->comp_algo) { st->sov = s->txn->rsp.sov; + register_data_filter(s, chn, filter); + } } } @@ -155,10 +157,8 @@ comp_http_data(struct stream *s, struct filter *filter, struct http_msg *msg) unsigned int len; int ret; - if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) { - flt_set_forward_data(filter, msg->chn); + if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) return 1; - } len = MIN(msg->chunk_len + msg->next, msg->chn->buf->i) - FLT_NXT(filter, msg->chn); if (!len) @@ -193,11 +193,6 @@ comp_http_chunk_trailers(struct stream *s, struct filter *filter, struct comp_state *st = filter->ctx; int ret; - if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) { - flt_set_forward_data(filter, msg->chn); - return 1; - } - if (!st->initialized) return 1; @@ -223,12 +218,6 @@ comp_http_forward_data(struct stream *s, struct filter *filter, struct comp_state *st = filter->ctx; int ret; - if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) { - flt_set_forward_data(filter, msg->chn); - ret = len; - return ret; - } - /* To work, previous filters MUST forward all data */ if (FLT_FWD(filter, msg->chn) + len != FLT_NXT(filter, msg->chn)) { Warning("HTTP compression failed: unexpected behavior of previous filters\n"); diff --git a/src/proto_http.c b/src/proto_http.c index b25d18a2f..0e035a51d 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -6800,9 +6800,9 @@ http_msg_forward_body(struct stream *s, struct http_msg *msg) msg->chunk_len += len; msg->body_len += len; } - ret = FLT_STRM_CB(s, flt_http_data(s, msg), - /* default_ret */ MIN(msg->chunk_len, chn->buf->i - msg->next), - /* on_error */ goto error); + ret = FLT_STRM_DATA_CB(s, chn, flt_http_data(s, msg), + /* default_ret */ MIN(msg->chunk_len, chn->buf->i - msg->next), + /* on_error */ goto error); msg->next += ret; msg->chunk_len -= ret; if (msg->chunk_len) { @@ -6821,26 +6821,26 @@ http_msg_forward_body(struct stream *s, struct http_msg *msg) ending: /* we may have some pending data starting at res->buf->p such as a last * chunk of data or trailers. */ - ret = FLT_STRM_CB(s, flt_http_forward_data(s, msg, msg->next), - /* default_ret */ msg->next, - /* on_error */ goto error); + ret = FLT_STRM_DATA_CB(s, chn, flt_http_forward_data(s, msg, msg->next), + /* default_ret */ msg->next, + /* on_error */ goto error); b_adv(chn->buf, ret); msg->next -= ret; if (msg->next) goto missing_data_or_waiting; - FLT_STRM_CB(s, flt_http_end(s, msg), - /* default_ret */ 1, - /* on_error */ goto error, - /* on_wait */ goto waiting); + FLT_STRM_DATA_CB(s, chn, flt_http_end(s, msg), + /* default_ret */ 1, + /* on_error */ goto error, + /* on_wait */ goto waiting); msg->msg_state = HTTP_MSG_DONE; return 1; missing_data_or_waiting: /* we may have some pending data starting at chn->buf->p */ - ret = FLT_STRM_CB(s, flt_http_forward_data(s, msg, msg->next), - /* default_ret */ msg->next, - /* on_error */ goto error); + ret = FLT_STRM_DATA_CB(s, chn, flt_http_forward_data(s, msg, msg->next), + /* default_ret */ msg->next, + /* on_error */ goto error); b_adv(chn->buf, ret); msg->next -= ret; if (!(chn->flags & CF_WROTE_DATA) || msg->sov > 0) @@ -6866,10 +6866,10 @@ http_msg_forward_chunked_body(struct stream *s, struct http_msg *msg) switch_states: switch (msg->msg_state) { case HTTP_MSG_DATA: - ret = FLT_STRM_CB(s, flt_http_data(s, msg), - /* default_ret */ MIN(msg->chunk_len, chn->buf->i - msg->next), - /* on_error */ goto error); - msg->next += ret; + ret = FLT_STRM_DATA_CB(s, chn, flt_http_data(s, msg), + /* default_ret */ MIN(msg->chunk_len, chn->buf->i - msg->next), + /* on_error */ goto error); + msg->next += ret; msg->chunk_len -= ret; if (msg->chunk_len) { /* input empty or output full */ @@ -6917,9 +6917,9 @@ http_msg_forward_chunked_body(struct stream *s, struct http_msg *msg) ret = http_forward_trailers(msg); if (ret < 0) goto chunk_parsing_error; - FLT_STRM_CB(s, flt_http_chunk_trailers(s, msg), - /* default_ret */ 1, - /* on_error */ goto error); + FLT_STRM_DATA_CB(s, chn, flt_http_chunk_trailers(s, msg), + /* default_ret */ 1, + /* on_error */ goto error); msg->next += msg->sol; msg->sol = 0; if (!ret) @@ -6938,7 +6938,7 @@ http_msg_forward_chunked_body(struct stream *s, struct http_msg *msg) ending: /* we may have some pending data starting at res->buf->p such as a last * chunk of data or trailers. */ - ret = FLT_STRM_CB(s, flt_http_forward_data(s, msg, msg->next), + ret = FLT_STRM_DATA_CB(s, chn, flt_http_forward_data(s, msg, msg->next), /* default_ret */ msg->next, /* on_error */ goto error); b_adv(chn->buf, ret); @@ -6946,7 +6946,7 @@ http_msg_forward_chunked_body(struct stream *s, struct http_msg *msg) if (msg->next) goto missing_data_or_waiting; - FLT_STRM_CB(s, flt_http_end(s, msg), + FLT_STRM_DATA_CB(s, chn, flt_http_end(s, msg), /* default_ret */ 1, /* on_error */ goto error, /* on_wait */ goto waiting); @@ -6955,7 +6955,7 @@ http_msg_forward_chunked_body(struct stream *s, struct http_msg *msg) missing_data_or_waiting: /* we may have some pending data starting at chn->buf->p */ - ret = FLT_STRM_CB(s, flt_http_forward_data(s, msg, msg->next), + ret = FLT_STRM_DATA_CB(s, chn, flt_http_forward_data(s, msg, msg->next), /* default_ret */ msg->next, /* on_error */ goto error); b_adv(chn->buf, ret);