MEDIUM: hlua: Update the socket applet to use its own buffers

Thanks to this patch, the lua cosocket applet is now using its own
buffers. .rcv_buf and .snd_buf callback functions are now defined to use the
default raw functions. Functions to receive and send data have also been
updated to use the applet API and to remove any dependencies on the
stream-connectors and the channels.
This commit is contained in:
Christopher Faulet 2025-07-18 16:53:20 +02:00
parent 7e96ff6b84
commit e542d2dfaa

View File

@ -2658,36 +2658,32 @@ __LJMP static struct hlua_socket *hlua_checksocket(lua_State *L, int ud)
static void hlua_socket_handler(struct appctx *appctx) static void hlua_socket_handler(struct appctx *appctx)
{ {
struct hlua_csk_ctx *ctx = appctx->svcctx; struct hlua_csk_ctx *ctx = appctx->svcctx;
struct stconn *sc = appctx_sc(appctx);
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) { if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
co_skip(sc_oc(sc), co_data(sc_oc(sc))); applet_reset_input(appctx);
notification_wake(&ctx->wake_on_read); notification_wake(&ctx->wake_on_read);
notification_wake(&ctx->wake_on_write); notification_wake(&ctx->wake_on_write);
return; return;
} }
if (ctx->die) { if (ctx->die) {
se_fl_set(appctx->sedesc, SE_FL_EOI|SE_FL_EOS); applet_set_eos(appctx);
notification_wake(&ctx->wake_on_read); notification_wake(&ctx->wake_on_read);
notification_wake(&ctx->wake_on_write); notification_wake(&ctx->wake_on_write);
return; return;
} }
/* If we can't write, wakeup the pending write signals. */ /* If applet is shutdown, wakeup the pending read and write signals. */
if (channel_output_closed(sc_ic(sc))) if (se_fl_test(appctx->sedesc, SE_FL_SHW)) {
notification_wake(&ctx->wake_on_write); notification_wake(&ctx->wake_on_write);
/* If we can't read, wakeup the pending read signals. */
if (channel_input_closed(sc_oc(sc)))
notification_wake(&ctx->wake_on_read); notification_wake(&ctx->wake_on_read);
}
/* if the connection is not established, inform the stream that we want /* if the connection is not established, inform the stream that we want
* to be notified whenever the connection completes. * to be notified whenever the connection completes.
*/ */
if (sc_opposite(sc)->state < SC_ST_EST) { if (se_fl_test(appctx->sedesc, SE_FL_APPLET_NEED_CONN)) {
applet_need_more_data(appctx); applet_need_more_data(appctx);
se_need_remote_conn(appctx->sedesc);
applet_have_more_data(appctx); applet_have_more_data(appctx);
return; return;
} }
@ -2696,11 +2692,11 @@ static void hlua_socket_handler(struct appctx *appctx)
ctx->connected = 1; ctx->connected = 1;
/* Wake the tasks which wants to write if the buffer have available space. */ /* Wake the tasks which wants to write if the buffer have available space. */
if (channel_may_recv(sc_ic(sc))) if (applet_get_outbuf(appctx) != NULL && applet_output_room(appctx))
notification_wake(&ctx->wake_on_write); notification_wake(&ctx->wake_on_write);
/* Wake the tasks which wants to read if the buffer contains data. */ /* Wake the tasks which wants to read if the buffer contains data. */
if (co_data(sc_oc(sc))) { if (applet_get_inbuf(appctx) != NULL && applet_input_data(appctx)) {
notification_wake(&ctx->wake_on_read); notification_wake(&ctx->wake_on_read);
applet_wont_consume(appctx); applet_wont_consume(appctx);
} }
@ -2727,6 +2723,7 @@ static int hlua_socket_init(struct appctx *appctx)
* with the "struct server". * with the "struct server".
*/ */
sc_set_state(s->scb, SC_ST_ASS); sc_set_state(s->scb, SC_ST_ASS);
se_need_remote_conn(appctx->sedesc);
/* Force destination server. */ /* Force destination server. */
s->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED; s->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
@ -2865,6 +2862,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
struct hlua *hlua; struct hlua *hlua;
struct hlua_csk_ctx *csk_ctx; struct hlua_csk_ctx *csk_ctx;
struct appctx *appctx; struct appctx *appctx;
struct buffer *inbuf;
size_t len; size_t len;
int nblk; int nblk;
const char *blk1; const char *blk1;
@ -2872,8 +2870,6 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
const char *blk2; const char *blk2;
size_t len2; size_t len2;
int skip_at_end = 0; int skip_at_end = 0;
struct channel *oc;
struct stream *s;
struct xref *peer; struct xref *peer;
int missing_bytes; int missing_bytes;
@ -2901,12 +2897,13 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
goto connection_closed; goto connection_closed;
appctx = csk_ctx->appctx; appctx = csk_ctx->appctx;
s = appctx_strm(appctx); inbuf = applet_get_inbuf(appctx);
if (!inbuf)
goto connection_empty;
oc = &s->res;
if (wanted == HLSR_READ_LINE) { if (wanted == HLSR_READ_LINE) {
/* Read line. */ /* Read line. */
nblk = co_getline_nc(oc, &blk1, &len1, &blk2, &len2); nblk = applet_getline_nc(appctx, &blk1, &len1, &blk2, &len2);
if (nblk < 0) /* Connection close. */ if (nblk < 0) /* Connection close. */
goto connection_closed; goto connection_closed;
if (nblk == 0) /* No data available. */ if (nblk == 0) /* No data available. */
@ -2937,7 +2934,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
else if (wanted == HLSR_READ_ALL) { else if (wanted == HLSR_READ_ALL) {
/* Read all the available data. */ /* Read all the available data. */
nblk = co_getblk_nc(oc, &blk1, &len1, &blk2, &len2); nblk = applet_getblk_nc(appctx, &blk1, &len1, &blk2, &len2);
if (nblk < 0) /* Connection close. */ if (nblk < 0) /* Connection close. */
goto connection_closed; goto connection_closed;
if (nblk == 0) /* No data available. */ if (nblk == 0) /* No data available. */
@ -2946,7 +2943,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
else { else {
/* Read a block of data. */ /* Read a block of data. */
nblk = co_getblk_nc(oc, &blk1, &len1, &blk2, &len2); nblk = applet_getblk_nc(appctx, &blk1, &len1, &blk2, &len2);
if (nblk < 0) /* Connection close. */ if (nblk < 0) /* Connection close. */
goto connection_closed; goto connection_closed;
if (nblk == 0) /* No data available. */ if (nblk == 0) /* No data available. */
@ -2969,15 +2966,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
} }
/* Consume data. */ /* Consume data. */
if (len + skip_at_end) { applet_skip_input(appctx, len + skip_at_end);
co_skip(oc, len + skip_at_end);
oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
if (s->scb->room_needed < 0 || channel_recv_max(oc) >= s->scb->room_needed)
sc_have_room(s->scb);
sc_ep_report_send_activity(s->scf);
}
else if (!s->scb->room_needed)
sc_have_room(s->scb);
/* Don't wait anything. */ /* Don't wait anything. */
@ -3019,6 +3008,7 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
WILL_LJMP(luaL_error(L, "out of memory")); WILL_LJMP(luaL_error(L, "out of memory"));
} }
xref_unlock(&socket->xref, peer); xref_unlock(&socket->xref, peer);
applet_need_more_data(appctx);
MAY_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_receive_yield, TICK_ETERNITY, 0)); MAY_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_receive_yield, TICK_ETERNITY, 0));
return 0; return 0;
} }
@ -3115,14 +3105,13 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
struct hlua *hlua; struct hlua *hlua;
struct hlua_csk_ctx *csk_ctx; struct hlua_csk_ctx *csk_ctx;
struct appctx *appctx; struct appctx *appctx;
struct buffer *outbuf;
size_t buf_len; size_t buf_len;
const char *buf; const char *buf;
int len; int len;
int send_len; int send_len;
int sent; int sent;
struct xref *peer; struct xref *peer;
struct stream *s;
struct stconn *sc;
/* Get hlua struct, or NULL if we execute from main lua state */ /* Get hlua struct, or NULL if we execute from main lua state */
hlua = hlua_gethlua(L); hlua = hlua_gethlua(L);
@ -3158,15 +3147,9 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
} }
appctx = csk_ctx->appctx; appctx = csk_ctx->appctx;
sc = appctx_sc(appctx); outbuf = applet_get_outbuf(appctx);
s = __sc_strm(sc); if (!outbuf)
goto hlua_socket_write_yield_return;
/* Check for connection close. */
if (channel_output_closed(&s->req)) {
xref_unlock(&socket->xref, peer);
lua_pushinteger(L, -1);
return 1;
}
/* Update the input buffer data. */ /* Update the input buffer data. */
buf += sent; buf += sent;
@ -3178,24 +3161,15 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
return 1; /* Implicitly return the length sent. */ return 1; /* Implicitly return the length sent. */
} }
/* Check if the buffer is available because HAProxy doesn't allocate
* the request buffer if its not required.
*/
if (s->req.buf.size == 0) {
if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
goto hlua_socket_write_yield_return;
}
/* Check for available space. */ /* Check for available space. */
len = b_room(&s->req.buf); len = applet_output_room(appctx);
if (len <= 0) { if (!len)
goto hlua_socket_write_yield_return; goto hlua_socket_write_yield_return;
}
/* send data */ /* send data */
if (len < send_len) if (len < send_len)
send_len = len; send_len = len;
len = ci_putblk(&s->req, buf, send_len); len = applet_putblk(appctx, buf, send_len);
/* "Not enough space" (-1), "Buffer too little to contain /* "Not enough space" (-1), "Buffer too little to contain
* the data" (-2) are not expected because the available length * the data" (-2) are not expected because the available length
@ -3203,9 +3177,6 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
* Other unknown error are also not expected. * Other unknown error are also not expected.
*/ */
if (len <= 0) { if (len <= 0) {
if (len == -1)
s->req.flags |= CF_WAKE_WRITE;
MAY_LJMP(hlua_socket_close_helper(L)); MAY_LJMP(hlua_socket_close_helper(L));
lua_pop(L, 1); lua_pop(L, 1);
lua_pushinteger(L, -1); lua_pushinteger(L, -1);
@ -3232,7 +3203,7 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
WILL_LJMP(luaL_error(L, "out of memory")); WILL_LJMP(luaL_error(L, "out of memory"));
} }
xref_unlock(&socket->xref, peer); xref_unlock(&socket->xref, peer);
sc_need_room(sc, channel_recv_max(&s->req) + 1); applet_have_more_data(appctx);
MAY_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_write_yield, TICK_ETERNITY, 0)); MAY_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_write_yield, TICK_ETERNITY, 0));
return 0; return 0;
} }
@ -3468,6 +3439,8 @@ static struct applet update_applet = {
.obj_type = OBJ_TYPE_APPLET, .obj_type = OBJ_TYPE_APPLET,
.name = "<LUA_TCP>", .name = "<LUA_TCP>",
.fct = hlua_socket_handler, .fct = hlua_socket_handler,
.rcv_buf = appctx_raw_rcv_buf,
.snd_buf = appctx_raw_snd_buf,
.init = hlua_socket_init, .init = hlua_socket_init,
.release = hlua_socket_release, .release = hlua_socket_release,
}; };
@ -3479,7 +3452,6 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
struct xref *peer; struct xref *peer;
struct hlua_csk_ctx *csk_ctx; struct hlua_csk_ctx *csk_ctx;
struct appctx *appctx; struct appctx *appctx;
struct stream *s;
/* Get hlua struct, or NULL if we execute from main lua state */ /* Get hlua struct, or NULL if we execute from main lua state */
hlua = hlua_gethlua(L); hlua = hlua_gethlua(L);
@ -3502,7 +3474,6 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
csk_ctx = container_of(peer, struct hlua_csk_ctx, xref); csk_ctx = container_of(peer, struct hlua_csk_ctx, xref);
appctx = csk_ctx->appctx; appctx = csk_ctx->appctx;
s = appctx_strm(appctx);
/* Check if we run on the same thread than the xreator thread. /* Check if we run on the same thread than the xreator thread.
* We cannot access to the socket if the thread is different. * We cannot access to the socket if the thread is different.
@ -3513,15 +3484,13 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
} }
/* Check for connection close. */ /* Check for connection close. */
if (!hlua || channel_output_closed(&s->req)) { if (!hlua || se_fl_test(appctx->sedesc, SE_FL_SHW)) {
xref_unlock(&socket->xref, peer); xref_unlock(&socket->xref, peer);
lua_pushnil(L); lua_pushnil(L);
lua_pushstring(L, "Can't connect"); lua_pushstring(L, "Can't connect");
return 2; return 2;
} }
appctx = __sc_appctx(s->scf);
/* Check for connection established. */ /* Check for connection established. */
if (csk_ctx->connected) { if (csk_ctx->connected) {
xref_unlock(&socket->xref, peer); xref_unlock(&socket->xref, peer);