MINOR: applet: Implement default functions to exchange data with channels

In this patch, we add default functions to copy data from a channel to the
<inbuf> buffer of an applet (appctx_rcv_buf) and another on to copy data
from <outbuf> buffer of an applet to a channel (appctx_snd_buf).

These functions are not used for now, but they will be used by applets to
define their <rcv_buf> and <snd_buf> callback functions. Of course, it will
be possible for a specific applet to implement its own functions but these
ones should be good enough for most of applets. HTX and RAW buffers are
supported.
This commit is contained in:
Christopher Faulet 2024-01-11 09:58:46 +01:00
parent 361b81bfca
commit 525ec12305
2 changed files with 132 additions and 0 deletions

View File

@ -48,6 +48,9 @@ int appctx_finalize_startup(struct appctx *appctx, struct proxy *px, struct buff
void appctx_free_on_early_error(struct appctx *appctx); void appctx_free_on_early_error(struct appctx *appctx);
void appctx_free(struct appctx *appctx); void appctx_free(struct appctx *appctx);
size_t appctx_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc) static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc)
{ {
return appctx_new_on(applet, sedesc, tid); return appctx_new_on(applet, sedesc, tid);

View File

@ -16,6 +16,7 @@
#include <haproxy/api.h> #include <haproxy/api.h>
#include <haproxy/applet.h> #include <haproxy/applet.h>
#include <haproxy/channel.h> #include <haproxy/channel.h>
#include <haproxy/htx.h>
#include <haproxy/list.h> #include <haproxy/list.h>
#include <haproxy/sc_strm.h> #include <haproxy/sc_strm.h>
#include <haproxy/stconn.h> #include <haproxy/stconn.h>
@ -424,6 +425,134 @@ int appctx_buf_available(void *arg)
return 1; return 1;
} }
size_t appctx_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags)
{
struct appctx *appctx = __sc_appctx(sc);
size_t ret = 0;
TRACE_ENTER(APPLET_EV_RECV, appctx);
if (appctx->state & APPLET_OUTBLK_ALLOC)
goto end;
if (!count)
goto end;
if (!appctx_get_buf(appctx, &appctx->outbuf)) {
appctx->state |= APPLET_OUTBLK_ALLOC;
TRACE_STATE("waiting for appctx outbuf allocation", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
goto end;
}
if (IS_HTX_SC(sc)) {
struct htx *appctx_htx = htx_from_buf(&appctx->outbuf);
struct htx *buf_htx = NULL;
if (htx_is_empty(appctx_htx)) {
htx_to_buf(appctx_htx, &appctx->outbuf);
goto done;
}
ret = appctx_htx->data;
buf_htx = htx_from_buf(buf);
if (htx_is_empty(buf_htx) && htx_used_space(appctx_htx) <= count) {
htx_to_buf(buf_htx, buf);
htx_to_buf(appctx_htx, &appctx->outbuf);
b_xfer(buf, &appctx->outbuf, b_data(&appctx->outbuf));
goto done;
}
htx_xfer_blks(buf_htx, appctx_htx, count, HTX_BLK_UNUSED);
if (appctx_htx->flags & HTX_FL_PARSING_ERROR) {
buf_htx->flags |= HTX_FL_PARSING_ERROR;
if (htx_is_empty(buf_htx))
se_fl_set(appctx->sedesc, SE_FL_EOI);
}
else if (htx_is_empty(appctx_htx)) {
buf_htx->flags |= (appctx_htx->flags & HTX_FL_EOM);
}
buf_htx->extra = (appctx_htx->extra ? (appctx_htx->data + appctx_htx->extra) : 0);
htx_to_buf(buf_htx, buf);
htx_to_buf(appctx_htx, &appctx->inbuf);
ret -= appctx_htx->data;
}
else
ret = b_xfer(buf, &appctx->outbuf, MAX(count, b_data(&appctx->outbuf)));
done:
if (ret)
appctx->state |= APPLET_OUTBLK_FULL;
if (b_data(&appctx->outbuf)) {
se_fl_set(appctx->sedesc, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
}
else {
se_fl_clr(appctx->sedesc, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
// TODO: how to handle SE_FL_EOS/ERROR/EOI
}
end:
TRACE_LEAVE(APPLET_EV_RECV, appctx);
return ret;
}
size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags)
{
struct appctx *appctx = __sc_appctx(sc);
size_t ret = 0;
TRACE_ENTER(APPLET_EV_SEND, appctx);
if (appctx->state & (APPLET_INBLK_FULL|APPLET_INBLK_ALLOC))
goto end;
if (!count)
goto end;
if (!appctx_get_buf(appctx, &appctx->inbuf)) {
appctx->state |= APPLET_INBLK_ALLOC;
TRACE_STATE("waiting for appctx inbuf allocation", APPLET_EV_SEND|APPLET_EV_BLK, appctx);
goto end;
}
if (IS_HTX_SC(sc)) {
struct htx *appctx_htx = htx_from_buf(&appctx->inbuf);
struct htx *buf_htx = htx_from_buf(buf);
ret = buf_htx->data;
if (htx_is_empty(appctx_htx) && buf_htx->data == count) {
htx_to_buf(appctx_htx, &appctx->inbuf);
htx_to_buf(buf_htx, buf);
b_xfer(&appctx->inbuf, buf, b_data(buf));
goto done;
}
htx_xfer_blks(appctx_htx, buf_htx, count, HTX_BLK_UNUSED);
if (htx_is_empty(buf_htx)) {
appctx_htx->flags |= (buf_htx->flags & HTX_FL_EOM);
}
appctx_htx->extra = (buf_htx->extra ? (buf_htx->data + buf_htx->extra) : 0);
htx_to_buf(appctx_htx, &appctx->outbuf);
htx_to_buf(buf_htx, buf);
ret -= buf_htx->data;
}
else
ret = b_xfer(&appctx->inbuf, buf, MIN(b_room(&appctx->inbuf), count));
done:
if (ret < count) {
appctx->state |= APPLET_INBLK_FULL;
TRACE_STATE("report appctx inbuf is full", APPLET_EV_SEND|APPLET_EV_BLK, appctx);
}
end:
TRACE_LEAVE(APPLET_EV_SEND, appctx);
return ret;
}
/* Default applet handler */ /* Default applet handler */
struct task *task_run_applet(struct task *t, void *context, unsigned int state) struct task *task_run_applet(struct task *t, void *context, unsigned int state)
{ {