diff --git a/include/types/hlua.h b/include/types/hlua.h index f21f4e1b7..49251556c 100644 --- a/include/types/hlua.h +++ b/include/types/hlua.h @@ -2,9 +2,14 @@ #define _TYPES_HLUA_H #include +#include + +#include +#include #define CLASS_CORE "Core" #define CLASS_TXN "TXN" +#define CLASS_SOCKET "Socket" struct session; @@ -100,4 +105,12 @@ struct hlua_sleep { unsigned int wakeup_ms; /* hour to wakeup. */ }; +/* This struct is used to create coprocess doing TCP or + * SSL I/O. It uses a fake session. + */ +struct hlua_socket { + struct session *s; /* Session used for socket I/O. */ + luaL_Buffer b; /* buffer used to prepare strings. */ +}; + #endif /* _TYPES_HLUA_H */ diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index e06063410..ab0e750c4 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -27,6 +27,9 @@ #include #include +#ifdef USE_LUA +#include +#endif #include #include @@ -150,6 +153,14 @@ struct appctx { struct pattern_expr *expr; struct chunk chunk; } map; +#ifdef USE_LUA + struct { + int connected; + struct hlua_socket *socket; + struct list wake_on_read; + struct list wake_on_write; + } hlua; +#endif } ctx; /* used by stats I/O handlers to dump the stats */ }; diff --git a/src/hlua.c b/src/hlua.c index 6f85d999b..0f06188ae 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -1,3 +1,5 @@ +#include + #include #include #include @@ -6,16 +8,24 @@ #include +#include #include #include #include #include +#include #include +#include #include #include #include +#include #include +#include +#include +#include +#include #include /* Lua uses longjmp to perform yield or throwing errors. This @@ -38,6 +48,13 @@ struct hlua gL; struct pool_head *pool2_hlua_com; struct pool_head *pool2_hlua_sleep; +/* Used for Socket connection. */ +static struct proxy socket_proxy; +static struct server socket_tcp; +#ifdef USE_OPENSSL +static struct server socket_ssl; +#endif + /* List head of the function called at the initialisation time. */ struct list hlua_init_functions = LIST_HEAD_INIT(hlua_init_functions); @@ -53,6 +70,7 @@ struct eb_root hlua_ctx = EB_ROOT_UNIQUE; */ static int class_core_ref; static int class_txn_ref; +static int class_socket_ref; /* These functions converts types between HAProxy internal args or * sample and LUA types. Another function permits to check if the @@ -774,6 +792,937 @@ static enum hlua_exec hlua_ctx_resume(struct hlua *lua, int yield_allowed) * __le "<=" */ +/* + * + * + * Class Socket + * + * + */ + +__LJMP static struct hlua_socket *hlua_checksocket(lua_State *L, int ud) +{ + return (struct hlua_socket *)MAY_LJMP(hlua_checkudata(L, ud, class_socket_ref)); +} + +/* This function is the handler called for each I/O on the established + * connection. It is used for notify space avalaible to send or data + * received. + */ +static void hlua_socket_handler(struct stream_interface *si) +{ + struct appctx *appctx = objt_appctx(si->end); + struct connection *c = objt_conn(si->ib->cons->end); + + /* Wakeup the main session if the client connection is closed. */ + if (!c || channel_output_closed(si->ib) || channel_input_closed(si->ob)) { + if (appctx->ctx.hlua.socket) { + appctx->ctx.hlua.socket->s = NULL; + appctx->ctx.hlua.socket = NULL; + } + si_shutw(si); + si_shutr(si); + si->ib->flags |= CF_READ_NULL; + hlua_com_wake(&appctx->ctx.hlua.wake_on_read); + hlua_com_wake(&appctx->ctx.hlua.wake_on_write); + return; + } + + if (!(c->flags & CO_FL_CONNECTED)) + return; + + /* This function is called after the connect. */ + appctx->ctx.hlua.connected = 1; + + /* Wake the tasks which wants to write if the buffer have avalaible space. */ + if (channel_may_recv(si->ob)) + hlua_com_wake(&appctx->ctx.hlua.wake_on_write); + + /* Wake the tasks which wants to read if the buffer contains data. */ + if (channel_is_empty(si->ib)) + hlua_com_wake(&appctx->ctx.hlua.wake_on_read); +} + +/* This function is called when the "struct session" is destroyed. + * Remove the link from the object to this session. + * Wake all the pending signals. + */ +static void hlua_socket_release(struct stream_interface *si) +{ + struct appctx *appctx = objt_appctx(si->end); + + /* Remove my link in the original object. */ + if (appctx->ctx.hlua.socket) + appctx->ctx.hlua.socket->s = NULL; + + /* Wake all the task waiting for me. */ + hlua_com_wake(&appctx->ctx.hlua.wake_on_read); + hlua_com_wake(&appctx->ctx.hlua.wake_on_write); +} + +/* If the garbage collectio of the object is launch, nobody + * uses this object. If the session does not exists, just quit. + * Send the shutdown signal to the session. In some cases, + * pending signal can rest in the read and write lists. destroy + * it. + */ +__LJMP static int hlua_socket_gc(lua_State *L) +{ + MAY_LJMP(check_args(L, 1, "__gc")); + + struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1)); + struct appctx *appctx; + + if (!socket->s) + return 0; + + /* Remove all reference between the Lua stack and the coroutine session. */ + appctx = objt_appctx(socket->s->si[0].end); + session_shutdown(socket->s, SN_ERR_KILLED); + socket->s = NULL; + appctx->ctx.hlua.socket = NULL; + + return 0; +} + +/* The close function send shutdown signal and break the + * links between the session and the object. + */ +__LJMP static int hlua_socket_close(lua_State *L) +{ + MAY_LJMP(check_args(L, 1, "close")); + + struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1)); + struct appctx *appctx; + + if (!socket->s) + return 0; + + /* Close the session and remove the associated stop task. */ + session_shutdown(socket->s, SN_ERR_KILLED); + appctx = objt_appctx(socket->s->si[0].end); + appctx->ctx.hlua.socket = NULL; + socket->s = NULL; + + return 0; +} + +/* This Lua function assumes that the stack contain three parameters. + * 1 - USERDATA containing a struct socket + * 2 - INTEGER with values of the macro defined below + * If the integer is -1, we must read at most one line. + * If the integer is -2, we ust read all the data until the + * end of the stream. + * If the integer is positive value, we must read a number of + * bytes corresponding to this value. + */ +#define HLSR_READ_LINE (-1) +#define HLSR_READ_ALL (-2) +__LJMP static int hlua_socket_receive_yield(struct lua_State *L) +{ + struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1)); + int wanted = lua_tointeger(L, 2); + struct hlua *hlua = hlua_gethlua(L); + struct appctx *appctx; + int len; + int nblk; + char *blk1; + int len1; + char *blk2; + int len2; + + /* Check if this lua stack is schedulable. */ + if (!hlua || !hlua->task) + WILL_LJMP(luaL_error(L, "The 'receive' function is only allowed in " + "'frontend', 'backend' or 'task'")); + + /* check for connection closed. If some data where read, return it. */ + if (!socket->s) + goto connection_closed; + + if (wanted == HLSR_READ_LINE) { + + /* Read line. */ + nblk = bo_getline_nc(socket->s->si[0].ob, &blk1, &len1, &blk2, &len2); + if (nblk < 0) /* Connection close. */ + goto connection_closed; + if (nblk == 0) /* No data avalaible. */ + goto connection_empty; + } + + else if (wanted == HLSR_READ_ALL) { + + /* Read all the available data. */ + nblk = bo_getblk_nc(socket->s->si[0].ob, &blk1, &len1, &blk2, &len2); + if (nblk < 0) /* Connection close. */ + goto connection_closed; + if (nblk == 0) /* No data avalaible. */ + goto connection_empty; + } + + else { + + /* Read a block of data. */ + nblk = bo_getblk_nc(socket->s->si[0].ob, &blk1, &len1, &blk2, &len2); + if (nblk < 0) /* Connection close. */ + goto connection_closed; + if (nblk == 0) /* No data avalaible. */ + goto connection_empty; + + if (len1 > wanted) { + nblk = 1; + len1 = wanted; + } if (nblk == 2 && len1 + len2 > wanted) + len2 = wanted - len1; + } + + len = len1; + + luaL_addlstring(&socket->b, blk1, len1); + if (nblk == 2) { + len += len2; + luaL_addlstring(&socket->b, blk2, len2); + } + + /* Consume data. */ + bo_skip(socket->s->si[0].ob, len); + + /* Don't wait anything. */ + si_update(&socket->s->si[0]); + + /* If the pattern reclaim to read all the data + * in the connection, got out. + */ + if (wanted == HLSR_READ_ALL) + goto connection_empty; + else if (wanted >= 0 && len < wanted) + goto connection_empty; + + /* Return result. */ + luaL_pushresult(&socket->b); + return 1; + +connection_closed: + + /* If the buffer containds data. */ + if (socket->b.n > 0) { + luaL_pushresult(&socket->b); + return 1; + } + lua_pushnil(L); + lua_pushstring(L, "connection closed."); + return 2; + +connection_empty: + + appctx = objt_appctx(socket->s->si[0].end); + if (!hlua_com_new(hlua, &appctx->ctx.hlua.wake_on_read)) + WILL_LJMP(luaL_error(L, "out of memory")); + WILL_LJMP(lua_yieldk(L, 0, 0, hlua_socket_receive_yield)); + return 0; +} + +/* This Lus function gets two parameters. The first one can be string + * or a number. If the string is "*l", the user require one line. If + * the string is "*a", the user require all the content of the stream. + * If the value is a number, the user require a number of bytes equal + * to the value. The default value is "*l" (a line). + * + * This paraeter with a variable type is converted in integer. This + * integer takes this values: + * -1 : read a line + * -2 : read all the stream + * >0 : amount if bytes. + * + * The second parameter is optinal. It contains a string that must be + * concatenated with the read data. + */ +__LJMP static int hlua_socket_receive(struct lua_State *L) +{ + int wanted = HLSR_READ_LINE; + const char *pattern; + int type; + char *error; + size_t len; + + if (lua_gettop(L) < 1 || lua_gettop(L) > 3) + WILL_LJMP(luaL_error(L, "The 'receive' function requires between 1 and 3 arguments.")); + + struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1)); + + /* check for pattern. */ + if (lua_gettop(L) >= 2) { + type = lua_type(L, 2); + if (type == LUA_TSTRING) { + pattern = lua_tostring(L, 2); + if (strcmp(pattern, "*a") == 0) + wanted = HLSR_READ_ALL; + else if (strcmp(pattern, "*l") == 0) + wanted = HLSR_READ_LINE; + else { + wanted = strtoll(pattern, &error, 10); + if (*error != '\0') + WILL_LJMP(luaL_error(L, "Unsupported pattern.")); + } + } + else if (type == LUA_TNUMBER) { + wanted = lua_tointeger(L, 2); + if (wanted < 0) + WILL_LJMP(luaL_error(L, "Unsupported size.")); + } + } + + /* Set pattern. */ + lua_pushinteger(L, wanted); + lua_replace(L, 2); + + /* init bufffer, and fiil it wih prefix. */ + luaL_buffinit(L, &socket->b); + + /* Check prefix. */ + if (lua_gettop(L) >= 3) { + if (lua_type(L, 3) != LUA_TSTRING) + WILL_LJMP(luaL_error(L, "Expect a 'string' for the prefix")); + pattern = lua_tolstring(L, 3, &len); + luaL_addlstring(&socket->b, pattern, len); + } + + return __LJMP(hlua_socket_receive_yield(L)); +} + +/* Write the Lua input string in the output buffer. + * This fucntion returns a yield if no space are available. + */ +static int hlua_socket_write_yield(struct lua_State *L) +{ + struct hlua_socket *socket; + struct hlua *hlua = hlua_gethlua(L); + struct appctx *appctx; + size_t buf_len; + const char *buf; + int len; + int send_len; + int sent; + + /* Check if this lua stack is schedulable. */ + if (!hlua || !hlua->task) + WILL_LJMP(luaL_error(L, "The 'write' function is only allowed in " + "'frontend', 'backend' or 'task'")); + + /* Get object */ + socket = MAY_LJMP(hlua_checksocket(L, 1)); + buf = MAY_LJMP(luaL_checklstring(L, 2, &buf_len)); + sent = MAY_LJMP(luaL_checkunsigned(L, 3)); + + /* Check for connection close. */ + if (!socket->s || channel_output_closed(socket->s->req)) { + lua_pushinteger(L, -1); + return 1; + } + + /* Update the input buffer data. */ + buf += sent; + send_len = buf_len - sent; + + /* All the data are sent. */ + if (sent >= buf_len) + return 1; /* Implicitly return the length sent. */ + + /* Check for avalaible space. */ + len = buffer_total_space(socket->s->si[0].ib->buf); + if (len <= 0) + goto hlua_socket_write_yield_return; + + /* send data */ + if (len < send_len) + send_len = len; + len = bi_putblk(socket->s->si[0].ib, buf+sent, send_len); + + /* "Not enough space" (-1), "Buffer too little to contain + * the data" (-2) are not expected because the available length + * is tested. + * Other unknown error are also not expected. + */ + if (len <= 0) { + MAY_LJMP(hlua_socket_close(L)); + lua_pop(L, 1); + lua_pushunsigned(L, -1); + return 1; + } + + /* update buffers. */ + si_update(&socket->s->si[0]); + socket->s->si[0].ib->rex = TICK_ETERNITY; + socket->s->si[0].ob->wex = TICK_ETERNITY; + + /* Update length sent. */ + lua_pop(L, 1); + lua_pushunsigned(L, sent + len); + + /* All the data buffer is sent ? */ + if (sent + len >= buf_len) + return 1; + +hlua_socket_write_yield_return: + appctx = objt_appctx(socket->s->si[0].end); + if (!hlua_com_new(hlua, &appctx->ctx.hlua.wake_on_write)) + WILL_LJMP(luaL_error(L, "out of memory")); + WILL_LJMP(lua_yieldk(L, 0, 0, hlua_socket_write_yield)); + return 0; +} + +/* This function initiate the send of data. It just check the input + * parameters and push an integer in the Lua stack that contain the + * amount of data writed in the buffer. This is used by the function + * "hlua_socket_write_yield" that can yield. + * + * The Lua function gets between 3 and 4 parameters. The first one is + * the associated object. The second is a string buffer. The third is + * a facultative integer that represents where is the buffer position + * of the start of the data that can send. The first byte is the + * position "1". The default value is "1". The fourth argument is a + * facultative integer that represents where is the buffer position + * of the end of the data that can send. The default is the last byte. + */ +static int hlua_socket_send(struct lua_State *L) +{ + int i; + int j; + const char *buf; + size_t buf_len; + + /* Check number of arguments. */ + if (lua_gettop(L) < 2 || lua_gettop(L) > 4) + WILL_LJMP(luaL_error(L, "'send' needs between 2 and 4 arguments")); + + /* Get the string. */ + buf = MAY_LJMP(luaL_checklstring(L, 2, &buf_len)); + + /* Get and check j. */ + if (lua_gettop(L) == 4) { + j = MAY_LJMP(luaL_checkinteger(L, 4)); + if (j < 0) + j = buf_len + j + 1; + if (j > buf_len) + j = buf_len + 1; + lua_pop(L, 1); + } + else + j = buf_len; + + /* Get and check i. */ + if (lua_gettop(L) == 3) { + i = MAY_LJMP(luaL_checkinteger(L, 3)); + if (i < 0) + i = buf_len + i + 1; + if (i > buf_len) + i = buf_len + 1; + lua_pop(L, 1); + } else + i = 1; + + /* Check bth i and j. */ + if (i > j) { + lua_pushunsigned(L, 0); + return 1; + } + if (i == 0 && j == 0) { + lua_pushunsigned(L, 0); + return 1; + } + if (i == 0) + i = 1; + if (j == 0) + j = 1; + + /* Pop the string. */ + lua_pop(L, 1); + + /* Update the buffer length. */ + buf += i - 1; + buf_len = j - i + 1; + lua_pushlstring(L, buf, buf_len); + + /* This unsigned is used to remember the amount of sent data. */ + lua_pushunsigned(L, 0); + + return MAY_LJMP(hlua_socket_write_yield(L)); +} + +#define SOCKET_INFO_EXPANDED_FORM "[0000:0000:0000:0000:0000:0000:0000:0000]:12345" +static char _socket_info_expanded_form[] = SOCKET_INFO_EXPANDED_FORM; +#define SOCKET_INFO_MAX_LEN (sizeof(_socket_info_expanded_form)) +__LJMP static inline int hlua_socket_info(struct lua_State *L, struct sockaddr_storage *addr) +{ + static char buffer[SOCKET_INFO_MAX_LEN]; + int ret; + int len; + char *p; + + ret = addr_to_str(addr, buffer+1, SOCKET_INFO_MAX_LEN-1); + if (ret <= 0) { + lua_pushnil(L); + return 1; + } + + if (ret == AF_UNIX) { + lua_pushstring(L, buffer+1); + return 1; + } + else if (ret == AF_INET6) { + buffer[0] = '['; + len = strlen(buffer); + buffer[len] = ']'; + len++; + buffer[len] = ':'; + len++; + p = buffer; + } + else if (ret == AF_INET) { + p = buffer + 1; + len = strlen(p); + p[len] = ':'; + len++; + } + else { + lua_pushnil(L); + return 1; + } + + if (port_to_str(addr, p + len, SOCKET_INFO_MAX_LEN-1 - len) <= 0) { + lua_pushnil(L); + return 1; + } + + lua_pushstring(L, p); + return 1; +} + +/* Returns information about the peer of the connection. */ +__LJMP static int hlua_socket_getpeername(struct lua_State *L) +{ + MAY_LJMP(check_args(L, 1, "getpeername")); + + struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1)); + + /* Check if the tcp object is avalaible. */ + if (!socket->s) { + lua_pushnil(L); + return 1; + } + + struct connection *conn = objt_conn(socket->s->si[1].end); + if (!conn) { + lua_pushnil(L); + return 1; + } + + if (!(conn->flags & CO_FL_ADDR_TO_SET)) { + unsigned int salen = sizeof(conn->addr.to); + if (getpeername(conn->t.sock.fd, (struct sockaddr *)&conn->addr.to, &salen) == -1) { + lua_pushnil(L); + return 1; + } + conn->flags |= CO_FL_ADDR_TO_SET; + } + + return MAY_LJMP(hlua_socket_info(L, &conn->addr.to)); +} + +/* Returns information about my connection side. */ +static int hlua_socket_getsockname(struct lua_State *L) +{ + MAY_LJMP(check_args(L, 1, "getsockname")); + + struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1)); + + /* Check if the tcp object is avalaible. */ + if (!socket->s) { + lua_pushnil(L); + return 1; + } + + struct connection *conn = objt_conn(socket->s->si[1].end); + if (!conn) { + lua_pushnil(L); + return 1; + } + + if (!(conn->flags & CO_FL_ADDR_FROM_SET)) { + unsigned int salen = sizeof(conn->addr.from); + if (getsockname(conn->t.sock.fd, (struct sockaddr *)&conn->addr.from, &salen) == -1) { + lua_pushnil(L); + return 1; + } + conn->flags |= CO_FL_ADDR_FROM_SET; + } + + return hlua_socket_info(L, &conn->addr.from); +} + +/* This struct define the applet. */ +static struct si_applet update_applet = { + .obj_type = OBJ_TYPE_APPLET, + .name = "", + .fct = hlua_socket_handler, + .release = hlua_socket_release, +}; + +__LJMP static int hlua_socket_connect_yield(struct lua_State *L) +{ + struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1)); + struct hlua *hlua = hlua_gethlua(L); + struct appctx *appctx; + + /* Check for connection close. */ + if (!hlua || !socket->s || channel_output_closed(socket->s->req)) { + lua_pushnil(L); + lua_pushstring(L, "Can't connect"); + return 2; + } + + appctx = objt_appctx(socket->s->si[0].end); + + /* Check for connection established. */ + if (appctx->ctx.hlua.connected) { + lua_pushinteger(L, 1); + return 1; + } + + if (!hlua_com_new(hlua, &appctx->ctx.hlua.wake_on_write)) + WILL_LJMP(luaL_error(L, "out of memory error")); + WILL_LJMP(lua_yieldk(L, 0, 0, hlua_socket_connect_yield)); + return 0; +} + +/* This function fail or initite the connection. */ +__LJMP static int hlua_socket_connect(struct lua_State *L) +{ + struct hlua_socket *socket; + unsigned int port; + const char *ip; + struct connection *conn; + + MAY_LJMP(check_args(L, 3, "connect")); + + /* Get args. */ + socket = MAY_LJMP(hlua_checksocket(L, 1)); + ip = MAY_LJMP(luaL_checkstring(L, 2)); + port = MAY_LJMP(luaL_checkunsigned(L, 3)); + + conn = si_alloc_conn(socket->s->req->cons, 0); + if (!conn) + WILL_LJMP(luaL_error(L, "connect: internal error")); + + /* Parse ip address. */ + conn->addr.to.ss_family = AF_UNSPEC; + if (!str2ip2(ip, &conn->addr.to, 0)) + WILL_LJMP(luaL_error(L, "connect: cannot parse ip address '%s'", ip)); + + /* Set port. */ + if (conn->addr.to.ss_family == AF_INET) + ((struct sockaddr_in *)&conn->addr.to)->sin_port = htons(port); + else if (conn->addr.to.ss_family == AF_INET6) + ((struct sockaddr_in6 *)&conn->addr.to)->sin6_port = htons(port); + + /* it is important not to call the wakeup function directly but to + * pass through task_wakeup(), because this one knows how to apply + * priorities to tasks. + */ + task_wakeup(socket->s->task, TASK_WOKEN_INIT); + + WILL_LJMP(lua_yieldk(L, 0, 0, hlua_socket_connect_yield)); + + return 0; +} + +__LJMP static int hlua_socket_connect_ssl(struct lua_State *L) +{ + struct hlua_socket *socket; + + MAY_LJMP(check_args(L, 3, "connect_ssl")); + socket = MAY_LJMP(hlua_checksocket(L, 1)); + socket->s->target = &socket_ssl.obj_type; + return MAY_LJMP(hlua_socket_connect(L)); +} + +__LJMP static int hlua_socket_setoption(struct lua_State *L) +{ + return 0; +} + +__LJMP static int hlua_socket_settimeout(struct lua_State *L) +{ + MAY_LJMP(check_args(L, 2, "settimeout")); + + struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1)); + unsigned int tmout = MAY_LJMP(luaL_checkunsigned(L, 2)) * 1000; + + socket->s->req->rto = tmout; + socket->s->req->wto = tmout; + socket->s->rep->rto = tmout; + socket->s->rep->wto = tmout; + + return 0; +} + +__LJMP static int hlua_socket_new(lua_State *L) +{ + struct hlua_socket *socket; + struct appctx *appctx; + + /* Check stack size. */ + if (!lua_checkstack(L, 2)) { + hlua_pusherror(L, "socket: full stack"); + goto out_fail_conf; + } + + socket = MAY_LJMP(lua_newuserdata(L, sizeof(*socket))); + memset(socket, 0, sizeof(*socket)); + + /* Pop a class session metatable and affect it to the userdata. */ + lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref); + lua_setmetatable(L, -2); + + /* + * + * Get memory for the request. + * + */ + + socket->s = pool_alloc2(pool2_session); + if (!socket->s) { + hlua_pusherror(L, "socket: out of memory"); + goto out_fail_conf; + } + + socket->s->task = task_new(); + if (!socket->s->task) { + hlua_pusherror(L, "socket: out of memory"); + goto out_free_session; + } + + socket->s->req = pool_alloc2(pool2_channel); + if (!socket->s->req) { + hlua_pusherror(L, "socket: out of memory"); + goto out_fail_req; + } + + socket->s->req->buf = pool_alloc2(pool2_buffer); + if (!socket->s->req->buf) { + hlua_pusherror(L, "socket: out of memory"); + goto out_fail_req_buf; + } + + socket->s->rep = pool_alloc2(pool2_channel); + if (!socket->s->rep) { + hlua_pusherror(L, "socket: out of memory"); + goto out_fail_rep; + } + + socket->s->rep->buf = pool_alloc2(pool2_buffer); + if (!socket->s->rep->buf) { + hlua_pusherror(L, "socket: out of memory"); + goto out_fail_rep_buf; + } + + /* Configura empty Lua for the session. */ + socket->s->hlua.T = NULL; + socket->s->hlua.Tref = LUA_REFNIL; + socket->s->hlua.Mref = LUA_REFNIL; + socket->s->hlua.nargs = 0; + socket->s->hlua.state = HLUA_STOP; + LIST_INIT(&socket->s->hlua.com); + + /* session initialisation. */ + session_init_srv_conn(socket->s); + + /* + * + * Configure the associated task. + * + */ + + /* This is the dedicated function to process the session. This function + * is able to establish the conection, process the timeouts, etc ... + */ + socket->s->task->process = process_session; + + /* Back reference to session. This is used by process_session(). */ + socket->s->task->context = socket->s; + + /* The priority of the task is normal. */ + socket->s->task->nice = 0; + + /* Init the next run to eternity. Later in this function, this task is + * waked. + */ + socket->s->task->expire = TICK_ETERNITY; + + /* + * + * Initialize the attached buffers + * + */ + socket->s->req->buf->size = global.tune.bufsize; + socket->s->rep->buf->size = global.tune.bufsize; + + /* + * + * Initialize channels. + * + */ + + /* This function reset the struct. It must be called + * before the configuration. + */ + channel_init(socket->s->req); + channel_init(socket->s->rep); + + socket->s->req->prod = &socket->s->si[0]; + socket->s->req->cons = &socket->s->si[1]; + + socket->s->rep->prod = &socket->s->si[1]; + socket->s->rep->cons = &socket->s->si[0]; + + socket->s->si[0].ib = socket->s->req; + socket->s->si[0].ob = socket->s->rep; + + socket->s->si[1].ib = socket->s->rep; + socket->s->si[1].ob = socket->s->req; + + socket->s->req->analysers = 0; + socket->s->req->rto = socket_proxy.timeout.client; + socket->s->req->wto = socket_proxy.timeout.server; + socket->s->req->rex = TICK_ETERNITY; + socket->s->req->wex = TICK_ETERNITY; + socket->s->req->analyse_exp = TICK_ETERNITY; + + socket->s->rep->analysers = 0; + socket->s->rep->rto = socket_proxy.timeout.server; + socket->s->rep->wto = socket_proxy.timeout.client; + socket->s->rep->rex = TICK_ETERNITY; + socket->s->rep->wex = TICK_ETERNITY; + socket->s->rep->analyse_exp = TICK_ETERNITY; + + /* + * + * Configure the session. + * + */ + + /* The session dont have listener. The listener is used with real + * proxies. + */ + socket->s->listener = NULL; + + /* The flags are initialized to 0. Values are setted later. */ + socket->s->flags = 0; + + /* Assign the configured proxy to the new session. */ + socket->s->be = &socket_proxy; + socket->s->fe = &socket_proxy; + + /* XXX: Set namy variables */ + socket->s->store_count = 0; + memset(socket->s->stkctr, 0, sizeof(socket->s->stkctr)); + + /* Configure logs. */ + socket->s->logs.logwait = 0; + socket->s->logs.level = 0; + socket->s->logs.accept_date = date; /* user-visible date for logging */ + socket->s->logs.tv_accept = now; /* corrected date for internal use */ + socket->s->do_log = NULL; + + /* Function used if an error is occured. */ + socket->s->srv_error = default_srv_error; + + /* Init the list of buffers. */ + LIST_INIT(&socket->s->buffer_wait); + + /* Dont configure the unique ID. */ + socket->s->uniq_id = 0; + socket->s->unique_id = NULL; + + /* XXX: ? */ + socket->s->pend_pos = NULL; + + /* XXX: See later. */ + socket->s->txn.sessid = NULL; + socket->s->txn.srv_cookie = NULL; + socket->s->txn.cli_cookie = NULL; + socket->s->txn.uri = NULL; + socket->s->txn.req.cap = NULL; + socket->s->txn.rsp.cap = NULL; + socket->s->txn.hdr_idx.v = NULL; + socket->s->txn.hdr_idx.size = 0; + socket->s->txn.hdr_idx.used = 0; + + /* Configure "left" stream interface as applet. This "si" produce + * and use the data received from the server. The applet is initialized + * and is attached to the stream interface. + */ + + /* The data producer is already connected. It is the applet. */ + socket->s->req->flags = CF_READ_ATTACHED; + + channel_auto_connect(socket->s->req); /* don't wait to establish connection */ + channel_auto_close(socket->s->req); /* let the producer forward close requests */ + + si_reset(&socket->s->si[0], socket->s->task); + si_set_state(&socket->s->si[0], SI_ST_EST); /* connection established (resource exists) */ + + appctx = stream_int_register_handler(&socket->s->si[0], &update_applet); + if (!appctx) + goto out_fail_conn1; + appctx->ctx.hlua.socket = socket; + appctx->ctx.hlua.connected = 0; + LIST_INIT(&appctx->ctx.hlua.wake_on_write); + LIST_INIT(&appctx->ctx.hlua.wake_on_read); + + /* Configure "right" stream interface. this "si" is used to connect + * and retrieve data from the server. The connection is initialized + * with the "struct server". + */ + si_reset(&socket->s->si[1], socket->s->task); + si_set_state(&socket->s->si[1], SI_ST_INI); + socket->s->si[1].conn_retries = socket_proxy.conn_retries; + + /* Force destination server. */ + socket->s->flags |= SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET | SN_BE_ASSIGNED; + socket->s->target = &socket_tcp.obj_type; + + /* This session is added to te lists of alive sessions. */ + LIST_ADDQ(&sessions, &socket->s->list); + + /* XXX: I think that this list is used by stats. */ + LIST_INIT(&socket->s->back_refs); + + /* Update statistics counters. */ + socket_proxy.feconn++; /* beconn will be increased later */ + jobs++; + totalconn++; + + /* Return yield waiting for connection. */ + return 1; + +out_fail_conn1: + pool_free2(pool2_buffer, socket->s->rep->buf); +out_fail_rep_buf: + pool_free2(pool2_channel, socket->s->rep); +out_fail_rep: + pool_free2(pool2_buffer, socket->s->req->buf); +out_fail_req_buf: + pool_free2(pool2_channel, socket->s->req); +out_fail_req: + task_free(socket->s->task); +out_free_session: + pool_free2(pool2_session, socket->s); +out_fail_conf: + WILL_LJMP(lua_error(L)); + return 0; +} + /* * * @@ -1873,6 +2822,7 @@ void hlua_init(void) hlua_class_function(gL.T, "register_converters", hlua_register_converters); hlua_class_function(gL.T, "sleep", hlua_sleep); hlua_class_function(gL.T, "msleep", hlua_msleep); + hlua_class_function(gL.T, "tcp", hlua_socket_new); /* Store the table __index in the metable. */ lua_settable(gL.T, -3); @@ -1944,4 +2894,176 @@ void hlua_init(void) lua_pushvalue(gL.T, -1); /* Copy the -1 entry and push it on the stack. */ lua_setfield(gL.T, LUA_REGISTRYINDEX, CLASS_TXN); /* register class session. */ class_txn_ref = luaL_ref(gL.T, LUA_REGISTRYINDEX); /* reference class session. */ + + /* + * + * Register class Socket + * + */ + + /* Create and fill the metatable. */ + lua_newtable(gL.T); + + /* Create and fille the __index entry. */ + lua_pushstring(gL.T, "__index"); + lua_newtable(gL.T); + + hlua_class_function(gL.T, "connect_ssl", hlua_socket_connect_ssl); + hlua_class_function(gL.T, "connect", hlua_socket_connect); + hlua_class_function(gL.T, "send", hlua_socket_send); + hlua_class_function(gL.T, "receive", hlua_socket_receive); + hlua_class_function(gL.T, "close", hlua_socket_close); + hlua_class_function(gL.T, "getpeername", hlua_socket_getpeername); + hlua_class_function(gL.T, "getsockname", hlua_socket_getsockname); + hlua_class_function(gL.T, "setoption", hlua_socket_setoption); + hlua_class_function(gL.T, "settimeout", hlua_socket_settimeout); + + lua_settable(gL.T, -3); /* Push the last 2 entries in the table at index -3 */ + + /* Register the garbage collector entry. */ + lua_pushstring(gL.T, "__gc"); + lua_pushcclosure(gL.T, hlua_socket_gc, 0); + lua_settable(gL.T, -3); /* Push the last 2 entries in the table at index -3 */ + + /* Register previous table in the registry with reference and named entry. */ + lua_pushvalue(gL.T, -1); /* Copy the -1 entry and push it on the stack. */ + lua_pushvalue(gL.T, -1); /* Copy the -1 entry and push it on the stack. */ + lua_setfield(gL.T, LUA_REGISTRYINDEX, CLASS_SOCKET); /* register class socket. */ + class_socket_ref = luaL_ref(gL.T, LUA_REGISTRYINDEX); /* reference class socket. */ + + /* Proxy and server configuration initialisation. */ + memset(&socket_proxy, 0, sizeof(socket_proxy)); + init_new_proxy(&socket_proxy); + socket_proxy.parent = NULL; + socket_proxy.last_change = now.tv_sec; + socket_proxy.id = "LUA-SOCKET"; + socket_proxy.cap = PR_CAP_FE | PR_CAP_BE; + socket_proxy.maxconn = 0; + socket_proxy.accept = NULL; + socket_proxy.options2 |= PR_O2_INDEPSTR; + socket_proxy.srv = NULL; + socket_proxy.conn_retries = 0; + socket_proxy.timeout.connect = 5000; /* By default the timeout connection is 5s. */ + + /* Init TCP server: unchanged parameters */ + memset(&socket_tcp, 0, sizeof(socket_tcp)); + socket_tcp.next = NULL; + socket_tcp.proxy = &socket_proxy; + socket_tcp.obj_type = OBJ_TYPE_SERVER; + LIST_INIT(&socket_tcp.actconns); + LIST_INIT(&socket_tcp.pendconns); + socket_tcp.state = SRV_ST_RUNNING; /* early server setup */ + socket_tcp.last_change = 0; + socket_tcp.id = "LUA-TCP-CONN"; + socket_tcp.check.state &= ~CHK_ST_ENABLED; /* Disable health checks. */ + socket_tcp.agent.state &= ~CHK_ST_ENABLED; /* Disable health checks. */ + socket_tcp.pp_opts = 0; /* Remove proxy protocol. */ + + /* XXX: Copy default parameter from default server, + * but the default server is not initialized. + */ + socket_tcp.maxqueue = socket_proxy.defsrv.maxqueue; + socket_tcp.minconn = socket_proxy.defsrv.minconn; + socket_tcp.maxconn = socket_proxy.defsrv.maxconn; + socket_tcp.slowstart = socket_proxy.defsrv.slowstart; + socket_tcp.onerror = socket_proxy.defsrv.onerror; + socket_tcp.onmarkeddown = socket_proxy.defsrv.onmarkeddown; + socket_tcp.onmarkedup = socket_proxy.defsrv.onmarkedup; + socket_tcp.consecutive_errors_limit = socket_proxy.defsrv.consecutive_errors_limit; + socket_tcp.uweight = socket_proxy.defsrv.iweight; + socket_tcp.iweight = socket_proxy.defsrv.iweight; + + socket_tcp.check.status = HCHK_STATUS_INI; + socket_tcp.check.rise = socket_proxy.defsrv.check.rise; + socket_tcp.check.fall = socket_proxy.defsrv.check.fall; + socket_tcp.check.health = socket_tcp.check.rise; /* socket, but will fall down at first failure */ + socket_tcp.check.server = &socket_tcp; + + socket_tcp.agent.status = HCHK_STATUS_INI; + socket_tcp.agent.rise = socket_proxy.defsrv.agent.rise; + socket_tcp.agent.fall = socket_proxy.defsrv.agent.fall; + socket_tcp.agent.health = socket_tcp.agent.rise; /* socket, but will fall down at first failure */ + socket_tcp.agent.server = &socket_tcp; + + socket_tcp.xprt = &raw_sock; + +#ifdef USE_OPENSSL + + char *args[4]; + struct srv_kw *kw; + int tmp_error; + char *error; + + /* Init TCP server: unchanged parameters */ + memset(&socket_ssl, 0, sizeof(socket_ssl)); + socket_ssl.next = NULL; + socket_ssl.proxy = &socket_proxy; + socket_ssl.obj_type = OBJ_TYPE_SERVER; + LIST_INIT(&socket_ssl.actconns); + LIST_INIT(&socket_ssl.pendconns); + socket_ssl.state = SRV_ST_RUNNING; /* early server setup */ + socket_ssl.last_change = 0; + socket_ssl.id = "LUA-SSL-CONN"; + socket_ssl.check.state &= ~CHK_ST_ENABLED; /* Disable health checks. */ + socket_ssl.agent.state &= ~CHK_ST_ENABLED; /* Disable health checks. */ + socket_ssl.pp_opts = 0; /* Remove proxy protocol. */ + + /* XXX: Copy default parameter from default server, + * but the default server is not initialized. + */ + socket_ssl.maxqueue = socket_proxy.defsrv.maxqueue; + socket_ssl.minconn = socket_proxy.defsrv.minconn; + socket_ssl.maxconn = socket_proxy.defsrv.maxconn; + socket_ssl.slowstart = socket_proxy.defsrv.slowstart; + socket_ssl.onerror = socket_proxy.defsrv.onerror; + socket_ssl.onmarkeddown = socket_proxy.defsrv.onmarkeddown; + socket_ssl.onmarkedup = socket_proxy.defsrv.onmarkedup; + socket_ssl.consecutive_errors_limit = socket_proxy.defsrv.consecutive_errors_limit; + socket_ssl.uweight = socket_proxy.defsrv.iweight; + socket_ssl.iweight = socket_proxy.defsrv.iweight; + + socket_ssl.check.status = HCHK_STATUS_INI; + socket_ssl.check.rise = socket_proxy.defsrv.check.rise; + socket_ssl.check.fall = socket_proxy.defsrv.check.fall; + socket_ssl.check.health = socket_ssl.check.rise; /* socket, but will fall down at first failure */ + socket_ssl.check.server = &socket_ssl; + + socket_ssl.agent.status = HCHK_STATUS_INI; + socket_ssl.agent.rise = socket_proxy.defsrv.agent.rise; + socket_ssl.agent.fall = socket_proxy.defsrv.agent.fall; + socket_ssl.agent.health = socket_ssl.agent.rise; /* socket, but will fall down at first failure */ + socket_ssl.agent.server = &socket_ssl; + + socket_ssl.xprt = &raw_sock; + + args[0] = "ssl"; + args[1] = "verify"; + args[2] = "none"; + args[3] = NULL; + + for (idx=0; idx<3; idx++) { + if ((kw = srv_find_kw(args[idx])) != NULL) { /* Maybe it's registered server keyword */ + /* + * + * If the keyword is not known, we can search in the registered + * server keywords. This is usefull to configure special SSL + * features like client certificates and ssl_verify. + * + */ + tmp_error = kw->parse(args, &idx, &socket_proxy, &socket_ssl, &error); + if (tmp_error != 0) { + fprintf(stderr, "INTERNAL ERROR: %s\n", error); + abort(); /* This must be never arrives because the command line + not editable by the user. */ + } + idx += kw->skip; + } + } + + /* Initialize SSL server. */ + if (socket_ssl.xprt == &ssl_sock) { + socket_ssl.use_ssl = 1; + ssl_sock_prepare_srv_ctx(&socket_ssl, &socket_proxy); + } +#endif }