MEDIUM: hlua: Update the socket applet to use its own buffers

Thanks to this patch, the lua cosocket applet is now using its own
buffers. .rcv_buf and .snd_buf callback functions are now defined to use the
default raw functions. Functions to receive and send data have also been
updated to use the applet API and to remove any dependencies on the
stream-connectors and the channels.
This commit is contained in:
Christopher Faulet 2025-07-18 16:53:20 +02:00
parent 91c83e7cd3
commit 4c5961c731

View File

@ -2658,36 +2658,32 @@ __LJMP static struct hlua_socket *hlua_checksocket(lua_State *L, int ud)
static void hlua_socket_handler(struct appctx *appctx)
{
struct hlua_csk_ctx *ctx = appctx->svcctx;
struct stconn *sc = appctx_sc(appctx);
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
co_skip(sc_oc(sc), co_data(sc_oc(sc)));
if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
applet_reset_input(appctx);
notification_wake(&ctx->wake_on_read);
notification_wake(&ctx->wake_on_write);
return;
}
if (ctx->die) {
se_fl_set(appctx->sedesc, SE_FL_EOI|SE_FL_EOS);
applet_set_eos(appctx);
notification_wake(&ctx->wake_on_read);
notification_wake(&ctx->wake_on_write);
return;
}
/* If we can't write, wakeup the pending write signals. */
if (channel_output_closed(sc_ic(sc)))
/* If applet is shutdown, wakeup the pending read and write signals. */
if (se_fl_test(appctx->sedesc, SE_FL_SHW)) {
notification_wake(&ctx->wake_on_write);
/* If we can't read, wakeup the pending read signals. */
if (channel_input_closed(sc_oc(sc)))
notification_wake(&ctx->wake_on_read);
}
/* if the connection is not established, inform the stream that we want
* to be notified whenever the connection completes.
*/
if (sc_opposite(sc)->state < SC_ST_EST) {
if (se_fl_test(appctx->sedesc, SE_FL_APPLET_NEED_CONN)) {
applet_need_more_data(appctx);
se_need_remote_conn(appctx->sedesc);
applet_have_more_data(appctx);
return;
}
@ -2696,11 +2692,11 @@ static void hlua_socket_handler(struct appctx *appctx)
ctx->connected = 1;
/* Wake the tasks which wants to write if the buffer have available space. */
if (channel_may_recv(sc_ic(sc)))
if (applet_get_outbuf(appctx) != NULL && applet_output_room(appctx))
notification_wake(&ctx->wake_on_write);
/* Wake the tasks which wants to read if the buffer contains data. */
if (co_data(sc_oc(sc))) {
if (applet_get_inbuf(appctx) != NULL && applet_input_data(appctx)) {
notification_wake(&ctx->wake_on_read);
applet_wont_consume(appctx);
}
@ -2727,6 +2723,7 @@ static int hlua_socket_init(struct appctx *appctx)
* with the "struct server".
*/
sc_set_state(s->scb, SC_ST_ASS);
se_need_remote_conn(appctx->sedesc);
/* Force destination server. */
s->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
@ -2865,6 +2862,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
struct hlua *hlua;
struct hlua_csk_ctx *csk_ctx;
struct appctx *appctx;
struct buffer *inbuf;
size_t len;
int nblk;
const char *blk1;
@ -2872,8 +2870,6 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
const char *blk2;
size_t len2;
int skip_at_end = 0;
struct channel *oc;
struct stream *s;
struct xref *peer;
int missing_bytes;
@ -2901,12 +2897,13 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
goto connection_closed;
appctx = csk_ctx->appctx;
s = appctx_strm(appctx);
inbuf = applet_get_inbuf(appctx);
if (!inbuf)
goto connection_empty;
oc = &s->res;
if (wanted == HLSR_READ_LINE) {
/* Read line. */
nblk = co_getline_nc(oc, &blk1, &len1, &blk2, &len2);
nblk = applet_getline_nc(appctx, &blk1, &len1, &blk2, &len2);
if (nblk < 0) /* Connection close. */
goto connection_closed;
if (nblk == 0) /* No data available. */
@ -2937,7 +2934,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
else if (wanted == HLSR_READ_ALL) {
/* Read all the available data. */
nblk = co_getblk_nc(oc, &blk1, &len1, &blk2, &len2);
nblk = applet_getblk_nc(appctx, &blk1, &len1, &blk2, &len2);
if (nblk < 0) /* Connection close. */
goto connection_closed;
if (nblk == 0) /* No data available. */
@ -2946,7 +2943,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
else {
/* Read a block of data. */
nblk = co_getblk_nc(oc, &blk1, &len1, &blk2, &len2);
nblk = applet_getblk_nc(appctx, &blk1, &len1, &blk2, &len2);
if (nblk < 0) /* Connection close. */
goto connection_closed;
if (nblk == 0) /* No data available. */
@ -2969,15 +2966,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
}
/* Consume data. */
if (len + skip_at_end) {
co_skip(oc, len + skip_at_end);
oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
if (s->scb->room_needed < 0 || channel_recv_max(oc) >= s->scb->room_needed)
sc_have_room(s->scb);
sc_ep_report_send_activity(s->scf);
}
else if (!s->scb->room_needed)
sc_have_room(s->scb);
applet_skip_input(appctx, len + skip_at_end);
/* Don't wait anything. */
@ -3019,6 +3008,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
WILL_LJMP(luaL_error(L, "out of memory"));
}
xref_unlock(&socket->xref, peer);
applet_need_more_data(appctx);
MAY_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_receive_yield, TICK_ETERNITY, 0));
return 0;
}
@ -3115,14 +3105,13 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
struct hlua *hlua;
struct hlua_csk_ctx *csk_ctx;
struct appctx *appctx;
struct buffer *outbuf;
size_t buf_len;
const char *buf;
int len;
int send_len;
int sent;
struct xref *peer;
struct stream *s;
struct stconn *sc;
/* Get hlua struct, or NULL if we execute from main lua state */
hlua = hlua_gethlua(L);
@ -3158,15 +3147,9 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
}
appctx = csk_ctx->appctx;
sc = appctx_sc(appctx);
s = __sc_strm(sc);
/* Check for connection close. */
if (channel_output_closed(&s->req)) {
xref_unlock(&socket->xref, peer);
lua_pushinteger(L, -1);
return 1;
}
outbuf = applet_get_outbuf(appctx);
if (!outbuf)
goto hlua_socket_write_yield_return;
/* Update the input buffer data. */
buf += sent;
@ -3178,24 +3161,15 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
return 1; /* Implicitly return the length sent. */
}
/* Check if the buffer is available because HAProxy doesn't allocate
* the request buffer if its not required.
*/
if (s->req.buf.size == 0) {
if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
goto hlua_socket_write_yield_return;
}
/* Check for available space. */
len = b_room(&s->req.buf);
if (len <= 0) {
len = applet_output_room(appctx);
if (!len)
goto hlua_socket_write_yield_return;
}
/* send data */
if (len < send_len)
send_len = len;
len = ci_putblk(&s->req, buf, send_len);
len = applet_putblk(appctx, buf, send_len);
/* "Not enough space" (-1), "Buffer too little to contain
* the data" (-2) are not expected because the available length
@ -3203,9 +3177,6 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
* Other unknown error are also not expected.
*/
if (len <= 0) {
if (len == -1)
s->req.flags |= CF_WAKE_WRITE;
MAY_LJMP(hlua_socket_close_helper(L));
lua_pop(L, 1);
lua_pushinteger(L, -1);
@ -3232,7 +3203,7 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
WILL_LJMP(luaL_error(L, "out of memory"));
}
xref_unlock(&socket->xref, peer);
sc_need_room(sc, channel_recv_max(&s->req) + 1);
applet_have_more_data(appctx);
MAY_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_write_yield, TICK_ETERNITY, 0));
return 0;
}
@ -3468,6 +3439,8 @@ static struct applet update_applet = {
.obj_type = OBJ_TYPE_APPLET,
.name = "<LUA_TCP>",
.fct = hlua_socket_handler,
.rcv_buf = appctx_raw_rcv_buf,
.snd_buf = appctx_raw_snd_buf,
.init = hlua_socket_init,
.release = hlua_socket_release,
};
@ -3479,7 +3452,6 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
struct xref *peer;
struct hlua_csk_ctx *csk_ctx;
struct appctx *appctx;
struct stream *s;
/* Get hlua struct, or NULL if we execute from main lua state */
hlua = hlua_gethlua(L);
@ -3502,7 +3474,6 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
csk_ctx = container_of(peer, struct hlua_csk_ctx, xref);
appctx = csk_ctx->appctx;
s = appctx_strm(appctx);
/* Check if we run on the same thread than the xreator thread.
* We cannot access to the socket if the thread is different.
@ -3513,15 +3484,13 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
}
/* Check for connection close. */
if (!hlua || channel_output_closed(&s->req)) {
if (!hlua || se_fl_test(appctx->sedesc, SE_FL_SHW)) {
xref_unlock(&socket->xref, peer);
lua_pushnil(L);
lua_pushstring(L, "Can't connect");
return 2;
}
appctx = __sc_appctx(s->scf);
/* Check for connection established. */
if (csk_ctx->connected) {
xref_unlock(&socket->xref, peer);