MEDIUM: peers: Update the peer applet to use its own buffers

Thanks to this patch, the peer applet is now using its own buffers. .rcv_buf
and .snd_buf callback functions are now defined to use the default raw
functions. The applet API is now used and any dependencies on the
stream-connectors and the channels were removed.
This commit is contained in:
Christopher Faulet 2025-07-22 19:01:10 +02:00
parent f58ef3f281
commit 9ad435e302
2 changed files with 33 additions and 34 deletions

View File

@ -1153,12 +1153,18 @@ static int peer_get_version(const char *str,
*/
static inline int peer_getline(struct appctx *appctx)
{
struct stconn *sc = appctx_sc(appctx);
int n;
n = co_getline(sc_oc(sc), trash.area, trash.size);
if (!n)
if (applet_get_inbuf(appctx) == NULL || !applet_input_data(appctx)) {
applet_need_more_data(appctx);
return 0;
}
n = applet_getline(appctx, trash.area, trash.size);
if (!n) {
applet_need_more_data(appctx);
return 0;
}
if (n < 0 || trash.area[n - 1] != '\n') {
appctx->st0 = PEER_SESS_ST_END;
@ -1170,8 +1176,7 @@ static inline int peer_getline(struct appctx *appctx)
else
trash.area[n - 1] = 0;
co_skip(sc_oc(sc), n);
applet_skip_input(appctx, n);
return n;
}
@ -2404,10 +2409,9 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
uint32_t *msg_len, int *totl)
{
int reql;
struct stconn *sc = appctx_sc(appctx);
char *cur;
reql = co_getblk(sc_oc(sc), msg_head, 2 * sizeof(char), *totl);
reql = applet_getblk(appctx, msg_head, 2 * sizeof(char), *totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
@ -2421,11 +2425,11 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
/* Read and Decode message length */
msg_head += *totl;
msg_head_sz -= *totl;
reql = co_data(sc_oc(sc)) - *totl;
reql = applet_input_data(appctx) - *totl;
if (reql > msg_head_sz)
reql = msg_head_sz;
reql = co_getblk(sc_oc(sc), msg_head, reql, *totl);
reql = applet_getblk(appctx, msg_head, reql, *totl);
if (reql <= 0) /* closed */
goto incomplete;
@ -2451,7 +2455,7 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
return -1;
}
reql = co_getblk(sc_oc(sc), trash.area, *msg_len, *totl);
reql = applet_getblk(appctx, trash.area, *msg_len, *totl);
if (reql <= 0) /* closed */
goto incomplete;
*totl += reql;
@ -2460,7 +2464,7 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
return 1;
incomplete:
if (reql < 0 || (sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
if (reql < 0 || se_fl_test(appctx->sedesc, SE_FL_SHW)) {
/* there was an error or the message was truncated */
appctx->st0 = PEER_SESS_ST_END;
return -1;
@ -2792,8 +2796,7 @@ static inline int peer_getline_last(struct appctx *appctx, struct peer **curpeer
char *p;
int reql;
struct peer *peer;
struct stream *s = appctx_strm(appctx);
struct peers *peers = strm_fe(s)->parent;
struct peers *peers = strm_fe(appctx_strm(appctx))->parent;
reql = peer_getline(appctx);
if (!reql)
@ -2897,9 +2900,6 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
*/
static void peer_io_handler(struct appctx *appctx)
{
struct stconn *sc = appctx_sc(appctx);
struct stream *s = __sc_strm(sc);
struct peers *curpeers = strm_fe(s)->parent;
struct peer *curpeer = NULL;
int reql = 0;
int repl = 0;
@ -2907,14 +2907,14 @@ static void peer_io_handler(struct appctx *appctx)
int prev_state;
int msg_done = 0;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
co_skip(sc_oc(sc), co_data(sc_oc(sc)));
if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
applet_reset_input(appctx);
goto out;
}
/* Check if the input buffer is available. */
if (sc_ib(sc)->size == 0) {
sc_need_room(sc, 0);
/* Check if the out buffer is available. */
if (!applet_get_outbuf(appctx)) {
applet_have_more_data(appctx);
goto out;
}
@ -3015,7 +3015,7 @@ static void peer_io_handler(struct appctx *appctx)
curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
curpeer->last_hdshk = now_ms;
init_connected_peer(curpeer, curpeers);
init_connected_peer(curpeer, curpeer->peers);
/* switch to waiting message state */
_HA_ATOMIC_INC(&connected_peers);
@ -3054,9 +3054,7 @@ static void peer_io_handler(struct appctx *appctx)
goto switchstate;
}
}
if (sc_ic(sc)->flags & CF_WROTE_DATA)
curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
reql = peer_getline(appctx);
if (!reql)
@ -3070,11 +3068,11 @@ static void peer_io_handler(struct appctx *appctx)
curpeer->last_hdshk = now_ms;
/* Awake main task */
task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG);
task_wakeup(curpeer->peers->sync_task, TASK_WOKEN_MSG);
/* If status code is success */
if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
init_connected_peer(curpeer, curpeers);
init_connected_peer(curpeer, curpeer->peers);
}
else {
if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
@ -3141,7 +3139,7 @@ static void peer_io_handler(struct appctx *appctx)
curpeer->flags |= PEER_F_ALIVE;
/* skip consumed message */
co_skip(sc_oc(sc), totl);
applet_skip_input(appctx, totl);
/* make sure we don't process too many at once */
if (msg_done >= peers_max_updates_at_once)
@ -3154,7 +3152,7 @@ static void peer_io_handler(struct appctx *appctx)
send_msgs:
if (curpeer->flags & PEER_F_HEARTBEAT) {
curpeer->flags &= ~PEER_F_HEARTBEAT;
repl = peer_send_heartbeatmsg(appctx, curpeer, curpeers);
repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers);
if (repl <= 0) {
if (repl == -1)
goto out;
@ -3163,7 +3161,7 @@ static void peer_io_handler(struct appctx *appctx)
curpeer->tx_hbt++;
}
/* we get here when a peer_recv_msg() returns 0 in reql */
repl = peer_send_msgs(appctx, curpeer, curpeers);
repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
if (repl <= 0) {
if (repl == -1)
goto out;
@ -3214,14 +3212,14 @@ static void peer_io_handler(struct appctx *appctx)
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
curpeer = NULL;
}
se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
co_skip(sc_oc(sc), co_data(sc_oc(sc)));
applet_set_eos(appctx);
applet_reset_input(appctx);
goto out;
}
}
}
out:
sc_opposite(sc)->flags |= SC_FL_RCV_ONCE;
/* sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; */
if (curpeer)
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
@ -3232,6 +3230,8 @@ static struct applet peer_applet = {
.obj_type = OBJ_TYPE_APPLET,
.name = "<PEER>", /* used for logging */
.fct = peer_io_handler,
.rcv_buf = appctx_raw_rcv_buf,
.snd_buf = appctx_raw_snd_buf,
.init = peer_session_init,
.release = peer_session_release,
};

View File

@ -499,7 +499,6 @@ static void _sink_forward_io_handler(struct appctx *appctx,
* soft_close will result in the port staying in TIME_WAIT state:
* don't abuse from soft_close!
*/
applet_set_eoi(appctx);
applet_set_eos(appctx);
/* if required, hard_close could be achieve by using SE_FL_EOS|SE_FL_ERROR