MEDIUM: peers: Update the peer applet to use its own buffers

Thanks to this patch, the peer applet is now using its own buffers. .rcv_buf
and .snd_buf callback functions are now defined to use the default raw
functions. The applet API is now used and any dependencies on the
stream-connectors and the channels were removed.
This commit is contained in:
Christopher Faulet 2025-07-22 19:01:10 +02:00
parent 576361c23e
commit a2cb0033bd
2 changed files with 33 additions and 34 deletions

View File

@ -1153,12 +1153,18 @@ static int peer_get_version(const char *str,
*/ */
static inline int peer_getline(struct appctx *appctx) static inline int peer_getline(struct appctx *appctx)
{ {
struct stconn *sc = appctx_sc(appctx);
int n; int n;
n = co_getline(sc_oc(sc), trash.area, trash.size); if (applet_get_inbuf(appctx) == NULL || !applet_input_data(appctx)) {
if (!n) applet_need_more_data(appctx);
return 0; return 0;
}
n = applet_getline(appctx, trash.area, trash.size);
if (!n) {
applet_need_more_data(appctx);
return 0;
}
if (n < 0 || trash.area[n - 1] != '\n') { if (n < 0 || trash.area[n - 1] != '\n') {
appctx->st0 = PEER_SESS_ST_END; appctx->st0 = PEER_SESS_ST_END;
@ -1170,8 +1176,7 @@ static inline int peer_getline(struct appctx *appctx)
else else
trash.area[n - 1] = 0; trash.area[n - 1] = 0;
co_skip(sc_oc(sc), n); applet_skip_input(appctx, n);
return n; return n;
} }
@ -2404,10 +2409,9 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
uint32_t *msg_len, int *totl) uint32_t *msg_len, int *totl)
{ {
int reql; int reql;
struct stconn *sc = appctx_sc(appctx);
char *cur; char *cur;
reql = co_getblk(sc_oc(sc), msg_head, 2 * sizeof(char), *totl); reql = applet_getblk(appctx, msg_head, 2 * sizeof(char), *totl);
if (reql <= 0) /* closed or EOL not found */ if (reql <= 0) /* closed or EOL not found */
goto incomplete; goto incomplete;
@ -2421,11 +2425,11 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
/* Read and Decode message length */ /* Read and Decode message length */
msg_head += *totl; msg_head += *totl;
msg_head_sz -= *totl; msg_head_sz -= *totl;
reql = co_data(sc_oc(sc)) - *totl; reql = applet_input_data(appctx) - *totl;
if (reql > msg_head_sz) if (reql > msg_head_sz)
reql = msg_head_sz; reql = msg_head_sz;
reql = co_getblk(sc_oc(sc), msg_head, reql, *totl); reql = applet_getblk(appctx, msg_head, reql, *totl);
if (reql <= 0) /* closed */ if (reql <= 0) /* closed */
goto incomplete; goto incomplete;
@ -2451,7 +2455,7 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
return -1; return -1;
} }
reql = co_getblk(sc_oc(sc), trash.area, *msg_len, *totl); reql = applet_getblk(appctx, trash.area, *msg_len, *totl);
if (reql <= 0) /* closed */ if (reql <= 0) /* closed */
goto incomplete; goto incomplete;
*totl += reql; *totl += reql;
@ -2460,7 +2464,7 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
return 1; return 1;
incomplete: incomplete:
if (reql < 0 || (sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) { if (reql < 0 || se_fl_test(appctx->sedesc, SE_FL_SHW)) {
/* there was an error or the message was truncated */ /* there was an error or the message was truncated */
appctx->st0 = PEER_SESS_ST_END; appctx->st0 = PEER_SESS_ST_END;
return -1; return -1;
@ -2792,8 +2796,7 @@ static inline int peer_getline_last(struct appctx *appctx, struct peer **curpeer
char *p; char *p;
int reql; int reql;
struct peer *peer; struct peer *peer;
struct stream *s = appctx_strm(appctx); struct peers *peers = strm_fe(appctx_strm(appctx))->parent;
struct peers *peers = strm_fe(s)->parent;
reql = peer_getline(appctx); reql = peer_getline(appctx);
if (!reql) if (!reql)
@ -2897,9 +2900,6 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
*/ */
static void peer_io_handler(struct appctx *appctx) static void peer_io_handler(struct appctx *appctx)
{ {
struct stconn *sc = appctx_sc(appctx);
struct stream *s = __sc_strm(sc);
struct peers *curpeers = strm_fe(s)->parent;
struct peer *curpeer = NULL; struct peer *curpeer = NULL;
int reql = 0; int reql = 0;
int repl = 0; int repl = 0;
@ -2907,14 +2907,14 @@ static void peer_io_handler(struct appctx *appctx)
int prev_state; int prev_state;
int msg_done = 0; int msg_done = 0;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) { if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
co_skip(sc_oc(sc), co_data(sc_oc(sc))); applet_reset_input(appctx);
goto out; goto out;
} }
/* Check if the input buffer is available. */ /* Check if the out buffer is available. */
if (sc_ib(sc)->size == 0) { if (!applet_get_outbuf(appctx)) {
sc_need_room(sc, 0); applet_have_more_data(appctx);
goto out; goto out;
} }
@ -3015,7 +3015,7 @@ static void peer_io_handler(struct appctx *appctx)
curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE; curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
curpeer->last_hdshk = now_ms; curpeer->last_hdshk = now_ms;
init_connected_peer(curpeer, curpeers); init_connected_peer(curpeer, curpeer->peers);
/* switch to waiting message state */ /* switch to waiting message state */
_HA_ATOMIC_INC(&connected_peers); _HA_ATOMIC_INC(&connected_peers);
@ -3054,9 +3054,7 @@ static void peer_io_handler(struct appctx *appctx)
goto switchstate; goto switchstate;
} }
} }
curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
if (sc_ic(sc)->flags & CF_WROTE_DATA)
curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
reql = peer_getline(appctx); reql = peer_getline(appctx);
if (!reql) if (!reql)
@ -3070,11 +3068,11 @@ static void peer_io_handler(struct appctx *appctx)
curpeer->last_hdshk = now_ms; curpeer->last_hdshk = now_ms;
/* Awake main task */ /* Awake main task */
task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG); task_wakeup(curpeer->peers->sync_task, TASK_WOKEN_MSG);
/* If status code is success */ /* If status code is success */
if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) { if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
init_connected_peer(curpeer, curpeers); init_connected_peer(curpeer, curpeer->peers);
} }
else { else {
if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION) if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
@ -3141,7 +3139,7 @@ static void peer_io_handler(struct appctx *appctx)
curpeer->flags |= PEER_F_ALIVE; curpeer->flags |= PEER_F_ALIVE;
/* skip consumed message */ /* skip consumed message */
co_skip(sc_oc(sc), totl); applet_skip_input(appctx, totl);
/* make sure we don't process too many at once */ /* make sure we don't process too many at once */
if (msg_done >= peers_max_updates_at_once) if (msg_done >= peers_max_updates_at_once)
@ -3154,7 +3152,7 @@ static void peer_io_handler(struct appctx *appctx)
send_msgs: send_msgs:
if (curpeer->flags & PEER_F_HEARTBEAT) { if (curpeer->flags & PEER_F_HEARTBEAT) {
curpeer->flags &= ~PEER_F_HEARTBEAT; curpeer->flags &= ~PEER_F_HEARTBEAT;
repl = peer_send_heartbeatmsg(appctx, curpeer, curpeers); repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers);
if (repl <= 0) { if (repl <= 0) {
if (repl == -1) if (repl == -1)
goto out; goto out;
@ -3163,7 +3161,7 @@ static void peer_io_handler(struct appctx *appctx)
curpeer->tx_hbt++; curpeer->tx_hbt++;
} }
/* we get here when a peer_recv_msg() returns 0 in reql */ /* we get here when a peer_recv_msg() returns 0 in reql */
repl = peer_send_msgs(appctx, curpeer, curpeers); repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
if (repl <= 0) { if (repl <= 0) {
if (repl == -1) if (repl == -1)
goto out; goto out;
@ -3214,14 +3212,14 @@ static void peer_io_handler(struct appctx *appctx)
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock); HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
curpeer = NULL; curpeer = NULL;
} }
se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI); applet_set_eos(appctx);
co_skip(sc_oc(sc), co_data(sc_oc(sc))); applet_reset_input(appctx);
goto out; goto out;
} }
} }
} }
out: out:
sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; /* sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; */
if (curpeer) if (curpeer)
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock); HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
@ -3232,6 +3230,8 @@ static struct applet peer_applet = {
.obj_type = OBJ_TYPE_APPLET, .obj_type = OBJ_TYPE_APPLET,
.name = "<PEER>", /* used for logging */ .name = "<PEER>", /* used for logging */
.fct = peer_io_handler, .fct = peer_io_handler,
.rcv_buf = appctx_raw_rcv_buf,
.snd_buf = appctx_raw_snd_buf,
.init = peer_session_init, .init = peer_session_init,
.release = peer_session_release, .release = peer_session_release,
}; };

View File

@ -499,7 +499,6 @@ static void _sink_forward_io_handler(struct appctx *appctx,
* soft_close will result in the port staying in TIME_WAIT state: * soft_close will result in the port staying in TIME_WAIT state:
* don't abuse from soft_close! * don't abuse from soft_close!
*/ */
applet_set_eoi(appctx);
applet_set_eos(appctx); applet_set_eos(appctx);
/* if required, hard_close could be achieve by using SE_FL_EOS|SE_FL_ERROR /* if required, hard_close could be achieve by using SE_FL_EOS|SE_FL_ERROR