mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 15:47:01 +02:00
CLEANUP: stream-int: consistently call the si/stream_int functions
As long-time changes have accumulated over time, the exported functions of the stream-interface were almost all prefixed "si_<something>" while most private ones (mostly callbacks) were called "stream_int_<something>". There were still a few confusing exceptions, which were addressed to follow this shcme : - stream_sock_read0(), only used internally, was renamed stream_int_read0() and made static - stream_int_notify() is only private and was made static - stream_int_{check_timeouts,report_error,retnclose,register_handler,update} were renamed si_<something>. Now it is clearer when checking one of these if it risks to be used outside or not.
This commit is contained in:
parent
94031d30d7
commit
14bfe9af12
@ -33,25 +33,20 @@
|
|||||||
#include <proto/connection.h>
|
#include <proto/connection.h>
|
||||||
|
|
||||||
|
|
||||||
/* main event functions used to move data between sockets and buffers */
|
|
||||||
int stream_int_check_timeouts(struct stream_interface *si);
|
|
||||||
void stream_int_report_error(struct stream_interface *si);
|
|
||||||
void stream_int_retnclose(struct stream_interface *si,
|
|
||||||
const struct buffer *msg);
|
|
||||||
int conn_si_send_proxy(struct connection *conn, unsigned int flag);
|
|
||||||
void stream_sock_read0(struct stream_interface *si);
|
|
||||||
|
|
||||||
extern struct si_ops si_embedded_ops;
|
extern struct si_ops si_embedded_ops;
|
||||||
extern struct si_ops si_conn_ops;
|
extern struct si_ops si_conn_ops;
|
||||||
extern struct si_ops si_applet_ops;
|
extern struct si_ops si_applet_ops;
|
||||||
extern struct data_cb si_conn_cb;
|
extern struct data_cb si_conn_cb;
|
||||||
|
|
||||||
struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app);
|
/* main event functions used to move data between sockets and buffers */
|
||||||
|
int si_check_timeouts(struct stream_interface *si);
|
||||||
|
void si_report_error(struct stream_interface *si);
|
||||||
|
void si_retnclose(struct stream_interface *si, const struct buffer *msg);
|
||||||
|
int conn_si_send_proxy(struct connection *conn, unsigned int flag);
|
||||||
|
struct appctx *si_register_handler(struct stream_interface *si, struct applet *app);
|
||||||
void si_applet_wake_cb(struct stream_interface *si);
|
void si_applet_wake_cb(struct stream_interface *si);
|
||||||
void stream_int_update(struct stream_interface *si);
|
void si_update(struct stream_interface *si);
|
||||||
void stream_int_notify(struct stream_interface *si);
|
|
||||||
int si_cs_recv(struct conn_stream *cs);
|
int si_cs_recv(struct conn_stream *cs);
|
||||||
int si_cs_send(struct conn_stream *cs);
|
|
||||||
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state);
|
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state);
|
||||||
void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b);
|
void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b);
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ enum {
|
|||||||
|
|
||||||
/* Note that if an applet is registered, the update function will not be called
|
/* Note that if an applet is registered, the update function will not be called
|
||||||
* by the session handler, so it may be used to resync flags at the end of the
|
* by the session handler, so it may be used to resync flags at the end of the
|
||||||
* applet handler. See stream_int_update() for reference.
|
* applet handler. See si_update() for reference.
|
||||||
*/
|
*/
|
||||||
struct stream_interface {
|
struct stream_interface {
|
||||||
/* struct members used by the "buffer" side */
|
/* struct members used by the "buffer" side */
|
||||||
|
@ -1384,7 +1384,7 @@ enum act_return http_action_req_cache_use(struct act_rule *rule, struct proxy *p
|
|||||||
shctx_row_inc_hot(shctx_ptr(cache), block_ptr(res));
|
shctx_row_inc_hot(shctx_ptr(cache), block_ptr(res));
|
||||||
shctx_unlock(shctx_ptr(cache));
|
shctx_unlock(shctx_ptr(cache));
|
||||||
s->target = &http_cache_applet.obj_type;
|
s->target = &http_cache_applet.obj_type;
|
||||||
if ((appctx = stream_int_register_handler(&s->si[1], objt_applet(s->target)))) {
|
if ((appctx = si_register_handler(&s->si[1], objt_applet(s->target)))) {
|
||||||
appctx->st0 = HTTP_CACHE_INIT;
|
appctx->st0 = HTTP_CACHE_INIT;
|
||||||
appctx->rule = rule;
|
appctx->rule = rule;
|
||||||
appctx->ctx.cache.entry = res;
|
appctx->ctx.cache.entry = res;
|
||||||
|
@ -1749,7 +1749,7 @@ void pcli_reply_and_close(struct stream *s, const char *msg)
|
|||||||
struct buffer *buf = get_trash_chunk();
|
struct buffer *buf = get_trash_chunk();
|
||||||
|
|
||||||
chunk_initstr(buf, msg);
|
chunk_initstr(buf, msg);
|
||||||
stream_int_retnclose(&s->si[0], buf);
|
si_retnclose(&s->si[0], buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static enum obj_type *pcli_pid_to_server(int proc_pid)
|
static enum obj_type *pcli_pid_to_server(int proc_pid)
|
||||||
|
28
src/hlua.c
28
src/hlua.c
@ -1638,7 +1638,7 @@ static void hlua_socket_handler(struct appctx *appctx)
|
|||||||
* interface.
|
* interface.
|
||||||
*/
|
*/
|
||||||
if (!channel_is_empty(si_ic(si)))
|
if (!channel_is_empty(si_ic(si)))
|
||||||
stream_int_update(si);
|
si_update(si);
|
||||||
|
|
||||||
/* If write notifications are registered, we considers we want
|
/* If write notifications are registered, we considers we want
|
||||||
* to write, so we clear the blocking flag.
|
* to write, so we clear the blocking flag.
|
||||||
@ -6564,7 +6564,7 @@ static int hlua_sample_fetch_wrapper(const struct arg *arg_p, struct sample *smp
|
|||||||
/* finished. */
|
/* finished. */
|
||||||
case HLUA_E_OK:
|
case HLUA_E_OK:
|
||||||
if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) {
|
if (!consistency_check(stream, smp->opt, &stream->hlua->cons)) {
|
||||||
stream_int_retnclose(&stream->si[0], &msg);
|
si_retnclose(&stream->si[0], &msg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
/* If the stack is empty, the function fails. */
|
/* If the stack is empty, the function fails. */
|
||||||
@ -6582,14 +6582,14 @@ static int hlua_sample_fetch_wrapper(const struct arg *arg_p, struct sample *smp
|
|||||||
/* yield. */
|
/* yield. */
|
||||||
case HLUA_E_AGAIN:
|
case HLUA_E_AGAIN:
|
||||||
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
||||||
stream_int_retnclose(&stream->si[0], &msg);
|
si_retnclose(&stream->si[0], &msg);
|
||||||
SEND_ERR(smp->px, "Lua sample-fetch '%s': cannot use yielded functions.\n", fcn->name);
|
SEND_ERR(smp->px, "Lua sample-fetch '%s': cannot use yielded functions.\n", fcn->name);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
/* finished with error. */
|
/* finished with error. */
|
||||||
case HLUA_E_ERRMSG:
|
case HLUA_E_ERRMSG:
|
||||||
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
||||||
stream_int_retnclose(&stream->si[0], &msg);
|
si_retnclose(&stream->si[0], &msg);
|
||||||
/* Display log. */
|
/* Display log. */
|
||||||
SEND_ERR(smp->px, "Lua sample-fetch '%s': %s.\n",
|
SEND_ERR(smp->px, "Lua sample-fetch '%s': %s.\n",
|
||||||
fcn->name, lua_tostring(stream->hlua->T, -1));
|
fcn->name, lua_tostring(stream->hlua->T, -1));
|
||||||
@ -6598,25 +6598,25 @@ static int hlua_sample_fetch_wrapper(const struct arg *arg_p, struct sample *smp
|
|||||||
|
|
||||||
case HLUA_E_ETMOUT:
|
case HLUA_E_ETMOUT:
|
||||||
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
||||||
stream_int_retnclose(&stream->si[0], &msg);
|
si_retnclose(&stream->si[0], &msg);
|
||||||
SEND_ERR(smp->px, "Lua sample-fetch '%s': execution timeout.\n", fcn->name);
|
SEND_ERR(smp->px, "Lua sample-fetch '%s': execution timeout.\n", fcn->name);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
case HLUA_E_NOMEM:
|
case HLUA_E_NOMEM:
|
||||||
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
||||||
stream_int_retnclose(&stream->si[0], &msg);
|
si_retnclose(&stream->si[0], &msg);
|
||||||
SEND_ERR(smp->px, "Lua sample-fetch '%s': out of memory error.\n", fcn->name);
|
SEND_ERR(smp->px, "Lua sample-fetch '%s': out of memory error.\n", fcn->name);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
case HLUA_E_YIELD:
|
case HLUA_E_YIELD:
|
||||||
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
||||||
stream_int_retnclose(&stream->si[0], &msg);
|
si_retnclose(&stream->si[0], &msg);
|
||||||
SEND_ERR(smp->px, "Lua sample-fetch '%s': yield not allowed.\n", fcn->name);
|
SEND_ERR(smp->px, "Lua sample-fetch '%s': yield not allowed.\n", fcn->name);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
case HLUA_E_ERR:
|
case HLUA_E_ERR:
|
||||||
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
if (!consistency_check(stream, smp->opt, &stream->hlua->cons))
|
||||||
stream_int_retnclose(&stream->si[0], &msg);
|
si_retnclose(&stream->si[0], &msg);
|
||||||
/* Display log. */
|
/* Display log. */
|
||||||
SEND_ERR(smp->px, "Lua sample-fetch '%s' returns an unknown error.\n", fcn->name);
|
SEND_ERR(smp->px, "Lua sample-fetch '%s' returns an unknown error.\n", fcn->name);
|
||||||
|
|
||||||
@ -6844,7 +6844,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px,
|
|||||||
/* finished. */
|
/* finished. */
|
||||||
case HLUA_E_OK:
|
case HLUA_E_OK:
|
||||||
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
||||||
stream_int_retnclose(&s->si[0], &msg);
|
si_retnclose(&s->si[0], &msg);
|
||||||
return ACT_RET_ERR;
|
return ACT_RET_ERR;
|
||||||
}
|
}
|
||||||
if (s->hlua->flags & HLUA_STOP)
|
if (s->hlua->flags & HLUA_STOP)
|
||||||
@ -6879,7 +6879,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px,
|
|||||||
/* finished with error. */
|
/* finished with error. */
|
||||||
case HLUA_E_ERRMSG:
|
case HLUA_E_ERRMSG:
|
||||||
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
||||||
stream_int_retnclose(&s->si[0], &msg);
|
si_retnclose(&s->si[0], &msg);
|
||||||
return ACT_RET_ERR;
|
return ACT_RET_ERR;
|
||||||
}
|
}
|
||||||
/* Display log. */
|
/* Display log. */
|
||||||
@ -6890,7 +6890,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px,
|
|||||||
|
|
||||||
case HLUA_E_ETMOUT:
|
case HLUA_E_ETMOUT:
|
||||||
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
||||||
stream_int_retnclose(&s->si[0], &msg);
|
si_retnclose(&s->si[0], &msg);
|
||||||
return ACT_RET_ERR;
|
return ACT_RET_ERR;
|
||||||
}
|
}
|
||||||
SEND_ERR(px, "Lua function '%s': execution timeout.\n", rule->arg.hlua_rule->fcn.name);
|
SEND_ERR(px, "Lua function '%s': execution timeout.\n", rule->arg.hlua_rule->fcn.name);
|
||||||
@ -6898,7 +6898,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px,
|
|||||||
|
|
||||||
case HLUA_E_NOMEM:
|
case HLUA_E_NOMEM:
|
||||||
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
||||||
stream_int_retnclose(&s->si[0], &msg);
|
si_retnclose(&s->si[0], &msg);
|
||||||
return ACT_RET_ERR;
|
return ACT_RET_ERR;
|
||||||
}
|
}
|
||||||
SEND_ERR(px, "Lua function '%s': out of memory error.\n", rule->arg.hlua_rule->fcn.name);
|
SEND_ERR(px, "Lua function '%s': out of memory error.\n", rule->arg.hlua_rule->fcn.name);
|
||||||
@ -6906,7 +6906,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px,
|
|||||||
|
|
||||||
case HLUA_E_YIELD:
|
case HLUA_E_YIELD:
|
||||||
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
||||||
stream_int_retnclose(&s->si[0], &msg);
|
si_retnclose(&s->si[0], &msg);
|
||||||
return ACT_RET_ERR;
|
return ACT_RET_ERR;
|
||||||
}
|
}
|
||||||
SEND_ERR(px, "Lua function '%s': aborting Lua processing on expired timeout.\n",
|
SEND_ERR(px, "Lua function '%s': aborting Lua processing on expired timeout.\n",
|
||||||
@ -6915,7 +6915,7 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px,
|
|||||||
|
|
||||||
case HLUA_E_ERR:
|
case HLUA_E_ERR:
|
||||||
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
if (!consistency_check(s, dir, &s->hlua->cons)) {
|
||||||
stream_int_retnclose(&s->si[0], &msg);
|
si_retnclose(&s->si[0], &msg);
|
||||||
return ACT_RET_ERR;
|
return ACT_RET_ERR;
|
||||||
}
|
}
|
||||||
/* Display log. */
|
/* Display log. */
|
||||||
|
@ -140,7 +140,7 @@ http_reply_and_close(struct stream *s, short status, struct buffer *msg)
|
|||||||
|
|
||||||
s->txn->flags &= ~TX_WAIT_NEXT_RQ;
|
s->txn->flags &= ~TX_WAIT_NEXT_RQ;
|
||||||
FLT_STRM_CB(s, flt_http_reply(s, status, msg));
|
FLT_STRM_CB(s, flt_http_reply(s, status, msg));
|
||||||
stream_int_retnclose(&s->si[0], msg);
|
si_retnclose(&s->si[0], msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Parse the URI from the given transaction (which is assumed to be in request
|
/* Parse the URI from the given transaction (which is assumed to be in request
|
||||||
@ -2577,7 +2577,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
|
|||||||
*/
|
*/
|
||||||
if (stats_check_uri(&s->si[1], txn, px)) {
|
if (stats_check_uri(&s->si[1], txn, px)) {
|
||||||
s->target = &http_stats_applet.obj_type;
|
s->target = &http_stats_applet.obj_type;
|
||||||
if (unlikely(!stream_int_register_handler(&s->si[1], objt_applet(s->target)))) {
|
if (unlikely(!si_register_handler(&s->si[1], objt_applet(s->target)))) {
|
||||||
txn->status = 500;
|
txn->status = 500;
|
||||||
s->logs.tv_request = now;
|
s->logs.tv_request = now;
|
||||||
http_reply_and_close(s, txn->status, http_error_message(s));
|
http_reply_and_close(s, txn->status, http_error_message(s));
|
||||||
|
@ -548,7 +548,7 @@ int htx_process_req_common(struct stream *s, struct channel *req, int an_bit, st
|
|||||||
*/
|
*/
|
||||||
if (htx_stats_check_uri(s, txn, px)) {
|
if (htx_stats_check_uri(s, txn, px)) {
|
||||||
s->target = &http_stats_applet.obj_type;
|
s->target = &http_stats_applet.obj_type;
|
||||||
if (unlikely(!stream_int_register_handler(&s->si[1], objt_applet(s->target)))) {
|
if (unlikely(!si_register_handler(&s->si[1], objt_applet(s->target)))) {
|
||||||
txn->status = 500;
|
txn->status = 500;
|
||||||
s->logs.tv_request = now;
|
s->logs.tv_request = now;
|
||||||
htx_reply_and_close(s, txn->status, htx_error_message(s));
|
htx_reply_and_close(s, txn->status, htx_error_message(s));
|
||||||
|
12
src/stream.c
12
src/stream.c
@ -1127,7 +1127,7 @@ static void sess_prepare_conn_req(struct stream *s)
|
|||||||
struct appctx *appctx = objt_appctx(si->end);
|
struct appctx *appctx = objt_appctx(si->end);
|
||||||
|
|
||||||
if (!appctx || appctx->applet != __objt_applet(s->target))
|
if (!appctx || appctx->applet != __objt_applet(s->target))
|
||||||
appctx = stream_int_register_handler(si, objt_applet(s->target));
|
appctx = si_register_handler(si, objt_applet(s->target));
|
||||||
|
|
||||||
if (!appctx) {
|
if (!appctx) {
|
||||||
/* No more memory, let's immediately abort. Force the
|
/* No more memory, let's immediately abort. Force the
|
||||||
@ -1199,7 +1199,7 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px,
|
|||||||
if (flags & ACT_FLAG_FIRST) {
|
if (flags & ACT_FLAG_FIRST) {
|
||||||
/* Register applet. this function schedules the applet. */
|
/* Register applet. this function schedules the applet. */
|
||||||
s->target = &rule->applet.obj_type;
|
s->target = &rule->applet.obj_type;
|
||||||
if (unlikely(!stream_int_register_handler(&s->si[1], objt_applet(s->target))))
|
if (unlikely(!si_register_handler(&s->si[1], objt_applet(s->target))))
|
||||||
return ACT_RET_ERR;
|
return ACT_RET_ERR;
|
||||||
|
|
||||||
/* Initialise the context. */
|
/* Initialise the context. */
|
||||||
@ -1733,8 +1733,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
|||||||
* stream interfaces when their timeouts have expired.
|
* stream interfaces when their timeouts have expired.
|
||||||
*/
|
*/
|
||||||
if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
|
if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
|
||||||
stream_int_check_timeouts(si_f);
|
si_check_timeouts(si_f);
|
||||||
stream_int_check_timeouts(si_b);
|
si_check_timeouts(si_b);
|
||||||
|
|
||||||
/* check channel timeouts, and close the corresponding stream interfaces
|
/* check channel timeouts, and close the corresponding stream interfaces
|
||||||
* for future reads or writes. Note: this will also concern upper layers
|
* for future reads or writes. Note: this will also concern upper layers
|
||||||
@ -1811,7 +1811,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
|||||||
if (si_f->state == SI_ST_EST || si_f->state == SI_ST_DIS) {
|
if (si_f->state == SI_ST_EST || si_f->state == SI_ST_DIS) {
|
||||||
si_shutr(si_f);
|
si_shutr(si_f);
|
||||||
si_shutw(si_f);
|
si_shutw(si_f);
|
||||||
stream_int_report_error(si_f);
|
si_report_error(si_f);
|
||||||
if (!(req->analysers) && !(res->analysers)) {
|
if (!(req->analysers) && !(res->analysers)) {
|
||||||
HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
|
||||||
HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
|
||||||
@ -1829,7 +1829,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
|||||||
if (si_b->state == SI_ST_EST || si_b->state == SI_ST_DIS) {
|
if (si_b->state == SI_ST_EST || si_b->state == SI_ST_DIS) {
|
||||||
si_shutr(si_b);
|
si_shutr(si_b);
|
||||||
si_shutw(si_b);
|
si_shutw(si_b);
|
||||||
stream_int_report_error(si_b);
|
si_report_error(si_b);
|
||||||
HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
|
HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
|
||||||
if (srv)
|
if (srv)
|
||||||
HA_ATOMIC_ADD(&srv->counters.failed_resp, 1);
|
HA_ATOMIC_ADD(&srv->counters.failed_resp, 1);
|
||||||
|
@ -38,22 +38,29 @@
|
|||||||
|
|
||||||
#include <types/pipe.h>
|
#include <types/pipe.h>
|
||||||
|
|
||||||
/* socket functions used when running a stream interface as a task */
|
/* functions used by default on a detached stream-interface */
|
||||||
static void stream_int_shutr(struct stream_interface *si);
|
static void stream_int_shutr(struct stream_interface *si);
|
||||||
static void stream_int_shutw(struct stream_interface *si);
|
static void stream_int_shutw(struct stream_interface *si);
|
||||||
static void stream_int_chk_rcv(struct stream_interface *si);
|
static void stream_int_chk_rcv(struct stream_interface *si);
|
||||||
static void stream_int_chk_snd(struct stream_interface *si);
|
static void stream_int_chk_snd(struct stream_interface *si);
|
||||||
|
|
||||||
|
/* functions used on a conn_stream-based stream-interface */
|
||||||
static void stream_int_shutr_conn(struct stream_interface *si);
|
static void stream_int_shutr_conn(struct stream_interface *si);
|
||||||
static void stream_int_shutw_conn(struct stream_interface *si);
|
static void stream_int_shutw_conn(struct stream_interface *si);
|
||||||
static void stream_int_chk_rcv_conn(struct stream_interface *si);
|
static void stream_int_chk_rcv_conn(struct stream_interface *si);
|
||||||
static void stream_int_chk_snd_conn(struct stream_interface *si);
|
static void stream_int_chk_snd_conn(struct stream_interface *si);
|
||||||
|
|
||||||
|
/* functions used on an applet-based stream-interface */
|
||||||
static void stream_int_shutr_applet(struct stream_interface *si);
|
static void stream_int_shutr_applet(struct stream_interface *si);
|
||||||
static void stream_int_shutw_applet(struct stream_interface *si);
|
static void stream_int_shutw_applet(struct stream_interface *si);
|
||||||
static void stream_int_chk_rcv_applet(struct stream_interface *si);
|
static void stream_int_chk_rcv_applet(struct stream_interface *si);
|
||||||
static void stream_int_chk_snd_applet(struct stream_interface *si);
|
static void stream_int_chk_snd_applet(struct stream_interface *si);
|
||||||
int si_cs_recv(struct conn_stream *cs);
|
|
||||||
static int si_cs_process(struct conn_stream *cs);
|
/* last read notification */
|
||||||
int si_cs_send(struct conn_stream *cs);
|
static void stream_int_read0(struct stream_interface *si);
|
||||||
|
|
||||||
|
/* post-IO notification callback */
|
||||||
|
static void stream_int_notify(struct stream_interface *si);
|
||||||
|
|
||||||
/* stream-interface operations for embedded tasks */
|
/* stream-interface operations for embedded tasks */
|
||||||
struct si_ops si_embedded_ops = {
|
struct si_ops si_embedded_ops = {
|
||||||
@ -79,6 +86,15 @@ struct si_ops si_applet_ops = {
|
|||||||
.shutw = stream_int_shutw_applet,
|
.shutw = stream_int_shutw_applet,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/* Functions used to communicate with a conn_stream. The first two may be used
|
||||||
|
* directly, the last one is mostly a wake callback.
|
||||||
|
*/
|
||||||
|
int si_cs_recv(struct conn_stream *cs);
|
||||||
|
int si_cs_send(struct conn_stream *cs);
|
||||||
|
static int si_cs_process(struct conn_stream *cs);
|
||||||
|
|
||||||
|
|
||||||
struct data_cb si_conn_cb = {
|
struct data_cb si_conn_cb = {
|
||||||
.wake = si_cs_process,
|
.wake = si_cs_process,
|
||||||
.name = "STRM",
|
.name = "STRM",
|
||||||
@ -91,7 +107,7 @@ struct data_cb si_conn_cb = {
|
|||||||
* be used for any purpose. It returns 1 if the timeout fired, otherwise
|
* be used for any purpose. It returns 1 if the timeout fired, otherwise
|
||||||
* zero.
|
* zero.
|
||||||
*/
|
*/
|
||||||
int stream_int_check_timeouts(struct stream_interface *si)
|
int si_check_timeouts(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
if (tick_is_expired(si->exp, now_ms)) {
|
if (tick_is_expired(si->exp, now_ms)) {
|
||||||
si->flags |= SI_FL_EXP;
|
si->flags |= SI_FL_EXP;
|
||||||
@ -101,7 +117,7 @@ int stream_int_check_timeouts(struct stream_interface *si)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* to be called only when in SI_ST_DIS with SI_FL_ERR */
|
/* to be called only when in SI_ST_DIS with SI_FL_ERR */
|
||||||
void stream_int_report_error(struct stream_interface *si)
|
void si_report_error(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
if (!si->err_type)
|
if (!si->err_type)
|
||||||
si->err_type = SI_ET_DATA_ERR;
|
si->err_type = SI_ET_DATA_ERR;
|
||||||
@ -119,7 +135,7 @@ void stream_int_report_error(struct stream_interface *si)
|
|||||||
* not need to be empty before this, and its contents will not be overwritten.
|
* not need to be empty before this, and its contents will not be overwritten.
|
||||||
* The primary goal of this function is to return error messages to a client.
|
* The primary goal of this function is to return error messages to a client.
|
||||||
*/
|
*/
|
||||||
void stream_int_retnclose(struct stream_interface *si,
|
void si_retnclose(struct stream_interface *si,
|
||||||
const struct buffer *msg)
|
const struct buffer *msg)
|
||||||
{
|
{
|
||||||
struct channel *ic = si_ic(si);
|
struct channel *ic = si_ic(si);
|
||||||
@ -286,7 +302,7 @@ static void stream_int_chk_snd(struct stream_interface *si)
|
|||||||
* It also pre-initializes the applet's context and returns it (or NULL in case
|
* It also pre-initializes the applet's context and returns it (or NULL in case
|
||||||
* it could not be allocated).
|
* it could not be allocated).
|
||||||
*/
|
*/
|
||||||
struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app)
|
struct appctx *si_register_handler(struct stream_interface *si, struct applet *app)
|
||||||
{
|
{
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
|
|
||||||
@ -405,17 +421,17 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* This function is the equivalent to stream_int_update() except that it's
|
/* This function is the equivalent to si_update() except that it's
|
||||||
* designed to be called from outside the stream handlers, typically the lower
|
* designed to be called from outside the stream handlers, typically the lower
|
||||||
* layers (applets, connections) after I/O completion. After updating the stream
|
* layers (applets, connections) after I/O completion. After updating the stream
|
||||||
* interface and timeouts, it will try to forward what can be forwarded, then to
|
* interface and timeouts, it will try to forward what can be forwarded, then to
|
||||||
* wake the associated task up if an important event requires special handling.
|
* wake the associated task up if an important event requires special handling.
|
||||||
* It may update SI_FL_WAIT_DATA and/or SI_FL_RXBLK_ROOM, that the callers are
|
* It may update SI_FL_WAIT_DATA and/or SI_FL_RXBLK_ROOM, that the callers are
|
||||||
* encouraged to watch to take appropriate action.
|
* encouraged to watch to take appropriate action.
|
||||||
* It should not be called from within the stream itself, stream_int_update()
|
* It should not be called from within the stream itself, si_update()
|
||||||
* is designed for this.
|
* is designed for this.
|
||||||
*/
|
*/
|
||||||
void stream_int_notify(struct stream_interface *si)
|
static void stream_int_notify(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
struct channel *ic = si_ic(si);
|
struct channel *ic = si_ic(si);
|
||||||
struct channel *oc = si_oc(si);
|
struct channel *oc = si_oc(si);
|
||||||
@ -716,7 +732,7 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
|
|||||||
* performance). It must not be called from outside of the stream handler,
|
* performance). It must not be called from outside of the stream handler,
|
||||||
* as what it does will be used to compute the stream task's expiration.
|
* as what it does will be used to compute the stream task's expiration.
|
||||||
*/
|
*/
|
||||||
void stream_int_update(struct stream_interface *si)
|
void si_update(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
struct channel *ic = si_ic(si);
|
struct channel *ic = si_ic(si);
|
||||||
struct channel *oc = si_oc(si);
|
struct channel *oc = si_oc(si);
|
||||||
@ -830,10 +846,10 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
|
|||||||
|
|
||||||
/* let's recompute both sides states */
|
/* let's recompute both sides states */
|
||||||
if (si_f->state == SI_ST_EST)
|
if (si_f->state == SI_ST_EST)
|
||||||
stream_int_update(si_f);
|
si_update(si_f);
|
||||||
|
|
||||||
if (si_b->state == SI_ST_EST)
|
if (si_b->state == SI_ST_EST)
|
||||||
stream_int_update(si_b);
|
si_update(si_b);
|
||||||
|
|
||||||
/* stream ints are processed outside of process_stream() and must be
|
/* stream ints are processed outside of process_stream() and must be
|
||||||
* handled at the latest moment.
|
* handled at the latest moment.
|
||||||
@ -1349,7 +1365,7 @@ int si_cs_recv(struct conn_stream *cs)
|
|||||||
ic->flags |= CF_READ_NULL;
|
ic->flags |= CF_READ_NULL;
|
||||||
if (ic->flags & CF_AUTO_CLOSE)
|
if (ic->flags & CF_AUTO_CLOSE)
|
||||||
channel_shutw_now(ic);
|
channel_shutw_now(ic);
|
||||||
stream_sock_read0(si);
|
stream_int_read0(si);
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -1359,7 +1375,7 @@ int si_cs_recv(struct conn_stream *cs)
|
|||||||
* It updates the stream interface. If the stream interface has SI_FL_NOHALF,
|
* It updates the stream interface. If the stream interface has SI_FL_NOHALF,
|
||||||
* the close is also forwarded to the write side as an abort.
|
* the close is also forwarded to the write side as an abort.
|
||||||
*/
|
*/
|
||||||
void stream_sock_read0(struct stream_interface *si)
|
static void stream_int_read0(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
struct conn_stream *cs = __objt_cs(si->end);
|
struct conn_stream *cs = __objt_cs(si->end);
|
||||||
struct channel *ic = si_ic(si);
|
struct channel *ic = si_ic(si);
|
||||||
|
Loading…
Reference in New Issue
Block a user