diff --git a/include/proto/cli.h b/include/proto/cli.h index 467a86ea7..74052f714 100644 --- a/include/proto/cli.h +++ b/include/proto/cli.h @@ -32,5 +32,12 @@ int mworker_cli_proxy_create(); int mworker_cli_proxy_new_listener(char *line); int mworker_cli_sockpair_new(struct mworker_proc *mworker_proc, int proc); +/* proxy mode cli functions */ + +/* analyzers */ +int pcli_wait_for_request(struct stream *s, struct channel *req, int an_bit); +int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit); + + #endif /* _PROTO_CLI_H */ diff --git a/include/types/channel.h b/include/types/channel.h index f7ddd1a57..e4f2824ec 100644 --- a/include/types/channel.h +++ b/include/types/channel.h @@ -180,6 +180,9 @@ #define AN_RES_FLT_XFER_DATA 0x04000000 #define AN_RES_FLT_END 0x08000000 +#define AN_REQ_WAIT_CLI 0x10000000 +#define AN_RES_WAIT_CLI 0x20000000 + /* Magic value to forward infinite size (TCP, ...), used with ->to_forward */ #define CHN_INFINITE_FORWARD MAX_RANGE(unsigned int) diff --git a/include/types/proxy.h b/include/types/proxy.h index da098485e..b7c9038d6 100644 --- a/include/types/proxy.h +++ b/include/types/proxy.h @@ -65,6 +65,7 @@ enum pr_mode { PR_MODE_TCP = 0, PR_MODE_HTTP, PR_MODE_HEALTH, + PR_MODE_CLI, } __attribute__((packed)); enum PR_SRV_STATE_FILE { diff --git a/include/types/stream.h b/include/types/stream.h index feeb56b12..87bdf46ed 100644 --- a/include/types/stream.h +++ b/include/types/stream.h @@ -162,6 +162,7 @@ struct stream { void (*srv_error)(struct stream *s, /* the function to call upon unrecoverable server errors (or NULL) */ struct stream_interface *si); + int pcli_next_pid; /* next target PID to use for the CLI proxy */ char *unique_id; /* custom unique ID */ /* These two pointers are used to resume the execution of the rule lists. */ diff --git a/src/cfgparse.c b/src/cfgparse.c index 8d8b6ea32..65afadca6 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -7654,6 +7654,10 @@ int check_config_validity() case PR_MODE_HTTP: curproxy->http_needed = 1; break; + + case PR_MODE_CLI: + cfgerr += proxy_cfg_ensure_no_http(curproxy); + break; } if (curproxy != global.stats_fe && (curproxy->cap & PR_CAP_FE) && LIST_ISEMPTY(&curproxy->conf.listeners)) { @@ -8745,6 +8749,11 @@ int check_config_validity() curproxy->fe_rsp_ana |= AN_RES_WAIT_HTTP | AN_RES_HTTP_PROCESS_FE; } + if (curproxy->mode == PR_MODE_CLI) { + curproxy->fe_req_ana |= AN_REQ_WAIT_CLI; + curproxy->fe_rsp_ana |= AN_RES_WAIT_CLI; + } + /* both TCP and HTTP must check switching rules */ curproxy->fe_req_ana |= AN_REQ_SWITCHING_RULES; diff --git a/src/cli.c b/src/cli.c index 2d4d1281c..f7fb6f0e7 100644 --- a/src/cli.c +++ b/src/cli.c @@ -1619,6 +1619,8 @@ static int cli_parse_simple(char **args, char *payload, struct appctx *appctx, v } +/* The pcli_* functions are used for the CLI proxy in the master */ + static enum obj_type *pcli_pid_to_server(int proc_pid) { struct mworker_proc *child; @@ -1692,6 +1694,379 @@ static int pcli_prefix_to_pid(const char *prefix) return -1; } +/* Parse the CLI request: + * + * - it can rewrite the buffer by trimming the prefix + * - fill dst with the destination server if there is one + * + * Return: + * - the amount of data to forward or + * - -1 if there is no end to the command or + * - 0 everything has been trimmed (only a prefix) + */ +#define PCLI_REQ_INIT 0 +#define PCLI_REQ_PFX 1 +#define PCLI_REQ_TRIM 2 +#define PCLI_REQ_CMD 3 + +int pcli_parse_request(struct channel *req, int *target_pid) +{ + char *input = (char *)ci_head(req); + const char *end; + char *ptr, *trim = NULL, *pfx_b = NULL, *cmd_b = NULL; + struct buffer *buf = &req->buf; + int ret = 0; + int state = PCLI_REQ_INIT; + + ptr = input; + end = b_stop(buf); + + /* The while loop condition is checking the end of the command. + It is needed to iterate for each ptr++ done in the parser */ + while (ptr < end && *ptr != '\n' && *ptr != '\r' && *ptr != ';') { + switch (state) { + /* The init state only trims the useless chars */ + case PCLI_REQ_INIT: + + /* skip every spaces at the start of the command */ + if (*ptr == ' ') { + ptr++; + continue; + } + pfx_b = ptr; /* this is the start of the command or of the @ prefix */ + state = PCLI_REQ_PFX; + + /* the atprefix state looks for a @ prefix. If it finds + it, it will check to which server send the request. + It also ajust the trim pointer */ + case PCLI_REQ_PFX: + + if (*pfx_b != '@') { + /* there is no prefix */ + pfx_b = NULL; + cmd_b = ptr; + state = PCLI_REQ_CMD; + continue; + } + + if (*ptr != ' ') { + ptr++; + continue; + } + *ptr = '\0'; /* this the end of the prefix */ + ptr++; + trim = ptr; + state = PCLI_REQ_TRIM; + break; + + /* we really need to trim there because that's the only + way to know if we are going to send a command or if + there is only a prefix */ + case PCLI_REQ_TRIM: + if (*ptr == ' ') { + ptr++; + continue; + } + cmd_b = trim = ptr; + state = PCLI_REQ_CMD; + + /* just look for the end of the command */ + case PCLI_REQ_CMD: + ptr++; + continue; + } + } + + /* we didn't find a command separator, not enough data */ + if (ptr >= end) + return -1; + + if (!pfx_b && !cmd_b) { + /* probably just a \n or a ; */ + return 1; + } else if (pfx_b && !cmd_b) { + /* it's only a prefix, we don't want to forward it */ + *ptr = '\0'; + trim = ptr + 1; /* we want to trim the whole command */ + ret = 0; + } else if (cmd_b) { + /* command without a prefix */ + *ptr = '\n'; + ret = ptr - cmd_b + 1; + } + + if (pfx_b) + *target_pid = pcli_prefix_to_pid(pfx_b); + + /* trim the useless chars */ + if (trim) + b_del(&req->buf, trim - input); + + return ret; +} + +int pcli_wait_for_request(struct stream *s, struct channel *req, int an_bit) +{ + int target_pid; + int to_forward; + + target_pid = s->pcli_next_pid; + +read_again: + /* if the channel is closed for read, we won't receive any more data + from the client, but we don't want to forward this close to the + server */ + channel_dont_close(req); + + /* We don't know yet to which server we will connect */ + channel_dont_connect(req); + + + /* we are not waiting for a response, there is no more request and we + * receive a close from the client, we can leave */ + if (!(ci_data(req)) && req->flags & CF_SHUTR) { + channel_shutw_now(&s->res); + s->req.analysers &= ~AN_REQ_WAIT_CLI; + return 1; + } + + req->flags |= CF_READ_DONTWAIT; + + /* need more data */ + if (!ci_data(req)) + return 0; + + /* If there is data available for analysis, log the end of the idle time. */ + if (c_data(req) && s->logs.t_idle == -1) + s->logs.t_idle = tv_ms_elapsed(&s->logs.tv_accept, &now) - s->logs.t_handshake; + + to_forward = pcli_parse_request(req, &target_pid); + if (to_forward > 0) { + /* enough data */ + + /* we didn't find the process, send an error and close */ + if (target_pid < 0) { + pcli_reply_and_close(s, "Can't find the target CLI!\n"); + return 0; + } + + /* forward only 1 command */ + channel_forward(req, to_forward); + /* we send only 1 command per request, and we write close after it */ + channel_shutw_now(req); + + /* remove the XFER_DATA analysers, which forwards all + * the data, we don't want to forward the next requests + * We need to add CF_FLT_ANALYZE to abort the forward too. + */ + req->analysers &= ~(AN_REQ_FLT_XFER_DATA|AN_REQ_WAIT_CLI); + req->analysers |= AN_REQ_FLT_END|CF_FLT_ANALYZE; + s->res.analysers |= AN_RES_WAIT_CLI; + + /* we can connect now */ + s->target = pcli_pid_to_server(target_pid); + if (!s->target) { + s->target = &cli_applet.obj_type; + } + + s->flags |= (SF_DIRECT | SF_ASSIGNED); + channel_auto_connect(req); + + } else if (to_forward == 0) { + /* we only received a prefix without command, which + mean that we want to store it for every other + command for this session */ + if (target_pid > -1) { + s->pcli_next_pid = target_pid; + // TODO: pcli_reply the prompt + } else { + // TODO: pcli_reply() error + s->pcli_next_pid = 0; + } + + /* we trimmed things but we might have other commands to consume */ + goto read_again; + } else if (to_forward == -1 && channel_full(req, global.tune.maxrewrite)) { + /* buffer is full and we didn't catch the end of a command */ + goto send_help; + } + + return 0; + +send_help: + b_reset(&req->buf); + b_putblk(&req->buf, "help\n", 5); + goto read_again; +} + +int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit) +{ + struct proxy *fe = strm_fe(s); + struct proxy *be = s->be; + + rep->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */ + rep->flags |= CF_NEVER_WAIT; + + /* don't forward the close */ + channel_dont_close(&s->res); + channel_dont_close(&s->req); + + /* forward the data */ + if (ci_data(rep)) { + c_adv(rep, ci_data(rep)); + return 0; + } + + if ((rep->flags & (CF_SHUTR|CF_READ_NULL))) { + /* stream cleanup */ + + s->si[1].flags |= SI_FL_NOLINGER | SI_FL_NOHALF; + si_shutr(&s->si[1]); + si_shutw(&s->si[1]); + + /* + * starting from there this the same code as + * http_end_txn_clean_session(). + * + * It allows to do frontend keepalive while reconnecting to a + * new server for each request. + */ + + if (s->flags & SF_BE_ASSIGNED) { + HA_ATOMIC_SUB(&be->beconn, 1); + if (unlikely(s->srv_conn)) + sess_change_server(s, NULL); + } + + s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now); + stream_process_counters(s); + + /* don't count other requests' data */ + s->logs.bytes_in -= ci_data(&s->req); + s->logs.bytes_out -= ci_data(&s->res); + + /* we may need to know the position in the queue */ + pendconn_free(s); + + /* let's do a final log if we need it */ + if (!LIST_ISEMPTY(&fe->logformat) && s->logs.logwait && + !(s->flags & SF_MONITOR) && + (!(fe->options & PR_O_NULLNOLOG) || s->req.total)) { + s->do_log(s); + } + + /* stop tracking content-based counters */ + stream_stop_content_counters(s); + stream_update_time_stats(s); + + s->logs.accept_date = date; /* user-visible date for logging */ + s->logs.tv_accept = now; /* corrected date for internal use */ + s->logs.t_handshake = 0; /* There are no handshake in keep alive connection. */ + s->logs.t_idle = -1; + tv_zero(&s->logs.tv_request); + s->logs.t_queue = -1; + s->logs.t_connect = -1; + s->logs.t_data = -1; + s->logs.t_close = 0; + s->logs.prx_queue_pos = 0; /* we get the number of pending conns before us */ + s->logs.srv_queue_pos = 0; /* we will get this number soon */ + + s->logs.bytes_in = s->req.total = ci_data(&s->req); + s->logs.bytes_out = s->res.total = ci_data(&s->res); + + stream_del_srv_conn(s); + if (objt_server(s->target)) { + if (s->flags & SF_CURR_SESS) { + s->flags &= ~SF_CURR_SESS; + HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1); + } + if (may_dequeue_tasks(objt_server(s->target), be)) + process_srv_queue(objt_server(s->target)); + } + + s->target = NULL; + + /* only release our endpoint if we don't intend to reuse the + * connection. + */ + if (!si_conn_ready(&s->si[1])) { + si_release_endpoint(&s->si[1]); + s->srv_conn = NULL; + } + + s->si[1].state = s->si[1].prev_state = SI_ST_INI; + s->si[1].err_type = SI_ET_NONE; + s->si[1].conn_retries = 0; /* used for logging too */ + s->si[1].exp = TICK_ETERNITY; + s->si[1].flags &= SI_FL_ISBACK | SI_FL_DONT_WAKE; /* we're in the context of process_stream */ + s->req.flags &= ~(CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CONNECT|CF_WRITE_ERROR|CF_STREAMER|CF_STREAMER_FAST|CF_NEVER_WAIT|CF_WAKE_CONNECT|CF_WROTE_DATA); + s->res.flags &= ~(CF_SHUTR|CF_SHUTR_NOW|CF_READ_ATTACHED|CF_READ_ERROR|CF_READ_NOEXP|CF_STREAMER|CF_STREAMER_FAST|CF_WRITE_PARTIAL|CF_NEVER_WAIT|CF_WROTE_DATA); + s->flags &= ~(SF_DIRECT|SF_ASSIGNED|SF_ADDR_SET|SF_BE_ASSIGNED|SF_FORCE_PRST|SF_IGNORE_PRST); + s->flags &= ~(SF_CURR_SESS|SF_REDIRECTABLE|SF_SRV_REUSED); + s->flags &= ~(SF_ERR_MASK|SF_FINST_MASK|SF_REDISP); + /* reinitialise the current rule list pointer to NULL. We are sure that + * any rulelist match the NULL pointer. + */ + s->current_rule_list = NULL; + + s->be = strm_fe(s); + s->logs.logwait = strm_fe(s)->to_log; + s->logs.level = 0; + stream_del_srv_conn(s); + s->target = NULL; + /* re-init store persistence */ + s->store_count = 0; + s->uniq_id = global.req_count++; + + s->req.flags |= CF_READ_DONTWAIT; /* one read is usually enough */ + + s->req.flags |= CF_WAKE_ONCE; /* need to be called again if there is some command left in the request */ + + s->req.analysers |= AN_REQ_WAIT_CLI; + s->res.analysers &= ~AN_RES_WAIT_CLI; + + /* We must trim any excess data from the response buffer, because we + * may have blocked an invalid response from a server that we don't + * want to accidentely forward once we disable the analysers, nor do + * we want those data to come along with next response. A typical + * example of such data would be from a buggy server responding to + * a HEAD with some data, or sending more than the advertised + * content-length. + */ + if (unlikely(ci_data(&s->res))) + b_set_data(&s->res.buf, co_data(&s->res)); + + /* Now we can realign the response buffer */ + c_realign_if_empty(&s->res); + + s->req.rto = strm_fe(s)->timeout.client; + s->req.wto = TICK_ETERNITY; + + s->res.rto = TICK_ETERNITY; + s->res.wto = strm_fe(s)->timeout.client; + + s->req.rex = TICK_ETERNITY; + s->req.wex = TICK_ETERNITY; + s->req.analyse_exp = TICK_ETERNITY; + s->res.rex = TICK_ETERNITY; + s->res.wex = TICK_ETERNITY; + s->res.analyse_exp = TICK_ETERNITY; + s->si[1].hcto = TICK_ETERNITY; + + /* we're removing the analysers, we MUST re-enable events detection. + * We don't enable close on the response channel since it's either + * already closed, or in keep-alive with an idle connection handler. + */ + channel_auto_read(&s->req); + channel_auto_close(&s->req); + channel_auto_read(&s->res); + + + return 1; + } + return 0; +} + /* * The mworker functions are used to initialize the CLI in the master process */ @@ -1711,7 +2086,7 @@ int mworker_cli_proxy_create() mworker_proxy->next = proxies_list; proxies_list = mworker_proxy; mworker_proxy->id = strdup("MASTER"); - mworker_proxy->mode = PR_MODE_TCP; + mworker_proxy->mode = PR_MODE_CLI; mworker_proxy->state = PR_STNEW; mworker_proxy->last_change = now.tv_sec; mworker_proxy->cap = PR_CAP_LISTEN; /* this is a listen section */ @@ -1768,9 +2143,9 @@ int mworker_cli_proxy_create() /* no port specified */ newsrv->flags |= SRV_F_MAPPORTS; newsrv->addr = *sk; - newsrv->iweight = 1; - newsrv->uweight = 1; - mworker_proxy->srv_act++; + /* don't let the server participate to load balancing */ + newsrv->iweight = 0; + newsrv->uweight = 0; srv_lb_commit_status(newsrv); child->srv = newsrv; diff --git a/src/proxy.c b/src/proxy.c index 0fbfce9c6..9a6313a1c 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -88,6 +88,8 @@ const char *proxy_mode_str(int mode) { return "http"; else if (mode == PR_MODE_HEALTH) return "health"; + else if (mode == PR_MODE_CLI) + return "cli"; else return "unknown"; } diff --git a/src/stream.c b/src/stream.c index f33713dc9..a3a65e17f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -160,6 +160,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup; s->flags |= SF_INITIALIZED; + s->pcli_next_pid = 0; s->unique_id = NULL; if ((t = task_new(tid_bit)) == NULL) @@ -1959,6 +1960,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) FLT_ANALYZE(s, req, process_sticking_rules, ana_list, ana_back, AN_REQ_STICKING_RULES); ANALYZE (s, req, flt_analyze_http_headers, ana_list, ana_back, AN_REQ_FLT_HTTP_HDRS); ANALYZE (s, req, http_request_forward_body, ana_list, ana_back, AN_REQ_HTTP_XFER_BODY); + ANALYZE (s, req, pcli_wait_for_request, ana_list, ana_back, AN_REQ_WAIT_CLI); ANALYZE (s, req, flt_xfer_data, ana_list, ana_back, AN_REQ_FLT_XFER_DATA); ANALYZE (s, req, flt_end_analyze, ana_list, ana_back, AN_REQ_FLT_END); break; @@ -2027,6 +2029,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) FLT_ANALYZE(s, res, http_process_res_common, ana_list, ana_back, AN_RES_HTTP_PROCESS_BE, s->be); ANALYZE (s, res, flt_analyze_http_headers, ana_list, ana_back, AN_RES_FLT_HTTP_HDRS); ANALYZE (s, res, http_response_forward_body, ana_list, ana_back, AN_RES_HTTP_XFER_BODY); + ANALYZE (s, res, pcli_wait_for_response, ana_list, ana_back, AN_RES_WAIT_CLI); ANALYZE (s, res, flt_xfer_data, ana_list, ana_back, AN_RES_FLT_XFER_DATA); ANALYZE (s, res, flt_end_analyze, ana_list, ana_back, AN_RES_FLT_END); break;