From 14bfe9af12cfe38ae20dfb17ecb923cf6e6112bd Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Wed, 19 Dec 2018 15:19:27 +0100 Subject: [PATCH] CLEANUP: stream-int: consistently call the si/stream_int functions As long-time changes have accumulated over time, the exported functions of the stream-interface were almost all prefixed "si_" while most private ones (mostly callbacks) were called "stream_int_". There were still a few confusing exceptions, which were addressed to follow this shcme : - stream_sock_read0(), only used internally, was renamed stream_int_read0() and made static - stream_int_notify() is only private and was made static - stream_int_{check_timeouts,report_error,retnclose,register_handler,update} were renamed si_. Now it is clearer when checking one of these if it risks to be used outside or not. --- include/proto/stream_interface.h | 19 +++++-------- include/types/stream_interface.h | 2 +- src/cache.c | 2 +- src/cli.c | 2 +- src/hlua.c | 28 +++++++++---------- src/proto_http.c | 4 +-- src/proto_htx.c | 2 +- src/stream.c | 12 ++++---- src/stream_interface.c | 48 +++++++++++++++++++++----------- 9 files changed, 65 insertions(+), 54 deletions(-) diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 2a896afc2..f4ba6ee89 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -33,25 +33,20 @@ #include -/* main event functions used to move data between sockets and buffers */ -int stream_int_check_timeouts(struct stream_interface *si); -void stream_int_report_error(struct stream_interface *si); -void stream_int_retnclose(struct stream_interface *si, - const struct buffer *msg); -int conn_si_send_proxy(struct connection *conn, unsigned int flag); -void stream_sock_read0(struct stream_interface *si); - extern struct si_ops si_embedded_ops; extern struct si_ops si_conn_ops; extern struct si_ops si_applet_ops; extern struct data_cb si_conn_cb; -struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app); +/* main event functions used to move data between sockets and buffers */ +int si_check_timeouts(struct stream_interface *si); +void si_report_error(struct stream_interface *si); +void si_retnclose(struct stream_interface *si, const struct buffer *msg); +int conn_si_send_proxy(struct connection *conn, unsigned int flag); +struct appctx *si_register_handler(struct stream_interface *si, struct applet *app); void si_applet_wake_cb(struct stream_interface *si); -void stream_int_update(struct stream_interface *si); -void stream_int_notify(struct stream_interface *si); +void si_update(struct stream_interface *si); int si_cs_recv(struct conn_stream *cs); -int si_cs_send(struct conn_stream *cs); struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state); void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b); diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index c7334bf9f..eab1fba1d 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -94,7 +94,7 @@ enum { /* Note that if an applet is registered, the update function will not be called * by the session handler, so it may be used to resync flags at the end of the - * applet handler. See stream_int_update() for reference. + * applet handler. See si_update() for reference. */ struct stream_interface { /* struct members used by the "buffer" side */ diff --git a/src/cache.c b/src/cache.c index 032255224..020650a33 100644 --- a/src/cache.c +++ b/src/cache.c @@ -1384,7 +1384,7 @@ enum act_return http_action_req_cache_use(struct act_rule *rule, struct proxy *p shctx_row_inc_hot(shctx_ptr(cache), block_ptr(res)); shctx_unlock(shctx_ptr(cache)); s->target = &http_cache_applet.obj_type; - if ((appctx = stream_int_register_handler(&s->si[1], objt_applet(s->target)))) { + if ((appctx = si_register_handler(&s->si[1], objt_applet(s->target)))) { appctx->st0 = HTTP_CACHE_INIT; appctx->rule = rule; appctx->ctx.cache.entry = res; diff --git a/src/cli.c b/src/cli.c index cc377fc57..fba75138a 100644 --- a/src/cli.c +++ b/src/cli.c @@ -1749,7 +1749,7 @@ void pcli_reply_and_close(struct stream *s, const char *msg) struct buffer *buf = get_trash_chunk(); chunk_initstr(buf, msg); - stream_int_retnclose(&s->si[0], buf); + si_retnclose(&s->si[0], buf); } static enum obj_type *pcli_pid_to_server(int proc_pid) diff --git a/src/hlua.c b/src/hlua.c index 6efabeea1..a3886cfaa 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -1638,7 +1638,7 @@ static void hlua_socket_handler(struct appctx *appctx) * interface. */ if (!channel_is_empty(si_ic(si))) - stream_int_update(si); + si_update(si); /* If write notifications are registered, we considers we want * to write, so we clear the blocking flag. @@ -6564,7 +6564,7 @@ static int hlua_sample_fetch_wrapper(const struct arg *arg_p, struct sample *smp /* finished. */ case HLUA_E_OK: if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) { - stream_int_retnclose(&stream->si[0], &msg); + si_retnclose(&stream->si[0], &msg); return 0; } /* If the stack is empty, the function fails. */ @@ -6582,14 +6582,14 @@ static int hlua_sample_fetch_wrapper(const struct arg *arg_p, struct sample *smp /* yield. */ case HLUA_E_AGAIN: if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) - stream_int_retnclose(&stream->si[0], &msg); + si_retnclose(&stream->si[0], &msg); SEND_ERR(smp->px, "Lua sample-fetch '%s': cannot use yielded functions.\n", fcn->name); return 0; /* finished with error. */ case HLUA_E_ERRMSG: if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) - stream_int_retnclose(&stream->si[0], &msg); + si_retnclose(&stream->si[0], &msg); /* Display log. */ SEND_ERR(smp->px, "Lua sample-fetch '%s': %s.\n", fcn->name, lua_tostring(stream->hlua->T, -1)); @@ -6598,25 +6598,25 @@ static int hlua_sample_fetch_wrapper(const struct arg *arg_p, struct sample *smp case HLUA_E_ETMOUT: if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) - stream_int_retnclose(&stream->si[0], &msg); + si_retnclose(&stream->si[0], &msg); SEND_ERR(smp->px, "Lua sample-fetch '%s': execution timeout.\n", fcn->name); return 0; case HLUA_E_NOMEM: if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) - stream_int_retnclose(&stream->si[0], &msg); + si_retnclose(&stream->si[0], &msg); SEND_ERR(smp->px, "Lua sample-fetch '%s': out of memory error.\n", fcn->name); return 0; case HLUA_E_YIELD: if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) - stream_int_retnclose(&stream->si[0], &msg); + si_retnclose(&stream->si[0], &msg); SEND_ERR(smp->px, "Lua sample-fetch '%s': yield not allowed.\n", fcn->name); return 0; case HLUA_E_ERR: if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) - stream_int_retnclose(&stream->si[0], &msg); + si_retnclose(&stream->si[0], &msg); /* Display log. */ SEND_ERR(smp->px, "Lua sample-fetch '%s' returns an unknown error.\n", fcn->name); @@ -6844,7 +6844,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px, /* finished. */ case HLUA_E_OK: if (!consistency_check(s, dir, &s->hlua->cons)) { - stream_int_retnclose(&s->si[0], &msg); + si_retnclose(&s->si[0], &msg); return ACT_RET_ERR; } if (s->hlua->flags & HLUA_STOP) @@ -6879,7 +6879,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px, /* finished with error. */ case HLUA_E_ERRMSG: if (!consistency_check(s, dir, &s->hlua->cons)) { - stream_int_retnclose(&s->si[0], &msg); + si_retnclose(&s->si[0], &msg); return ACT_RET_ERR; } /* Display log. */ @@ -6890,7 +6890,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px, case HLUA_E_ETMOUT: if (!consistency_check(s, dir, &s->hlua->cons)) { - stream_int_retnclose(&s->si[0], &msg); + si_retnclose(&s->si[0], &msg); return ACT_RET_ERR; } SEND_ERR(px, "Lua function '%s': execution timeout.\n", rule->arg.hlua_rule->fcn.name); @@ -6898,7 +6898,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px, case HLUA_E_NOMEM: if (!consistency_check(s, dir, &s->hlua->cons)) { - stream_int_retnclose(&s->si[0], &msg); + si_retnclose(&s->si[0], &msg); return ACT_RET_ERR; } SEND_ERR(px, "Lua function '%s': out of memory error.\n", rule->arg.hlua_rule->fcn.name); @@ -6906,7 +6906,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px, case HLUA_E_YIELD: if (!consistency_check(s, dir, &s->hlua->cons)) { - stream_int_retnclose(&s->si[0], &msg); + si_retnclose(&s->si[0], &msg); return ACT_RET_ERR; } SEND_ERR(px, "Lua function '%s': aborting Lua processing on expired timeout.\n", @@ -6915,7 +6915,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px, case HLUA_E_ERR: if (!consistency_check(s, dir, &s->hlua->cons)) { - stream_int_retnclose(&s->si[0], &msg); + si_retnclose(&s->si[0], &msg); return ACT_RET_ERR; } /* Display log. */ diff --git a/src/proto_http.c b/src/proto_http.c index 100a7baf3..71609f228 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -140,7 +140,7 @@ http_reply_and_close(struct stream *s, short status, struct buffer *msg) s->txn->flags &= ~TX_WAIT_NEXT_RQ; FLT_STRM_CB(s, flt_http_reply(s, status, msg)); - stream_int_retnclose(&s->si[0], msg); + si_retnclose(&s->si[0], msg); } /* Parse the URI from the given transaction (which is assumed to be in request @@ -2577,7 +2577,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s */ if (stats_check_uri(&s->si[1], txn, px)) { s->target = &http_stats_applet.obj_type; - if (unlikely(!stream_int_register_handler(&s->si[1], objt_applet(s->target)))) { + if (unlikely(!si_register_handler(&s->si[1], objt_applet(s->target)))) { txn->status = 500; s->logs.tv_request = now; http_reply_and_close(s, txn->status, http_error_message(s)); diff --git a/src/proto_htx.c b/src/proto_htx.c index 86bf38eb9..4573c6891 100644 --- a/src/proto_htx.c +++ b/src/proto_htx.c @@ -548,7 +548,7 @@ int htx_process_req_common(struct stream *s, struct channel *req, int an_bit, st */ if (htx_stats_check_uri(s, txn, px)) { s->target = &http_stats_applet.obj_type; - if (unlikely(!stream_int_register_handler(&s->si[1], objt_applet(s->target)))) { + if (unlikely(!si_register_handler(&s->si[1], objt_applet(s->target)))) { txn->status = 500; s->logs.tv_request = now; htx_reply_and_close(s, txn->status, htx_error_message(s)); diff --git a/src/stream.c b/src/stream.c index eec2d0a18..ac323b1e7 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1127,7 +1127,7 @@ static void sess_prepare_conn_req(struct stream *s) struct appctx *appctx = objt_appctx(si->end); if (!appctx || appctx->applet != __objt_applet(s->target)) - appctx = stream_int_register_handler(si, objt_applet(s->target)); + appctx = si_register_handler(si, objt_applet(s->target)); if (!appctx) { /* No more memory, let's immediately abort. Force the @@ -1199,7 +1199,7 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px, if (flags & ACT_FLAG_FIRST) { /* Register applet. this function schedules the applet. */ s->target = &rule->applet.obj_type; - if (unlikely(!stream_int_register_handler(&s->si[1], objt_applet(s->target)))) + if (unlikely(!si_register_handler(&s->si[1], objt_applet(s->target)))) return ACT_RET_ERR; /* Initialise the context. */ @@ -1733,8 +1733,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) * stream interfaces when their timeouts have expired. */ if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) { - stream_int_check_timeouts(si_f); - stream_int_check_timeouts(si_b); + si_check_timeouts(si_f); + si_check_timeouts(si_b); /* check channel timeouts, and close the corresponding stream interfaces * for future reads or writes. Note: this will also concern upper layers @@ -1811,7 +1811,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) if (si_f->state == SI_ST_EST || si_f->state == SI_ST_DIS) { si_shutr(si_f); si_shutw(si_f); - stream_int_report_error(si_f); + si_report_error(si_f); if (!(req->analysers) && !(res->analysers)) { HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); @@ -1829,7 +1829,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) if (si_b->state == SI_ST_EST || si_b->state == SI_ST_DIS) { si_shutr(si_b); si_shutw(si_b); - stream_int_report_error(si_b); + si_report_error(si_b); HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (srv) HA_ATOMIC_ADD(&srv->counters.failed_resp, 1); diff --git a/src/stream_interface.c b/src/stream_interface.c index 53d030d8e..d8fd6bdcc 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -38,22 +38,29 @@ #include -/* socket functions used when running a stream interface as a task */ +/* functions used by default on a detached stream-interface */ static void stream_int_shutr(struct stream_interface *si); static void stream_int_shutw(struct stream_interface *si); static void stream_int_chk_rcv(struct stream_interface *si); static void stream_int_chk_snd(struct stream_interface *si); + +/* functions used on a conn_stream-based stream-interface */ static void stream_int_shutr_conn(struct stream_interface *si); static void stream_int_shutw_conn(struct stream_interface *si); static void stream_int_chk_rcv_conn(struct stream_interface *si); static void stream_int_chk_snd_conn(struct stream_interface *si); + +/* functions used on an applet-based stream-interface */ static void stream_int_shutr_applet(struct stream_interface *si); static void stream_int_shutw_applet(struct stream_interface *si); static void stream_int_chk_rcv_applet(struct stream_interface *si); static void stream_int_chk_snd_applet(struct stream_interface *si); -int si_cs_recv(struct conn_stream *cs); -static int si_cs_process(struct conn_stream *cs); -int si_cs_send(struct conn_stream *cs); + +/* last read notification */ +static void stream_int_read0(struct stream_interface *si); + +/* post-IO notification callback */ +static void stream_int_notify(struct stream_interface *si); /* stream-interface operations for embedded tasks */ struct si_ops si_embedded_ops = { @@ -79,6 +86,15 @@ struct si_ops si_applet_ops = { .shutw = stream_int_shutw_applet, }; + +/* Functions used to communicate with a conn_stream. The first two may be used + * directly, the last one is mostly a wake callback. + */ +int si_cs_recv(struct conn_stream *cs); +int si_cs_send(struct conn_stream *cs); +static int si_cs_process(struct conn_stream *cs); + + struct data_cb si_conn_cb = { .wake = si_cs_process, .name = "STRM", @@ -91,7 +107,7 @@ struct data_cb si_conn_cb = { * be used for any purpose. It returns 1 if the timeout fired, otherwise * zero. */ -int stream_int_check_timeouts(struct stream_interface *si) +int si_check_timeouts(struct stream_interface *si) { if (tick_is_expired(si->exp, now_ms)) { si->flags |= SI_FL_EXP; @@ -101,7 +117,7 @@ int stream_int_check_timeouts(struct stream_interface *si) } /* to be called only when in SI_ST_DIS with SI_FL_ERR */ -void stream_int_report_error(struct stream_interface *si) +void si_report_error(struct stream_interface *si) { if (!si->err_type) si->err_type = SI_ET_DATA_ERR; @@ -119,7 +135,7 @@ void stream_int_report_error(struct stream_interface *si) * not need to be empty before this, and its contents will not be overwritten. * The primary goal of this function is to return error messages to a client. */ -void stream_int_retnclose(struct stream_interface *si, +void si_retnclose(struct stream_interface *si, const struct buffer *msg) { struct channel *ic = si_ic(si); @@ -286,7 +302,7 @@ static void stream_int_chk_snd(struct stream_interface *si) * It also pre-initializes the applet's context and returns it (or NULL in case * it could not be allocated). */ -struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app) +struct appctx *si_register_handler(struct stream_interface *si, struct applet *app) { struct appctx *appctx; @@ -405,17 +421,17 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) } -/* This function is the equivalent to stream_int_update() except that it's +/* This function is the equivalent to si_update() except that it's * designed to be called from outside the stream handlers, typically the lower * layers (applets, connections) after I/O completion. After updating the stream * interface and timeouts, it will try to forward what can be forwarded, then to * wake the associated task up if an important event requires special handling. * It may update SI_FL_WAIT_DATA and/or SI_FL_RXBLK_ROOM, that the callers are * encouraged to watch to take appropriate action. - * It should not be called from within the stream itself, stream_int_update() + * It should not be called from within the stream itself, si_update() * is designed for this. */ -void stream_int_notify(struct stream_interface *si) +static void stream_int_notify(struct stream_interface *si) { struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); @@ -716,7 +732,7 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state) * performance). It must not be called from outside of the stream handler, * as what it does will be used to compute the stream task's expiration. */ -void stream_int_update(struct stream_interface *si) +void si_update(struct stream_interface *si) { struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); @@ -830,10 +846,10 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b /* let's recompute both sides states */ if (si_f->state == SI_ST_EST) - stream_int_update(si_f); + si_update(si_f); if (si_b->state == SI_ST_EST) - stream_int_update(si_b); + si_update(si_b); /* stream ints are processed outside of process_stream() and must be * handled at the latest moment. @@ -1349,7 +1365,7 @@ int si_cs_recv(struct conn_stream *cs) ic->flags |= CF_READ_NULL; if (ic->flags & CF_AUTO_CLOSE) channel_shutw_now(ic); - stream_sock_read0(si); + stream_int_read0(si); } return 1; } @@ -1359,7 +1375,7 @@ int si_cs_recv(struct conn_stream *cs) * It updates the stream interface. If the stream interface has SI_FL_NOHALF, * the close is also forwarded to the write side as an abort. */ -void stream_sock_read0(struct stream_interface *si) +static void stream_int_read0(struct stream_interface *si) { struct conn_stream *cs = __objt_cs(si->end); struct channel *ic = si_ic(si);