mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-12-02 16:21:27 +01:00
EXP: peers: Use try-lock operations when looping on the updates list
This commit is contained in:
parent
e29863934e
commit
c37b14a705
114
src/peers.c
114
src/peers.c
@ -1549,7 +1549,6 @@ int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, struct shared_tab
|
||||
int ret, new_pushed, use_timed;
|
||||
int updates_sent = 0;
|
||||
struct stksess *ts = NULL;
|
||||
struct mt_list back;
|
||||
|
||||
TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
|
||||
|
||||
@ -1562,79 +1561,69 @@ int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, struct shared_tab
|
||||
/* We force new pushed to 1 to force identifier in update message */
|
||||
new_pushed = 1;
|
||||
|
||||
MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
|
||||
if (ts == st->end) {
|
||||
ret = 1; // done
|
||||
while (1) {
|
||||
struct mt_list el1, el2;
|
||||
|
||||
if (st->last->upd.next == &st->end->upd || st->last->upd.next == &st->table->updates)
|
||||
break;
|
||||
|
||||
if (buffer_almost_full(&appctx->outbuf)) {
|
||||
applet_have_more_data(appctx);
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL)
|
||||
continue;
|
||||
else if (ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE)
|
||||
if (updates_sent >= peers_max_updates_at_once) {
|
||||
applet_have_more_data(appctx);
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
el1 = mt_list_try_lock_full(&st->last->upd);
|
||||
if (el1.prev == NULL) {
|
||||
applet_have_more_data(appctx);
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
el2 = mt_list_lock_next(el1.next);
|
||||
/* el2 = mt_list_try_lock_next(el1.next); */
|
||||
/* if (el2.next == NULL) { */
|
||||
/* mt_list_unlock_full(&st->last->upd, el1); */
|
||||
/* applet_have_more_data(appctx); */
|
||||
/* ret = -1; */
|
||||
/* break; */
|
||||
/* } */
|
||||
|
||||
ts = MT_LIST_ELEM(el1.next, typeof(ts), upd);
|
||||
HA_ATOMIC_INC(&ts->ref_cnt);
|
||||
|
||||
mt_list_unlock_full(&st->last->upd, el2);
|
||||
mt_list_unlock_link(el1);
|
||||
|
||||
if ((!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL) ||
|
||||
(ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE) ||
|
||||
(p->srv->shard && ts->shard != p->srv->shard) ||
|
||||
ts->updt_type == STKSESS_UPDT_MARKER) {
|
||||
HA_ATOMIC_DEC(&ts->ref_cnt);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (st != p->last_local_table) {
|
||||
ret = peer_send_switchmsg(st, appctx);
|
||||
if (ret <= 0)
|
||||
if (ret <= 0) {
|
||||
HA_ATOMIC_DEC(&ts->ref_cnt);
|
||||
break;
|
||||
|
||||
}
|
||||
p->last_local_table = st;
|
||||
}
|
||||
|
||||
if (!_HA_ATOMIC_LOAD(&ts->seen))
|
||||
_HA_ATOMIC_STORE(&ts->seen, 1);
|
||||
|
||||
if (p->srv->shard && ts->shard != p->srv->shard)
|
||||
continue;
|
||||
|
||||
if (updates_sent >= peers_max_updates_at_once) {
|
||||
applet_have_more_data(appctx);
|
||||
ret = -1;
|
||||
}
|
||||
else {
|
||||
HA_ATOMIC_INC(&ts->ref_cnt);
|
||||
ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
|
||||
HA_ATOMIC_DEC(&ts->ref_cnt);
|
||||
}
|
||||
|
||||
if (ret <= 0) {
|
||||
/* Insert <last> marker before <ts> to process it again
|
||||
* on next iteration. <ts> remains locked
|
||||
*
|
||||
* Before:
|
||||
* <back>: <A> <==============> <C>
|
||||
*
|
||||
* <last> <==> ... <A> <==> x x <==> <C> ...
|
||||
*
|
||||
* x <==> TS <==> x
|
||||
*
|
||||
* <back>.prev == <A> and <back>.prev->next == MT_LIST_BUSY
|
||||
*
|
||||
* Step 1: detach <last> from the list
|
||||
* Step 2: insert <last> after <A>
|
||||
* - <last>.prev = <A> (or <back>.prev)
|
||||
* - <last>.next = MT_LIST_BUSY (or <back>.prev->next)
|
||||
* Step 3: move <A> before <last>
|
||||
* - <A>.next = <last> (or back.prev->next = <last>)
|
||||
*
|
||||
* Step 4: update <back>.prev to point on <last>
|
||||
*
|
||||
* After:
|
||||
* <back>: <last> <==============> <C>
|
||||
*
|
||||
* ... <A> <==> <last> <==> x x <==> <C> ...
|
||||
*
|
||||
* x <==> TS <==> x
|
||||
*
|
||||
*/
|
||||
MT_LIST_DELETE(&st->last->upd);
|
||||
st->last->upd.next = back.prev->next; /* == MT_LIST_BUSY */
|
||||
__ha_barrier_atomic_store();
|
||||
st->last->upd.prev = back.prev;
|
||||
back.prev->next = &st->last->upd;
|
||||
back.prev = &st->last->upd;
|
||||
ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
|
||||
HA_ATOMIC_DEC(&ts->ref_cnt);
|
||||
if (ret <= 0)
|
||||
break;
|
||||
}
|
||||
|
||||
p->last.table = st;
|
||||
p->last.id = st->update_id;
|
||||
@ -1647,12 +1636,9 @@ int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, struct shared_tab
|
||||
updates_sent++;
|
||||
}
|
||||
|
||||
if (ret == 1) {
|
||||
MT_LIST_DELETE(&st->last->upd);
|
||||
MT_LIST_APPEND(&st->end->upd, &st->last->upd);
|
||||
if (ret == 1)
|
||||
MT_LIST_DELETE(&st->end->upd);
|
||||
}
|
||||
|
||||
end:
|
||||
TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
|
||||
return ret;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user