mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 23:56:57 +02:00
MINOR: peers: Add traces for peer control messages.
Display traces when sending/receiving peer control messages (synchronisation, heartbeat). Add remaining traces when parsing malformed messages (acks, stick-table definitions) or ignoring them. Also add traces when releasing session or when reaching the PEER_SESS_ST_ERRPROTO peer protocol state.
This commit is contained in:
parent
31ffe9fad0
commit
da2b0844fc
192
src/peers.c
192
src/peers.c
@ -298,6 +298,18 @@ static void peers_trace(enum trace_level level, uint64_t mask,
|
|||||||
static const struct trace_event peers_trace_events[] = {
|
static const struct trace_event peers_trace_events[] = {
|
||||||
#define PEERS_EV_UPDTMSG (1 << 0)
|
#define PEERS_EV_UPDTMSG (1 << 0)
|
||||||
{ .mask = PEERS_EV_UPDTMSG, .name = "updtmsg", .desc = "update message received" },
|
{ .mask = PEERS_EV_UPDTMSG, .name = "updtmsg", .desc = "update message received" },
|
||||||
|
#define PEERS_EV_ACKMSG (1 << 1)
|
||||||
|
{ .mask = PEERS_EV_ACKMSG, .name = "ackmsg", .desc = "ack message received" },
|
||||||
|
#define PEERS_EV_SWTCMSG (1 << 2)
|
||||||
|
{ .mask = PEERS_EV_SWTCMSG, .name = "swtcmsg", .desc = "switch message received" },
|
||||||
|
#define PEERS_EV_DEFMSG (1 << 3)
|
||||||
|
{ .mask = PEERS_EV_DEFMSG, .name = "defmsg", .desc = "definition message received" },
|
||||||
|
#define PEERS_EV_CTRLMSG (1 << 4)
|
||||||
|
{ .mask = PEERS_EV_CTRLMSG, .name = "ctrlmsg", .desc = "control message sent/received" },
|
||||||
|
#define PEERS_EV_SESSREL (1 << 5)
|
||||||
|
{ .mask = PEERS_EV_SESSREL, .name = "sessrl", .desc = "peer session releasing" },
|
||||||
|
#define PEERS_EV_PROTOERR (1 << 6)
|
||||||
|
{ .mask = PEERS_EV_PROTOERR, .name = "protoerr", .desc = "protocol error" },
|
||||||
};
|
};
|
||||||
|
|
||||||
static const struct name_desc peers_trace_lockon_args[4] = {
|
static const struct name_desc peers_trace_lockon_args[4] = {
|
||||||
@ -325,6 +337,25 @@ struct trace_source trace_peers = {
|
|||||||
.report_events = ~0, /* report everything by default */
|
.report_events = ~0, /* report everything by default */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* Return peer control message types as strings (only for debugging purpose). */
|
||||||
|
static inline char *ctrl_msg_type_str(unsigned int type)
|
||||||
|
{
|
||||||
|
switch (type) {
|
||||||
|
case PEER_MSG_CTRL_RESYNCREQ:
|
||||||
|
return "RESYNCREQ";
|
||||||
|
case PEER_MSG_CTRL_RESYNCFINISHED:
|
||||||
|
return "RESYNCFINISHED";
|
||||||
|
case PEER_MSG_CTRL_RESYNCPARTIAL:
|
||||||
|
return "RESYNCPARTIAL";
|
||||||
|
case PEER_MSG_CTRL_RESYNCCONFIRM:
|
||||||
|
return "RESYNCCONFIRM";
|
||||||
|
case PEER_MSG_CTRL_HEARTBEAT:
|
||||||
|
return "HEARTBEAT";
|
||||||
|
default:
|
||||||
|
return "???";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#define TRACE_SOURCE &trace_peers
|
#define TRACE_SOURCE &trace_peers
|
||||||
INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
|
INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
|
||||||
|
|
||||||
@ -333,7 +364,7 @@ static void peers_trace(enum trace_level level, uint64_t mask,
|
|||||||
const struct ist where, const struct ist func,
|
const struct ist where, const struct ist func,
|
||||||
const void *a1, const void *a2, const void *a3, const void *a4)
|
const void *a1, const void *a2, const void *a3, const void *a4)
|
||||||
{
|
{
|
||||||
if (mask & PEERS_EV_UPDTMSG) {
|
if (mask & (PEERS_EV_UPDTMSG|PEERS_EV_ACKMSG|PEERS_EV_SWTCMSG)) {
|
||||||
if (a2) {
|
if (a2) {
|
||||||
const struct peer *peer = a2;
|
const struct peer *peer = a2;
|
||||||
|
|
||||||
@ -350,6 +381,73 @@ static void peers_trace(enum trace_level level, uint64_t mask,
|
|||||||
chunk_appendf(&trace_buf, " %llu", (unsigned long long)*val);
|
chunk_appendf(&trace_buf, " %llu", (unsigned long long)*val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (mask & PEERS_EV_DEFMSG) {
|
||||||
|
if (a2) {
|
||||||
|
const struct peer *peer = a2;
|
||||||
|
|
||||||
|
chunk_appendf(&trace_buf, " peer=%s", peer->id);
|
||||||
|
}
|
||||||
|
if (a3) {
|
||||||
|
const char *p = a3;
|
||||||
|
|
||||||
|
chunk_appendf(&trace_buf, " @%p", p);
|
||||||
|
}
|
||||||
|
if (a4) {
|
||||||
|
const int *val = a4;
|
||||||
|
|
||||||
|
chunk_appendf(&trace_buf, " %d", *val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mask & PEERS_EV_CTRLMSG) {
|
||||||
|
if (a2) {
|
||||||
|
const unsigned char *ctrl_msg_type = a2;
|
||||||
|
|
||||||
|
chunk_appendf(&trace_buf, " %s", ctrl_msg_type_str(*ctrl_msg_type));
|
||||||
|
|
||||||
|
}
|
||||||
|
if (a3) {
|
||||||
|
const char *local_peer = a3;
|
||||||
|
|
||||||
|
chunk_appendf(&trace_buf, " %s", local_peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (a4) {
|
||||||
|
const char *remote_peer = a4;
|
||||||
|
|
||||||
|
chunk_appendf(&trace_buf, " -> %s", remote_peer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mask & (PEERS_EV_SESSREL|PEERS_EV_PROTOERR)) {
|
||||||
|
if (a2) {
|
||||||
|
const struct peer *peer = a2;
|
||||||
|
struct peers *peers = NULL;
|
||||||
|
|
||||||
|
if (peer) {
|
||||||
|
struct stream_interface *si;
|
||||||
|
|
||||||
|
si = peer->appctx->owner;
|
||||||
|
if (si) {
|
||||||
|
struct stream *s = si_strm(si);
|
||||||
|
|
||||||
|
peers = strm_fe(s)->parent;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (peers)
|
||||||
|
chunk_appendf(&trace_buf, " %s", peers->local->id);
|
||||||
|
if (peer)
|
||||||
|
chunk_appendf(&trace_buf, " -> %s", peer->id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (a3) {
|
||||||
|
const int *prev_state = a3;
|
||||||
|
|
||||||
|
chunk_appendf(&trace_buf, " prev_state=%d\n", *prev_state);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char *statuscode_str(int statuscode)
|
static const char *statuscode_str(int statuscode)
|
||||||
@ -863,6 +961,7 @@ static void peer_session_release(struct appctx *appctx)
|
|||||||
{
|
{
|
||||||
struct peer *peer = appctx->ctx.peers.ptr;
|
struct peer *peer = appctx->ctx.peers.ptr;
|
||||||
|
|
||||||
|
TRACE_PROTO("releasing peer session", PEERS_EV_SESSREL, NULL, peer);
|
||||||
/* appctx->ctx.peers.ptr is not a peer session */
|
/* appctx->ctx.peers.ptr is not a peer session */
|
||||||
if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
|
if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
|
||||||
return;
|
return;
|
||||||
@ -1094,12 +1193,16 @@ static int peer_prepare_control_msg(char *msg, size_t size, struct peer_prep_par
|
|||||||
* any other negative returned value must be considered as an error with an appctx st0
|
* any other negative returned value must be considered as an error with an appctx st0
|
||||||
* returned value equal to PEER_SESS_ST_END.
|
* returned value equal to PEER_SESS_ST_END.
|
||||||
*/
|
*/
|
||||||
static inline int peer_send_resync_reqmsg(struct appctx *appctx)
|
static inline int peer_send_resync_reqmsg(struct appctx *appctx,
|
||||||
|
struct peer *peer, struct peers *peers)
|
||||||
{
|
{
|
||||||
struct peer_prep_params p = {
|
struct peer_prep_params p = {
|
||||||
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCREQ, },
|
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCREQ, },
|
||||||
};
|
};
|
||||||
|
|
||||||
|
TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &p.control.head[1], peers->local->id, peer->id);
|
||||||
|
|
||||||
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1110,12 +1213,16 @@ static inline int peer_send_resync_reqmsg(struct appctx *appctx)
|
|||||||
* any other negative returned value must be considered as an error with an appctx st0
|
* any other negative returned value must be considered as an error with an appctx st0
|
||||||
* returned value equal to PEER_SESS_ST_END.
|
* returned value equal to PEER_SESS_ST_END.
|
||||||
*/
|
*/
|
||||||
static inline int peer_send_resync_confirmsg(struct appctx *appctx)
|
static inline int peer_send_resync_confirmsg(struct appctx *appctx,
|
||||||
|
struct peer *peer, struct peers *peers)
|
||||||
{
|
{
|
||||||
struct peer_prep_params p = {
|
struct peer_prep_params p = {
|
||||||
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCCONFIRM, },
|
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCCONFIRM, },
|
||||||
};
|
};
|
||||||
|
|
||||||
|
TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &p.control.head[1], peers->local->id, peer->id);
|
||||||
|
|
||||||
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1126,7 +1233,8 @@ static inline int peer_send_resync_confirmsg(struct appctx *appctx)
|
|||||||
* any other negative returned value must be considered as an error with an appctx st0
|
* any other negative returned value must be considered as an error with an appctx st0
|
||||||
* returned value equal to PEER_SESS_ST_END.
|
* returned value equal to PEER_SESS_ST_END.
|
||||||
*/
|
*/
|
||||||
static inline int peer_send_resync_finishedmsg(struct appctx *appctx, struct peers *peers)
|
static inline int peer_send_resync_finishedmsg(struct appctx *appctx,
|
||||||
|
struct peer *peer, struct peers *peers)
|
||||||
{
|
{
|
||||||
struct peer_prep_params p = {
|
struct peer_prep_params p = {
|
||||||
.control.head = { PEER_MSG_CLASS_CONTROL, },
|
.control.head = { PEER_MSG_CLASS_CONTROL, },
|
||||||
@ -1135,6 +1243,9 @@ static inline int peer_send_resync_finishedmsg(struct appctx *appctx, struct pee
|
|||||||
p.control.head[1] = (peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ?
|
p.control.head[1] = (peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ?
|
||||||
PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL;
|
PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL;
|
||||||
|
|
||||||
|
TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &p.control.head[1], peers->local->id, peer->id);
|
||||||
|
|
||||||
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1145,12 +1256,16 @@ static inline int peer_send_resync_finishedmsg(struct appctx *appctx, struct pee
|
|||||||
* any other negative returned value must be considered as an error with an appctx st0
|
* any other negative returned value must be considered as an error with an appctx st0
|
||||||
* returned value equal to PEER_SESS_ST_END.
|
* returned value equal to PEER_SESS_ST_END.
|
||||||
*/
|
*/
|
||||||
static inline int peer_send_heartbeatmsg(struct appctx *appctx)
|
static inline int peer_send_heartbeatmsg(struct appctx *appctx,
|
||||||
|
struct peer *peer, struct peers *peers)
|
||||||
{
|
{
|
||||||
struct peer_prep_params p = {
|
struct peer_prep_params p = {
|
||||||
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, },
|
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, },
|
||||||
};
|
};
|
||||||
|
|
||||||
|
TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &p.control.head[1], peers->local->id, peer->id);
|
||||||
|
|
||||||
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1698,6 +1813,9 @@ static inline int peer_treat_ackmsg(struct appctx *appctx, struct peer *p,
|
|||||||
table_id = intdecode(msg_cur, msg_end);
|
table_id = intdecode(msg_cur, msg_end);
|
||||||
if (!*msg_cur || (*msg_cur + sizeof(update) > msg_end)) {
|
if (!*msg_cur || (*msg_cur + sizeof(update) > msg_end)) {
|
||||||
/* malformed message */
|
/* malformed message */
|
||||||
|
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_ACKMSG,
|
||||||
|
NULL, p, *msg_cur);
|
||||||
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1731,6 +1849,7 @@ static inline int peer_treat_switchmsg(struct appctx *appctx, struct peer *p,
|
|||||||
|
|
||||||
table_id = intdecode(msg_cur, msg_end);
|
table_id = intdecode(msg_cur, msg_end);
|
||||||
if (!*msg_cur) {
|
if (!*msg_cur) {
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_SWTCMSG, NULL, p);
|
||||||
/* malformed message */
|
/* malformed message */
|
||||||
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
||||||
return 0;
|
return 0;
|
||||||
@ -1768,16 +1887,22 @@ static inline int peer_treat_definemsg(struct appctx *appctx, struct peer *p,
|
|||||||
uint64_t table_data;
|
uint64_t table_data;
|
||||||
|
|
||||||
table_id = intdecode(msg_cur, msg_end);
|
table_id = intdecode(msg_cur, msg_end);
|
||||||
if (!*msg_cur)
|
if (!*msg_cur) {
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
|
||||||
goto malformed_exit;
|
goto malformed_exit;
|
||||||
|
}
|
||||||
|
|
||||||
table_id_len = intdecode(msg_cur, msg_end);
|
table_id_len = intdecode(msg_cur, msg_end);
|
||||||
if (!*msg_cur)
|
if (!*msg_cur) {
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p, *msg_cur);
|
||||||
goto malformed_exit;
|
goto malformed_exit;
|
||||||
|
}
|
||||||
|
|
||||||
p->remote_table = NULL;
|
p->remote_table = NULL;
|
||||||
if (!table_id_len || (*msg_cur + table_id_len) >= msg_end)
|
if (!table_id_len || (*msg_cur + table_id_len) >= msg_end) {
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p, *msg_cur, &table_id_len);
|
||||||
goto malformed_exit;
|
goto malformed_exit;
|
||||||
|
}
|
||||||
|
|
||||||
for (st = p->tables; st; st = st->next) {
|
for (st = p->tables; st; st = st->next) {
|
||||||
/* Reset IDs */
|
/* Reset IDs */
|
||||||
@ -1789,28 +1914,39 @@ static inline int peer_treat_definemsg(struct appctx *appctx, struct peer *p,
|
|||||||
p->remote_table = st;
|
p->remote_table = st;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!p->remote_table)
|
if (!p->remote_table) {
|
||||||
|
TRACE_PROTO("ignored message", PEERS_EV_DEFMSG, NULL, p);
|
||||||
goto ignore_msg;
|
goto ignore_msg;
|
||||||
|
}
|
||||||
|
|
||||||
*msg_cur += table_id_len;
|
*msg_cur += table_id_len;
|
||||||
if (*msg_cur >= msg_end)
|
if (*msg_cur >= msg_end) {
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
|
||||||
goto malformed_exit;
|
goto malformed_exit;
|
||||||
|
}
|
||||||
|
|
||||||
table_type = intdecode(msg_cur, msg_end);
|
table_type = intdecode(msg_cur, msg_end);
|
||||||
if (!*msg_cur)
|
if (!*msg_cur) {
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
|
||||||
goto malformed_exit;
|
goto malformed_exit;
|
||||||
|
}
|
||||||
|
|
||||||
table_keylen = intdecode(msg_cur, msg_end);
|
table_keylen = intdecode(msg_cur, msg_end);
|
||||||
if (!*msg_cur)
|
if (!*msg_cur) {
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
|
||||||
goto malformed_exit;
|
goto malformed_exit;
|
||||||
|
}
|
||||||
|
|
||||||
table_data = intdecode(msg_cur, msg_end);
|
table_data = intdecode(msg_cur, msg_end);
|
||||||
if (!*msg_cur)
|
if (!*msg_cur) {
|
||||||
|
TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
|
||||||
goto malformed_exit;
|
goto malformed_exit;
|
||||||
|
}
|
||||||
|
|
||||||
if (p->remote_table->table->type != peer_int_key_type[table_type]
|
if (p->remote_table->table->type != peer_int_key_type[table_type]
|
||||||
|| p->remote_table->table->key_size != table_keylen) {
|
|| p->remote_table->table->key_size != table_keylen) {
|
||||||
p->remote_table = NULL;
|
p->remote_table = NULL;
|
||||||
|
TRACE_PROTO("ignored message", PEERS_EV_DEFMSG, NULL, p);
|
||||||
goto ignore_msg;
|
goto ignore_msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1925,6 +2061,8 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
struct shared_table *st;
|
struct shared_table *st;
|
||||||
/* Reset message: remote need resync */
|
/* Reset message: remote need resync */
|
||||||
|
|
||||||
|
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &msg_head[1], peers->local->id, peer->id);
|
||||||
/* prepare tables for a global push */
|
/* prepare tables for a global push */
|
||||||
for (st = peer->tables; st; st = st->next) {
|
for (st = peer->tables; st; st = st->next) {
|
||||||
st->teaching_origin = st->last_pushed = st->table->update;
|
st->teaching_origin = st->last_pushed = st->table->update;
|
||||||
@ -1938,6 +2076,8 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
peer->flags |= PEER_F_TEACH_PROCESS;
|
peer->flags |= PEER_F_TEACH_PROCESS;
|
||||||
}
|
}
|
||||||
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
|
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
|
||||||
|
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &msg_head[1], peers->local->id, peer->id);
|
||||||
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
||||||
peer->flags &= ~PEER_F_LEARN_ASSIGN;
|
peer->flags &= ~PEER_F_LEARN_ASSIGN;
|
||||||
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
||||||
@ -1946,6 +2086,8 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
peer->confirm++;
|
peer->confirm++;
|
||||||
}
|
}
|
||||||
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
|
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
|
||||||
|
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &msg_head[1], peers->local->id, peer->id);
|
||||||
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
||||||
peer->flags &= ~PEER_F_LEARN_ASSIGN;
|
peer->flags &= ~PEER_F_LEARN_ASSIGN;
|
||||||
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
||||||
@ -1959,6 +2101,8 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
|
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
|
||||||
struct shared_table *st;
|
struct shared_table *st;
|
||||||
|
|
||||||
|
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &msg_head[1], peers->local->id, peer->id);
|
||||||
/* If stopping state */
|
/* If stopping state */
|
||||||
if (stopping) {
|
if (stopping) {
|
||||||
/* Close session, push resync no more needed */
|
/* Close session, push resync no more needed */
|
||||||
@ -1975,6 +2119,8 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
peer->flags &= PEER_TEACH_RESET;
|
peer->flags &= PEER_TEACH_RESET;
|
||||||
}
|
}
|
||||||
else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
|
else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
|
||||||
|
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
||||||
|
NULL, &msg_head[1], peers->local->id, peer->id);
|
||||||
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||||
peer->rx_hbt++;
|
peer->rx_hbt++;
|
||||||
}
|
}
|
||||||
@ -2021,19 +2167,19 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
* -1 means an internal error occurred, 0 is for a peer protocol error leading
|
* -1 means an internal error occurred, 0 is for a peer protocol error leading
|
||||||
* to a peer state change (from the peer I/O handler point of view).
|
* to a peer state change (from the peer I/O handler point of view).
|
||||||
*/
|
*/
|
||||||
static inline int peer_send_msgs(struct appctx *appctx, struct peer *peer)
|
static inline int peer_send_msgs(struct appctx *appctx,
|
||||||
|
struct peer *peer, struct peers *peers)
|
||||||
{
|
{
|
||||||
int repl;
|
int repl;
|
||||||
struct stream_interface *si = appctx->owner;
|
struct stream_interface *si = appctx->owner;
|
||||||
struct stream *s = si_strm(si);
|
struct stream *s = si_strm(si);
|
||||||
struct peers *peers = strm_fe(s)->parent;
|
|
||||||
|
|
||||||
/* Need to request a resync */
|
/* Need to request a resync */
|
||||||
if ((peer->flags & PEER_F_LEARN_ASSIGN) &&
|
if ((peer->flags & PEER_F_LEARN_ASSIGN) &&
|
||||||
(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
||||||
!(peers->flags & PEERS_F_RESYNC_PROCESS)) {
|
!(peers->flags & PEERS_F_RESYNC_PROCESS)) {
|
||||||
|
|
||||||
repl = peer_send_resync_reqmsg(appctx);
|
repl = peer_send_resync_reqmsg(appctx, peer, peers);
|
||||||
if (repl <= 0)
|
if (repl <= 0)
|
||||||
return repl;
|
return repl;
|
||||||
|
|
||||||
@ -2097,7 +2243,7 @@ static inline int peer_send_msgs(struct appctx *appctx, struct peer *peer)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ((peer->flags & PEER_F_TEACH_PROCESS) && !(peer->flags & PEER_F_TEACH_FINISHED)) {
|
if ((peer->flags & PEER_F_TEACH_PROCESS) && !(peer->flags & PEER_F_TEACH_FINISHED)) {
|
||||||
repl = peer_send_resync_finishedmsg(appctx, peers);
|
repl = peer_send_resync_finishedmsg(appctx, peer, peers);
|
||||||
if (repl <= 0)
|
if (repl <= 0)
|
||||||
return repl;
|
return repl;
|
||||||
|
|
||||||
@ -2107,7 +2253,7 @@ static inline int peer_send_msgs(struct appctx *appctx, struct peer *peer)
|
|||||||
|
|
||||||
/* Confirm finished or partial messages */
|
/* Confirm finished or partial messages */
|
||||||
while (peer->confirm) {
|
while (peer->confirm) {
|
||||||
repl = peer_send_resync_confirmsg(appctx);
|
repl = peer_send_resync_confirmsg(appctx, peer, peers);
|
||||||
if (repl <= 0)
|
if (repl <= 0)
|
||||||
return repl;
|
return repl;
|
||||||
|
|
||||||
@ -2530,7 +2676,7 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
send_msgs:
|
send_msgs:
|
||||||
if (curpeer->flags & PEER_F_HEARTBEAT) {
|
if (curpeer->flags & PEER_F_HEARTBEAT) {
|
||||||
curpeer->flags &= ~PEER_F_HEARTBEAT;
|
curpeer->flags &= ~PEER_F_HEARTBEAT;
|
||||||
repl = peer_send_heartbeatmsg(appctx);
|
repl = peer_send_heartbeatmsg(appctx, curpeer, curpeers);
|
||||||
if (repl <= 0) {
|
if (repl <= 0) {
|
||||||
if (repl == -1)
|
if (repl == -1)
|
||||||
goto out;
|
goto out;
|
||||||
@ -2539,7 +2685,7 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
curpeer->tx_hbt++;
|
curpeer->tx_hbt++;
|
||||||
}
|
}
|
||||||
/* we get here when a peer_recv_msg() returns 0 in reql */
|
/* we get here when a peer_recv_msg() returns 0 in reql */
|
||||||
repl = peer_send_msgs(appctx, curpeer);
|
repl = peer_send_msgs(appctx, curpeer, curpeers);
|
||||||
if (repl <= 0) {
|
if (repl <= 0) {
|
||||||
if (repl == -1)
|
if (repl == -1)
|
||||||
goto out;
|
goto out;
|
||||||
@ -2567,13 +2713,17 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
case PEER_SESS_ST_ERRPROTO: {
|
case PEER_SESS_ST_ERRPROTO: {
|
||||||
|
TRACE_PROTO("protocol error", PEERS_EV_PROTOERR,
|
||||||
|
NULL, curpeer, &prev_state);
|
||||||
if (curpeer)
|
if (curpeer)
|
||||||
curpeer->proto_err++;
|
curpeer->proto_err++;
|
||||||
if (prev_state == PEER_SESS_ST_WAITMSG)
|
if (prev_state == PEER_SESS_ST_WAITMSG)
|
||||||
_HA_ATOMIC_SUB(&connected_peers, 1);
|
_HA_ATOMIC_SUB(&connected_peers, 1);
|
||||||
prev_state = appctx->st0;
|
prev_state = appctx->st0;
|
||||||
if (peer_send_error_protomsg(appctx) == -1)
|
if (peer_send_error_protomsg(appctx) == -1) {
|
||||||
|
TRACE_PROTO("could not send error message", PEERS_EV_PROTOERR);
|
||||||
goto out;
|
goto out;
|
||||||
|
}
|
||||||
appctx->st0 = PEER_SESS_ST_END;
|
appctx->st0 = PEER_SESS_ST_END;
|
||||||
prev_state = appctx->st0;
|
prev_state = appctx->st0;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user