diff --git a/include/proto/peers.h b/include/proto/peers.h index 27b3875bb..782b66e4d 100644 --- a/include/proto/peers.h +++ b/include/proto/peers.h @@ -28,6 +28,7 @@ #include #include +void peers_init_sync(struct peers *peers); void peers_register_table(struct peers *, struct stktable *table); void peers_setup_frontend(struct proxy *fe); diff --git a/include/types/peers.h b/include/types/peers.h index fbe2506c8..320e128c8 100644 --- a/include/types/peers.h +++ b/include/types/peers.h @@ -35,40 +35,23 @@ #include #include -struct peer_session { - struct shared_table *table; /* shared table */ - struct peer *peer; /* current peer */ - struct stream *stream; /* current transport stream */ - struct appctx *appctx; /* the appctx running it */ - unsigned int flags; /* peer session flags */ - unsigned int statuscode; /* current/last session status code */ - unsigned int update; /* current peer acked update */ - unsigned int pushack; /* last commited update to ack */ - unsigned int lastack; /* last acked update */ - unsigned int lastpush; /* last pushed update */ - unsigned int confirm; /* confirm message counter */ - unsigned int pushed; /* equal to last pushed data or to table local update in case of total push - * or to teaching_origin if teaching is ended */ - unsigned int reconnect; /* next connect timer */ - unsigned int teaching_origin; /* resync teaching origine update */ - struct peer_session *next; -}; - struct shared_table { struct stktable *table; /* stick table to sync */ - struct task *sync_task; /* main sync task */ - struct sig_handler *sighandler; /* signal handler */ - struct peer_session *local_session; /* local peer session */ - struct peer_session *sessions; /* peer sessions list */ - unsigned int flags; /* current table resync state */ - unsigned int resync_timeout; /* resync timeout timer */ + int local_id; + int remote_id; + int flags; + uint64_t remote_data; + unsigned int last_acked; + unsigned int last_pushed; + unsigned int last_get; + unsigned int teaching_origin; + unsigned int update; struct shared_table *next; /* next shared table in list */ }; struct peer { int local; /* proxy state */ char *id; - struct peers *peers; struct { const char *file; /* file where the section appears */ int line; /* line where the section appears */ @@ -78,6 +61,15 @@ struct peer { struct protocol *proto; /* peer address protocol */ struct xprt_ops *xprt; /* peer socket operations at transport layer */ void *sock_init_arg; /* socket operations's opaque init argument if needed */ + unsigned int flags; /* peer session flags */ + unsigned int statuscode; /* current/last session status code */ + unsigned int reconnect; /* next connect timer */ + unsigned int confirm; /* confirm message counter */ + struct stream *stream; /* current transport stream */ + struct appctx *appctx; /* the appctx running it */ + struct shared_table *remote_table; + struct shared_table *last_local_table; + struct shared_table *tables; struct peer *next; /* next peer in the list */ }; @@ -85,15 +77,19 @@ struct peer { struct peers { int state; /* proxy state */ char *id; /* peer section name */ + struct task *sync_task; /* main sync task */ + struct sig_handler *sighandler; /* signal handler */ struct peer *remote; /* remote peers list */ + struct peer *local; /* local peer list */ struct proxy *peers_fe; /* peer frontend */ struct { const char *file; /* file where the section appears */ int line; /* line where the section appears */ } conf; /* config information */ - struct shared_table *tables; /* registered shared tables */ time_t last_change; struct peers *next; /* next peer section */ + unsigned int flags; /* current peers section resync state */ + unsigned int resync_timeout; /* resync timeout timer */ int count; /* total of peers */ }; diff --git a/src/cfgparse.c b/src/cfgparse.c index bb3118859..6d89b3d9c 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -1986,7 +1986,6 @@ int cfg_parse_peers(const char *file, int linenum, char **args, int kwm) curpeers->count++; newpeer->next = curpeers->remote; curpeers->remote = newpeer; - newpeer->peers = curpeers; newpeer->conf.file = strdup(file); newpeer->conf.line = linenum; @@ -2030,6 +2029,7 @@ int cfg_parse_peers(const char *file, int linenum, char **args, int kwm) if (strcmp(newpeer->id, localpeer) == 0) { /* Current is local peer, it define a frontend */ newpeer->local = 1; + peers->local = newpeer; if (!curpeers->peers_fe) { if ((curpeers->peers_fe = calloc(1, sizeof(struct proxy))) == NULL) { @@ -8397,6 +8397,7 @@ int check_config_validity() curpeers->peers_fe = NULL; } else { + peers_init_sync(curpeers); last = &curpeers->next; continue; } diff --git a/src/peers.c b/src/peers.c index 468a96dc7..317492922 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1,5 +1,5 @@ /* - * Stick table synchro management. + * Peer synchro management. * * Copyright 2010 EXCELIANCE, Emeric Brun * @@ -52,34 +52,79 @@ /*******************************/ /******************************/ -/* Current table resync state */ +/* Current peers section resync state */ /******************************/ -#define SHTABLE_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */ -#define SHTABLE_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */ -#define SHTABLE_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */ -#define SHTABLE_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */ -#define SHTABLE_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop +#define PEERS_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */ +#define PEERS_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */ +#define PEERS_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */ +#define PEERS_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */ +#define PEERS_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop to push data to new process */ -#define SHTABLE_RESYNC_STATEMASK (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE) -#define SHTABLE_RESYNC_FROMLOCAL 0x00000000 -#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL -#define SHTABLE_RESYNC_FINISHED (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE) +#define PEERS_RESYNC_STATEMASK (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE) +#define PEERS_RESYNC_FROMLOCAL 0x00000000 +#define PEERS_RESYNC_FROMREMOTE PEERS_F_RESYNC_LOCAL +#define PEERS_RESYNC_FINISHED (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE) + +/***********************************/ +/* Current shared table sync state */ +/***********************************/ +#define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */ +#define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */ /******************************/ /* Remote peer teaching state */ /******************************/ #define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */ -#define PEER_F_TEACH_STAGE1 0x00000002 /* Teach state 1 complete */ -#define PEER_F_TEACH_STAGE2 0x00000004 /* Teach stage 2 complete */ #define PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */ #define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */ #define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */ #define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */ -#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */ +#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */ #define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE) +/*****************************/ +/* Sync message class */ +/*****************************/ +enum { + PEER_MSG_CLASS_CONTROL = 0, + PEER_MSG_CLASS_ERROR, + PEER_MSG_CLASS_STICKTABLE = 10, + PEER_MSG_CLASS_RESERVED = 255, +}; + +/*****************************/ +/* control message types */ +/*****************************/ +enum { + PEER_MSG_CTRL_RESYNCREQ = 0, + PEER_MSG_CTRL_RESYNCFINISHED, + PEER_MSG_CTRL_RESYNCPARTIAL, + PEER_MSG_CTRL_RESYNCCONFIRM, +}; + +/*****************************/ +/* error message types */ +/*****************************/ +enum { + PEER_MSG_ERR_PROTOCOL = 0, + PEER_MSG_ERR_SIZELIMIT, +}; + + +/*******************************/ +/* stick table sync mesg types */ +/* Note: ids >= 128 contains */ +/* id message cotains data */ +/*******************************/ +enum { + PEER_MSG_STKT_UPDATE = 128, + PEER_MSG_STKT_INCUPDATE, + PEER_MSG_STKT_DEFINE, + PEER_MSG_STKT_SWITCH, + PEER_MSG_STKT_ACK, +}; /**********************************/ /* Peer Session IO handler states */ @@ -90,13 +135,14 @@ enum { PEER_SESS_ST_GETVERSION, /* Validate supported protocol version */ PEER_SESS_ST_GETHOST, /* Validate host ID correspond to local host id */ PEER_SESS_ST_GETPEER, /* Validate peer ID correspond to a known remote peer id */ - PEER_SESS_ST_GETTABLE, /* Search into registered table for a table with same id and validate type and size */ /* after this point, data were possibly exchanged */ PEER_SESS_ST_SENDSUCCESS, /* Send ret code 200 (success) and wait for message */ PEER_SESS_ST_CONNECT, /* Initial state for session create on a connect, push presentation into buffer */ PEER_SESS_ST_GETSTATUS, /* Wait for the welcome message */ PEER_SESS_ST_WAITMSG, /* Wait for data messages */ PEER_SESS_ST_EXIT, /* Exit with status code */ + PEER_SESS_ST_ERRPROTO, /* Send error proto message before exit */ + PEER_SESS_ST_ERRSIZE, /* Send error size message before exit */ PEER_SESS_ST_END, /* Killed session */ }; @@ -115,67 +161,241 @@ enum { #define PEER_SESS_SC_ERRVERSION 502 /* unknown protocol version */ #define PEER_SESS_SC_ERRHOST 503 /* bad host name */ #define PEER_SESS_SC_ERRPEER 504 /* unknown peer */ -#define PEER_SESS_SC_ERRTYPE 505 /* table key type mismatch */ -#define PEER_SESS_SC_ERRSIZE 506 /* table key size mismatch */ -#define PEER_SESS_SC_ERRTABLE 507 /* unknown table */ #define PEER_SESSION_PROTO_NAME "HAProxyS" struct peers *peers = NULL; static void peer_session_forceshutdown(struct stream * stream); +int intencode(uint64_t i, char **str) { + int idx = 0; + unsigned char *msg; -/* - * This prepare the data update message of the stick session , is the the peer session - * where the data going to be pushed, is a buffer of to recieve data message content - */ -static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size) -{ - uint32_t netinteger; - int len; - /* construct message */ - if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) { - msg[0] = 0x80 + ts->upd.key - ps->lastpush; - len = sizeof(char); - } - else { - msg[0] = 'D'; - netinteger = htonl(ts->upd.key); - memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger)); - len = sizeof(char) + sizeof(netinteger); + if (!*str) + return 0; + + msg = (unsigned char *)*str; + if (i < 240) { + msg[0] = (unsigned char)i; + *str = (char *)&msg[idx+1]; + return (idx+1); } - if (ps->table->table->type == STKTABLE_TYPE_STRING) { - int stlen = strlen((char *)ts->key.key); - - netinteger = htonl(strlen((char *)ts->key.key)); - memcpy(&msg[len], &netinteger, sizeof(netinteger)); - memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen); - len += sizeof(netinteger) + stlen; - + msg[idx] =(unsigned char)i | 240; + i = (i - 240) >> 4; + while (i >= 128) { + msg[++idx] = (unsigned char)i | 128; + i = (i - 128) >> 7; } - else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) { - netinteger = htonl(*((uint32_t *)ts->key.key)); - memcpy(&msg[len], &netinteger, sizeof(netinteger)); - len += sizeof(netinteger); - } - else { - memcpy(&msg[len], ts->key.key, ps->table->table->key_size); - len += ps->table->table->key_size; - } - - if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID)) - netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id)); - else - netinteger = 0; - - memcpy(&msg[len], &netinteger , sizeof(netinteger)); - len += sizeof(netinteger); - - return len; + msg[++idx] = (unsigned char)i; + *str = (char *)&msg[idx+1]; + return (idx+1); } +/* This function returns the decoded integer or 0 + if decode failed + *str point on the beginning of the integer to decode + at the end of decoding *str point on the end of the + encoded integer or to null if end is reached */ +uint64_t intdecode(char **str, char *end) { + uint64_t i; + int idx = 0; + unsigned char *msg; + + if (!*str) + return 0; + + msg = (unsigned char *)*str; + if (msg >= (unsigned char *)end) { + *str = NULL; + return 0; + } + + if (msg[idx] < 240) { + *str = (char *)&msg[idx+1]; + return msg[idx]; + } + i = msg[idx]; + do { + idx++; + if (msg >= (unsigned char *)end) { + *str = NULL; + return 0; + } + i += (uint64_t)msg[idx] << (4 + 7*(idx-1)); + } + while (msg[idx] > 128); + *str = (char *)&msg[idx+1]; + return i; +} + +/* + * This prepare the data update message on the stick session , is the considered + * stick table. + * is a buffer of to recieve data message content + * If function returns 0, the caller should consider we were unable to encode this message (TODO: + * check size) + */ +static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier) +{ + uint32_t netinteger; + unsigned short datalen; + char *cursor, *datamsg; + + cursor = datamsg = msg + 1 + 5; + + /* construct message */ + + /* check if we need to send the update identifer */ + if (st->last_pushed && ts->upd.key > st->last_pushed && (ts->upd.key - st->last_pushed) == 1) { + use_identifier = 0; + } + + /* encode update identifier if needed */ + if (use_identifier) { + netinteger = htonl(ts->upd.key); + memcpy(cursor, &netinteger, sizeof(netinteger)); + cursor += sizeof(netinteger); + } + + /* encode the key */ + if (st->table->type == STKTABLE_TYPE_STRING) { + int stlen = strlen((char *)ts->key.key); + + intencode(stlen, &cursor); + memcpy(cursor, ts->key.key, stlen); + cursor += stlen; + } + else if (st->table->type == STKTABLE_TYPE_INTEGER) { + netinteger = htonl(*((uint32_t *)ts->key.key)); + memcpy(cursor, &netinteger, sizeof(netinteger)); + cursor += sizeof(netinteger); + } + else { + memcpy(cursor, ts->key.key, st->table->key_size); + cursor += st->table->key_size; + } + + /* encode values */ + if (stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID)) { + int srvid; + + srvid = stktable_data_cast(stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID), server_id); + intencode(srvid, &cursor); + } + + /* Compute datalen */ + datalen = (cursor - datamsg); + + /* prepare message header */ + msg[0] = PEER_MSG_CLASS_STICKTABLE; + if (use_identifier) + msg[1] = PEER_MSG_STKT_UPDATE; + else + msg[1] = PEER_MSG_STKT_INCUPDATE; + + cursor = &msg[2]; + intencode(datalen, &cursor); + + /* move data after header */ + memmove(cursor, datamsg, datalen); + + /* return header size + data_len */ + return (cursor - msg) + datalen; +} + +/* + * This prepare the switch table message to targeted share table . + * is a buffer of to recieve data message content + * If function returns 0, the caller should consider we were unable to encode this message (TODO: + * check size) + */ +static int peer_prepare_switchmsg(struct shared_table *st, char *msg, size_t size) +{ + int len; + unsigned short datalen; + char *cursor, *datamsg; + uint64_t data = 0; + + cursor = datamsg = msg + 2 + 5; + + /* Encode data */ + + /* encode local id */ + intencode(st->local_id, &cursor); + + /* encode table name */ + len = strlen(st->table->id); + intencode(len, &cursor); + memcpy(cursor, st->table->id, len); + cursor += len; + + /* encode table type */ + + intencode(st->table->type, &cursor); + + /* encode table key size */ + intencode(st->table->key_size, &cursor); + + /* encode available data types in table */ + if (st->table->data_ofs[STKTABLE_DT_SERVER_ID]) { + data |= 1 << STKTABLE_DT_SERVER_ID; + } + intencode(data, &cursor); + + /* Compute datalen */ + datalen = (cursor - datamsg); + + /* prepare message header */ + msg[0] = PEER_MSG_CLASS_STICKTABLE; + msg[1] = PEER_MSG_STKT_DEFINE; + cursor = &msg[2]; + intencode(datalen, &cursor); + + /* move data after header */ + memmove(cursor, datamsg, datalen); + + /* return header size + data_len */ + return (cursor - msg) + datalen; +} + +/* + * This prepare the acknowledge message on the stick session , is the considered + * stick table. + * is a buffer of to recieve data message content + * If function returns 0, the caller should consider we were unable to encode this message (TODO: + * check size) + */ +static int peer_prepare_ackmsg(struct shared_table *st, char *msg, size_t size) +{ + unsigned short datalen; + char *cursor, *datamsg; + uint32_t netinteger; + + cursor = datamsg = trash.str + 2 + 5; + + intencode(st->remote_id, &cursor); + netinteger = htonl(st->last_get); + memcpy(cursor, &netinteger, sizeof(netinteger)); + cursor += sizeof(netinteger); + + /* Compute datalen */ + datalen = (cursor - datamsg); + + /* prepare message header */ + msg[0] = PEER_MSG_CLASS_STICKTABLE; + msg[1] = PEER_MSG_STKT_DEFINE; + cursor = &msg[2]; + intencode(datalen, &cursor); + + /* move data after header */ + memmove(cursor, datamsg, datalen); + + /* return header size + data_len */ + return (cursor - msg) + datalen; +} + /* * Callback to release a session with a peer */ @@ -183,30 +403,31 @@ static void peer_session_release(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct stream *s = si_strm(si); - struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr; + struct peer *peer = (struct peer *)appctx->ctx.peers.ptr; + struct peers *peers = (struct peers *)strm_fe(s)->parent; /* appctx->ctx.peers.ptr is not a peer session */ if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS) return; /* peer session identified */ - if (ps) { - if (ps->stream == s) { - ps->stream = NULL; - ps->appctx = NULL; - if (ps->flags & PEER_F_LEARN_ASSIGN) { + if (peer) { + if (peer->stream == s) { + peer->stream = NULL; + peer->appctx = NULL; + if (peer->flags & PEER_F_LEARN_ASSIGN) { /* unassign current peer for learning */ - ps->flags &= ~(PEER_F_LEARN_ASSIGN); - ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS); + peer->flags &= ~(PEER_F_LEARN_ASSIGN); + peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); /* reschedule a resync */ - ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); } /* reset teaching and learning flags to 0 */ - ps->flags &= PEER_TEACH_RESET; - ps->flags &= PEER_LEARN_RESET; + peer->flags &= PEER_TEACH_RESET; + peer->flags &= PEER_LEARN_RESET; } - task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG); + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } } @@ -249,7 +470,7 @@ static void peer_io_handler(struct appctx *appctx) bo_skip(si_oc(si), reql); /* test version */ - if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) { + if (strcmp(PEER_SESSION_PROTO_NAME " 2.0", trash.str) != 0) { appctx->st0 = PEER_SESS_ST_EXIT; appctx->st1 = PEER_SESS_SC_ERRVERSION; /* test protocol */ @@ -310,7 +531,7 @@ static void peer_io_handler(struct appctx *appctx) bo_skip(si_oc(si), reql); - /* parse line " " */ + /* parse line " " */ p = strchr(trash.str, ' '); if (!p) { appctx->st0 = PEER_SESS_ST_EXIT; @@ -332,124 +553,24 @@ static void peer_io_handler(struct appctx *appctx) goto switchstate; } - appctx->ctx.peers.ptr = curpeer; - appctx->st0 = PEER_SESS_ST_GETTABLE; - /* fall through */ - } - case PEER_SESS_ST_GETTABLE: { - struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; - struct shared_table *st; - struct peer_session *ps = NULL; - unsigned long key_type; - size_t key_size; - char *p; - - reql = bo_getline(si_oc(si), trash.str, trash.size); - if (reql <= 0) { /* closed or EOL not found */ - if (reql == 0) - goto out; - appctx->ctx.peers.ptr = NULL; - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - /* Re init appctx->ctx.peers.ptr to null, to handle correctly a release case */ - appctx->ctx.peers.ptr = NULL; - - if (trash.str[reql-1] != '\n') { - /* Incomplete line, we quit */ - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - else if (reql > 1 && (trash.str[reql-2] == '\r')) - trash.str[reql-2] = 0; - else - trash.str[reql-1] = 0; - - bo_skip(si_oc(si), reql); - - /* Parse line " " */ - p = strchr(trash.str, ' '); - if (!p) { - appctx->st0 = PEER_SESS_ST_EXIT; - appctx->st1 = PEER_SESS_SC_ERRPROTO; - goto switchstate; - } - *p = 0; - key_type = (unsigned long)atol(p+1); - - p = strchr(p+1, ' '); - if (!p) { - appctx->ctx.peers.ptr = NULL; - appctx->st0 = PEER_SESS_ST_EXIT; - appctx->st1 = PEER_SESS_SC_ERRPROTO; - goto switchstate; - } - - key_size = (size_t)atoi(p); - for (st = curpeers->tables; st; st = st->next) { - /* If table name matches */ - if (strcmp(st->table->id, trash.str) == 0) { - /* Check key size mismatches, except for strings - * which may be truncated as long as they fit in - * a buffer. - */ - if (key_size != st->table->key_size && - (key_type != STKTABLE_TYPE_STRING || - 1 + 4 + 4 + key_size - 1 >= trash.size)) { - appctx->st0 = PEER_SESS_ST_EXIT; - appctx->st1 = PEER_SESS_SC_ERRSIZE; - goto switchstate; - } - - /* If key type mismatches */ - if (key_type != st->table->type) { - appctx->st0 = PEER_SESS_ST_EXIT; - appctx->st1 = PEER_SESS_SC_ERRTYPE; - goto switchstate; - } - - /* lookup peer stream of current peer */ - for (ps = st->sessions; ps; ps = ps->next) { - if (ps->peer == curpeer) { - /* If stream already active, replaced by new one */ - if (ps->stream && ps->stream != s) { - if (ps->peer->local) { - /* Local connection, reply a retry */ - appctx->st0 = PEER_SESS_ST_EXIT; - appctx->st1 = PEER_SESS_SC_TRYAGAIN; - goto switchstate; - } - peer_session_forceshutdown(ps->stream); - } - ps->stream = s; - ps->appctx = appctx; - break; - } - } - break; + if (curpeer->stream && curpeer->stream != s) { + if (curpeer->local) { + /* Local connection, reply a retry */ + appctx->st0 = PEER_SESS_ST_EXIT; + appctx->st1 = PEER_SESS_SC_TRYAGAIN; + goto switchstate; } + peer_session_forceshutdown(curpeer->stream); } - - /* If table not found */ - if (!st){ - appctx->st0 = PEER_SESS_ST_EXIT; - appctx->st1 = PEER_SESS_SC_ERRTABLE; - goto switchstate; - } - - /* If no peer session for current peer */ - if (!ps) { - appctx->st0 = PEER_SESS_ST_EXIT; - appctx->st1 = PEER_SESS_SC_ERRPEER; - goto switchstate; - } - - appctx->ctx.peers.ptr = ps; + curpeer->stream = s; + curpeer->appctx = appctx; + appctx->ctx.peers.ptr = curpeer; appctx->st0 = PEER_SESS_ST_SENDSUCCESS; /* fall through */ } case PEER_SESS_ST_SENDSUCCESS: { - struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr; + struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; + struct shared_table *st; repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE); repl = bi_putblk(si_ic(si), trash.str, repl); @@ -461,55 +582,57 @@ static void peer_io_handler(struct appctx *appctx) } /* Register status code */ - ps->statuscode = PEER_SESS_SC_SUCCESSCODE; + curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE; /* Awake main task */ - task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG); - - /* Init cursors */ - ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0; - ps->pushed = ps->update; + task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG); /* Init confirm counter */ - ps->confirm = 0; + curpeer->confirm = 0; + + /* Init cursors */ + for (st = curpeer->tables; st ; st = st->next) { + st->last_get = st->last_acked = 0; + st->teaching_origin = st->last_pushed = st->update; + } /* reset teaching and learning flags to 0 */ - ps->flags &= PEER_TEACH_RESET; - ps->flags &= PEER_LEARN_RESET; + curpeer->flags &= PEER_TEACH_RESET; + curpeer->flags &= PEER_LEARN_RESET; /* if current peer is local */ - if (ps->peer->local) { - /* if table need resyncfrom local and no process assined */ - if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL && - !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) { - /* assign local peer for a lesson, consider lesson already requested */ - ps->flags |= PEER_F_LEARN_ASSIGN; - ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS); - } + if (curpeer->local) { + /* if current host need resyncfrom local and no process assined */ + if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* assign local peer for a lesson, consider lesson already requested */ + curpeer->flags |= PEER_F_LEARN_ASSIGN; + peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); + } + + } + else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* assign peer for a lesson */ + curpeer->flags |= PEER_F_LEARN_ASSIGN; + peers->flags |= PEERS_F_RESYNC_ASSIGN; + } + - } - else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE && - !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) { - /* assign peer for a lesson */ - ps->flags |= PEER_F_LEARN_ASSIGN; - ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN; - } /* switch to waiting message state */ appctx->st0 = PEER_SESS_ST_WAITMSG; goto switchstate; } case PEER_SESS_ST_CONNECT: { - struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr; + struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; /* Send headers */ repl = snprintf(trash.str, trash.size, - PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n", - ps->peer->id, + PEER_SESSION_PROTO_NAME " 2.0\n%s\n%s %d %d\n", + curpeer->id, localpeer, (int)getpid(), - ps->table->table->id, - ps->table->table->type, - (int)ps->table->table->key_size); + relative_pid); if (repl >= trash.size) { appctx->st0 = PEER_SESS_ST_END; @@ -529,10 +652,11 @@ static void peer_io_handler(struct appctx *appctx) /* fall through */ } case PEER_SESS_ST_GETSTATUS: { - struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr; + struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; + struct shared_table *st; if (si_ic(si)->flags & CF_WRITE_PARTIAL) - ps->statuscode = PEER_SESS_SC_CONNECTEDCODE; + curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE; reql = bo_getline(si_oc(si), trash.str, trash.size); if (reql <= 0) { /* closed or EOL not found */ @@ -554,40 +678,40 @@ static void peer_io_handler(struct appctx *appctx) bo_skip(si_oc(si), reql); /* Register status code */ - ps->statuscode = atoi(trash.str); + curpeer->statuscode = atoi(trash.str); /* Awake main task */ - task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG); + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); /* If status code is success */ - if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) { + if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) { /* Init cursors */ - ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0; - ps->pushed = ps->update; + for (st = curpeer->tables; st ; st = st->next) { + st->last_get = st->last_acked = 0; + st->teaching_origin = st->last_pushed = st->update; + } /* Init confirm counter */ - ps->confirm = 0; + curpeer->confirm = 0; - /* reset teaching and learning flags to 0 */ - ps->flags &= PEER_TEACH_RESET; - ps->flags &= PEER_LEARN_RESET; + /* reset teaching and learning flags to 0 */ + curpeer->flags &= PEER_TEACH_RESET; + curpeer->flags &= PEER_LEARN_RESET; - /* If current peer is local */ - if (ps->peer->local) { - /* Init cursors to push a resync */ - ps->teaching_origin = ps->pushed = ps->table->table->update; - /* flag to start to teach lesson */ - ps->flags |= PEER_F_TEACH_PROCESS; + /* If current peer is local */ + if (curpeer->local) { + /* flag to start to teach lesson */ + curpeer->flags |= PEER_F_TEACH_PROCESS; - } - else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE && - !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) { - /* If peer is remote and resync from remote is needed, - and no peer currently assigned */ + } + else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* If peer is remote and resync from remote is needed, + and no peer currently assigned */ - /* assign peer for a lesson */ - ps->flags |= PEER_F_LEARN_ASSIGN; - ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN; + /* assign peer for a lesson */ + curpeer->flags |= PEER_F_LEARN_ASSIGN; + peers->flags |= PEERS_F_RESYNC_ASSIGN; } } @@ -600,256 +724,403 @@ static void peer_io_handler(struct appctx *appctx) /* fall through */ } case PEER_SESS_ST_WAITMSG: { - struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr; + struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; struct stksess *ts, *newts = NULL; - char c; + uint32_t msg_len = 0; + char *msg_cur = trash.str; + char *msg_end = trash.str; + unsigned char msg_head[7]; int totl = 0; - reql = bo_getblk(si_oc(si), (char *)&c, sizeof(c), totl); + reql = bo_getblk(si_oc(si), (char *)msg_head, 2*sizeof(unsigned char), totl); if (reql <= 0) /* closed or EOL not found */ goto incomplete; totl += reql; - if ((c & 0x80) || (c == 'D')) { - /* Here we have data message */ - unsigned int pushack; - int srvid; - uint32_t netinteger; - - /* Compute update remote version */ - if (c & 0x80) { - pushack = ps->pushack + (unsigned int)(c & 0x7F); - } - else { - reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl); - if (reql <= 0) /* closed or EOL not found */ - goto incomplete; - - totl += reql; - pushack = ntohl(netinteger); - } - - /* Read key. The string keys are read in two steps, the first step - * consists in reading whatever fits into the table directly into - * the pre-allocated key. The second step consists in simply - * draining all exceeding data. This can happen for example after a - * config reload with a smaller key size for the stick table than - * what was previously set, or when facing the impossibility to - * allocate a new stksess (for example when the table is full with - * "nopurge"). - */ - if (ps->table->table->type == STKTABLE_TYPE_STRING) { - unsigned int to_read, to_store; - - /* read size first */ - reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl); - if (reql <= 0) /* closed or EOL not found */ - goto incomplete; - - totl += reql; - - to_store = 0; - to_read = ntohl(netinteger); - - if (to_read + totl > si_ob(si)->size) { - /* impossible to read a key this large, abort */ - reql = -1; - goto incomplete; - } - - newts = stksess_new(ps->table->table, NULL); - if (newts) - to_store = MIN(to_read, ps->table->table->key_size - 1); - - /* we read up to two blocks, the first one goes into the key, - * the rest is drained into the trash. - */ - if (to_store) { - reql = bo_getblk(si_oc(si), (char *)newts->key.key, to_store, totl); - if (reql <= 0) /* closed or incomplete */ - goto incomplete; - newts->key.key[reql] = 0; - totl += reql; - to_read -= reql; - } - if (to_read) { - reql = bo_getblk(si_oc(si), trash.str, to_read, totl); - if (reql <= 0) /* closed or incomplete */ - goto incomplete; - totl += reql; - } - } - else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) { - reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl); - if (reql <= 0) /* closed or EOL not found */ - goto incomplete; - newts = stksess_new(ps->table->table, NULL); - if (newts) { - netinteger = ntohl(netinteger); - memcpy(newts->key.key, &netinteger, sizeof(netinteger)); - } - totl += reql; - } - else { - /* type ip or binary */ - newts = stksess_new(ps->table->table, NULL); - reql = bo_getblk(si_oc(si), newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl); - if (reql <= 0) /* closed or EOL not found */ - goto incomplete; - totl += reql; - } - - /* read server id */ - reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl); - if (reql <= 0) /* closed or EOL not found */ + if (msg_head[1] >= 128) { + /* Read and Decode message length */ + reql = bo_getblk(si_oc(si), (char *)&msg_head[2], sizeof(unsigned char), totl); + if (reql <= 0) /* closed */ goto incomplete; totl += reql; - srvid = ntohl(netinteger); - /* update entry */ - if (newts) { + if (msg_head[2] < 240) { + msg_len = msg_head[2]; + } + else { + int i; + char *cur; + char *end; + + for (i = 3 ; i < sizeof(msg_head) ; i++) { + reql = bo_getblk(si_oc(si), (char *)&msg_head[i], sizeof(char), totl); + if (reql <= 0) /* closed */ + goto incomplete; + + totl += reql; + + if (!(msg_head[i] & 0x80)) + break; + } + + if (i == sizeof(msg_head)) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + + } + end = (char *)msg_head + sizeof(msg_head); + cur = (char *)&msg_head[2]; + msg_len = intdecode(&cur, end); + if (!cur) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + } + + + /* Read message content */ + if (msg_len) { + if (msg_len > trash.size) { + /* Status code is not success, abort */ + appctx->st0 = PEER_SESS_ST_ERRSIZE; + goto switchstate; + } + + reql = bo_getblk(si_oc(si), trash.str, msg_len, totl); + if (reql <= 0) /* closed */ + goto incomplete; + totl += reql; + + msg_end += msg_len; + } + } + + if (msg_head[0] == PEER_MSG_CLASS_CONTROL) { + if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) { + struct shared_table *st; + /* Reset message: remote need resync */ + + /* prepare tables fot a global push */ + for (st = curpeer->tables; st; st = st->next) { + st->teaching_origin = st->last_pushed = st->table->update; + st->flags = 0; + } + + /* reset teaching flags to 0 */ + curpeer->flags &= PEER_TEACH_RESET; + + /* flag to start to teach lesson */ + curpeer->flags |= PEER_F_TEACH_PROCESS; + + + } + else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) { + + if (curpeer->flags & PEER_F_LEARN_ASSIGN) { + curpeer->flags &= ~PEER_F_LEARN_ASSIGN; + peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); + peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE); + } + curpeer->confirm++; + } + else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) { + + if (curpeer->flags & PEER_F_LEARN_ASSIGN) { + curpeer->flags &= ~PEER_F_LEARN_ASSIGN; + peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); + + curpeer->flags |= PEER_F_LEARN_NOTUP2DATE; + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); + } + curpeer->confirm++; + } + else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) { + + /* If stopping state */ + if (stopping) { + /* Close session, push resync no more needed */ + curpeer->flags |= PEER_F_TEACH_COMPLETE; + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + + /* reset teaching flags to 0 */ + curpeer->flags &= PEER_TEACH_RESET; + } + } + else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) { + if (msg_head[1] == PEER_MSG_STKT_DEFINE) { + int table_id_len; + struct shared_table *st; + int table_type; + int table_keylen; + int table_id; + uint64_t table_data; + + table_id = intdecode(&msg_cur, msg_end); + if (!msg_cur) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + table_id_len = intdecode(&msg_cur, msg_end); + if (!msg_cur) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + curpeer->remote_table = NULL; + if (!table_id_len || (msg_cur + table_id_len) >= msg_end) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + for (st = curpeer->tables; st; st = st->next) { + /* Reset IDs */ + if (st->remote_id == table_id) + st->remote_id = 0; + + if (!curpeer->remote_table + && (table_id_len == strlen(st->table->id)) + && (memcmp(st->table->id, msg_cur, table_id_len) == 0)) { + curpeer->remote_table = st; + } + } + + if (!curpeer->remote_table) { + goto ignore_msg; + } + + msg_cur += table_id_len; + if (msg_cur >= msg_end) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + table_type = intdecode(&msg_cur, msg_end); + if (!msg_cur) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + table_keylen = intdecode(&msg_cur, msg_end); + if (!msg_cur) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + table_data = intdecode(&msg_cur, msg_end); + if (!msg_cur) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + if (curpeer->remote_table->table->type != table_type + || curpeer->remote_table->table->key_size != table_keylen) { + curpeer->remote_table = NULL; + goto ignore_msg; + } + + curpeer->remote_table->remote_data = table_data; + curpeer->remote_table->remote_id = table_id; + } + else if (msg_head[1] == PEER_MSG_STKT_SWITCH) { + struct shared_table *st; + int table_id; + + table_id = intdecode(&msg_cur, msg_end); + if (!msg_cur) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + curpeer->remote_table = NULL; + for (st = curpeer->tables; st; st = st->next) { + if (st->remote_id == table_id) { + curpeer->remote_table = st; + break; + } + } + + } + else if (msg_head[1] == PEER_MSG_STKT_UPDATE + || msg_head[1] == PEER_MSG_STKT_INCUPDATE) { + struct shared_table *st = curpeer->remote_table; + uint32_t update; + + /* Here we have data message */ + if (!st) + goto ignore_msg; + + if (msg_head[1] == PEER_MSG_STKT_UPDATE) { + if (msg_len < sizeof(update)) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + memcpy(&update, msg_cur, sizeof(update)); + msg_cur += sizeof(update); + st->last_get = htonl(update); + } + else { + st->last_get++; + } + + newts = stksess_new(st->table, NULL); + if (!newts) + goto ignore_msg; + + if (st->table->type == STKTABLE_TYPE_STRING) { + unsigned int to_read, to_store; + + to_read = intdecode(&msg_cur, msg_end); + if (!msg_cur) { + /* malformed message */ + stksess_free(st->table, newts); + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + to_store = MIN(to_read, st->table->key_size - 1); + if (msg_cur + to_store > msg_end) { + /* malformed message */ + stksess_free(st->table, newts); + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + memcpy(newts->key.key, msg_cur, to_store); + newts->key.key[to_store] = 0; + msg_cur += to_read; + } + else if (st->table->type == STKTABLE_TYPE_INTEGER) { + unsigned int netinteger; + + if (msg_cur + sizeof(netinteger) > msg_end) { + /* malformed message */ + stksess_free(st->table, newts); + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + memcpy(&netinteger, msg_cur, sizeof(netinteger)); + netinteger = ntohl(netinteger); + memcpy(newts->key.key, &netinteger, sizeof(netinteger)); + msg_cur += sizeof(netinteger); + } + else { + if (msg_cur + st->table->key_size > msg_end) { + /* malformed message */ + stksess_free(st->table, newts); + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + memcpy(newts->key.key, msg_cur, st->table->key_size); + msg_cur += st->table->key_size; + } + /* lookup for existing entry */ - ts = stktable_lookup(ps->table->table, newts); + ts = stktable_lookup(st->table, newts); if (ts) { - /* the entry already exist, we can free ours */ - stktable_touch(ps->table->table, ts, 0); - stksess_free(ps->table->table, newts); + /* the entry already exist, we can free ours */ + stktable_touch(st->table, ts, 0); + stksess_free(st->table, newts); newts = NULL; } else { struct eb32_node *eb; /* create new entry */ - ts = stktable_store(ps->table->table, newts, 0); + ts = stktable_store(st->table, newts, 0); newts = NULL; /* don't reuse it */ - ts->upd.key= (++ps->table->table->update)+(2^31); - eb = eb32_insert(&ps->table->table->updates, &ts->upd); + ts->upd.key= (++st->table->update)+(2^31); + eb = eb32_insert(&st->table->updates, &ts->upd); if (eb != &ts->upd) { eb32_delete(eb); - eb32_insert(&ps->table->table->updates, &ts->upd); + eb32_insert(&st->table->updates, &ts->upd); } } - /* update entry */ - if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID)) - stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid; - ps->pushack = pushack; + if ((1 << STKTABLE_DT_SERVER_ID) & st->remote_data) { + int srvid; + + srvid = intdecode(&msg_cur, msg_end); + if (!msg_cur) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + + if (stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID)) { + stktable_data_cast(stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid; + } + } } + else if (msg_head[1] == PEER_MSG_STKT_ACK) { + /* ack message */ + uint32_t table_id ; + uint32_t update; + struct shared_table *st; - } - else if (c == 'R') { - /* Reset message: remote need resync */ + table_id = intdecode(&msg_cur, msg_end); + if (!msg_cur || (msg_cur + sizeof(update) > msg_end)) { + /* malformed message */ + appctx->st0 = PEER_SESS_ST_ERRPROTO; + goto switchstate; + } + memcpy(&update, msg_cur, sizeof(update)); + update = ntohl(update); - /* reinit counters for a resync */ - ps->lastpush = 0; - ps->teaching_origin = ps->pushed = ps->table->table->update; - - /* reset teaching flags to 0 */ - ps->flags &= PEER_TEACH_RESET; - - /* flag to start to teach lesson */ - ps->flags |= PEER_F_TEACH_PROCESS; - } - else if (c == 'F') { - /* Finish message, all known updates have been pushed by remote */ - /* and remote is up to date */ - - /* If resync is in progress with remote peer */ - if (ps->flags & PEER_F_LEARN_ASSIGN) { - - /* unassign current peer for learning */ - ps->flags &= ~PEER_F_LEARN_ASSIGN; - ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS); - - /* Consider table is now up2date, resync resync no more needed from local neither remote */ - ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE); + for (st = curpeer->tables; st; st = st->next) { + if (st->local_id == table_id) { + st->update = update; + break; + } + } } - /* Increase confirm counter to launch a confirm message */ - ps->confirm++; } - else if (c == 'c') { - /* confirm message, remote peer is now up to date with us */ - - /* If stopping state */ - if (stopping) { - /* Close session, push resync no more needed */ - ps->flags |= PEER_F_TEACH_COMPLETE; - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - - /* reset teaching flags to 0 */ - ps->flags &= PEER_TEACH_RESET; - } - else if (c == 'C') { - /* Continue message, all known updates have been pushed by remote */ - /* but remote is not up to date */ - - /* If resync is in progress with current peer */ - if (ps->flags & PEER_F_LEARN_ASSIGN) { - - /* unassign current peer */ - ps->flags &= ~PEER_F_LEARN_ASSIGN; - ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS); - - /* flag current peer is not up 2 date to try from an other */ - ps->flags |= PEER_F_LEARN_NOTUP2DATE; - - /* reschedule a resync */ - ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); - task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG); - } - ps->confirm++; - } - else if (c == 'A') { - /* ack message */ - uint32_t netinteger; - - reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl); - if (reql <= 0) /* closed or EOL not found */ - goto incomplete; - - totl += reql; - - /* Consider remote is up to date with "acked" version */ - ps->update = ntohl(netinteger); - } - else { - /* Unknown message */ - appctx->st0 = PEER_SESS_ST_END; + else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) { + appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } +ignore_msg: /* skip consumed message */ bo_skip(si_oc(si), totl); - /* loop on that state to peek next message */ goto switchstate; incomplete: /* we get here when a bo_getblk() returns <= 0 in reql */ - /* first, we may have to release newts */ - if (newts) { - stksess_free(ps->table->table, newts); - newts = NULL; - } - if (reql < 0) { /* there was an error */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } - /* Nothing to read, now we start to write */ /* Confirm finished or partial messages */ - while (ps->confirm) { + while (curpeer->confirm) { + unsigned char msg[2]; + /* There is a confirm messages to send */ - repl = bi_putchr(si_ic(si), 'c'); + msg[0] = PEER_MSG_CLASS_CONTROL; + msg[1] = PEER_MSG_CTRL_RESYNCCONFIRM; + + /* message to buffer */ + repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg)); if (repl <= 0) { /* no more write possible */ if (repl == -1) @@ -857,175 +1128,313 @@ static void peer_io_handler(struct appctx *appctx) appctx->st0 = PEER_SESS_ST_END; goto switchstate; } - ps->confirm--; + curpeer->confirm--; } + /* Need to request a resync */ - if ((ps->flags & PEER_F_LEARN_ASSIGN) && - (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) && - !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) { - /* Current peer was elected to request a resync */ + if ((curpeer->flags & PEER_F_LEARN_ASSIGN) && + (peers->flags & PEERS_F_RESYNC_ASSIGN) && + !(peers->flags & PEERS_F_RESYNC_PROCESS)) { + unsigned char msg[2]; - repl = bi_putchr(si_ic(si), 'R'); - if (repl <= 0) { - /* no more write possible */ - if (repl == -1) - goto full; - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - ps->table->flags |= SHTABLE_F_RESYNC_PROCESS; - } + /* Current peer was elected to request a resync */ + msg[0] = PEER_MSG_CLASS_CONTROL; + msg[1] = PEER_MSG_CTRL_RESYNCREQ; - /* It remains some updates to ack */ - if (ps->pushack != ps->lastack) { - uint32_t netinteger; + /* message to buffer */ + repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg)); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto full; + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + peers->flags |= PEERS_F_RESYNC_PROCESS; + } - trash.str[0] = 'A'; - netinteger = htonl(ps->pushack); - memcpy(&trash.str[1], &netinteger, sizeof(netinteger)); + /* Nothing to read, now we start to write */ - repl = bi_putblk(si_ic(si), trash.str, 1+sizeof(netinteger)); - if (repl <= 0) { - /* no more write possible */ - if (repl == -1) - goto full; - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - ps->lastack = ps->pushack; - } + if (curpeer->tables) { + struct shared_table *st; + struct shared_table *last_local_table; - if (ps->flags & PEER_F_TEACH_PROCESS) { - /* current peer was requested for a lesson */ + last_local_table = curpeer->last_local_table; + if (!last_local_table) + last_local_table = curpeer->tables; + st = last_local_table->next; - if (!(ps->flags & PEER_F_TEACH_STAGE1)) { - /* lesson stage 1 not complete */ - struct eb32_node *eb; - - eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1); - while (1) { - int msglen; - struct stksess *ts; - - if (!eb) { - /* flag lesson stage1 complete */ - ps->flags |= PEER_F_TEACH_STAGE1; - eb = eb32_first(&ps->table->table->updates); - if (eb) - ps->pushed = eb->key - 1; - break; - } - - ts = eb32_entry(eb, struct stksess, upd); - msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size); - if (msglen) { - /* message to buffer */ - repl = bi_putblk(si_ic(si), trash.str, msglen); - if (repl <= 0) { - /* no more write possible */ - if (repl == -1) - goto full; - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - ps->lastpush = ps->pushed = ts->upd.key; - } - eb = eb32_next(eb); - } - } /* !TEACH_STAGE1 */ - - if (!(ps->flags & PEER_F_TEACH_STAGE2)) { - /* lesson stage 2 not complete */ - struct eb32_node *eb; - - eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1); - while (1) { - int msglen; - struct stksess *ts; - - if (!eb || eb->key > ps->teaching_origin) { - /* flag lesson stage1 complete */ - ps->flags |= PEER_F_TEACH_STAGE2; - ps->pushed = ps->teaching_origin; - break; - } - - ts = eb32_entry(eb, struct stksess, upd); - msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size); - if (msglen) { - /* message to buffer */ - repl = bi_putblk(si_ic(si), trash.str, msglen); - if (repl <= 0) { - /* no more write possible */ - if (repl == -1) - goto full; - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - ps->lastpush = ps->pushed = ts->upd.key; - } - eb = eb32_next(eb); - } - } /* !TEACH_STAGE2 */ - - if (!(ps->flags & PEER_F_TEACH_FINISHED)) { - /* process final lesson message */ - repl = bi_putchr(si_ic(si), ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C'); - if (repl <= 0) { - /* no more write possible */ - if (repl == -1) - goto full; - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - - /* flag finished message sent */ - ps->flags |= PEER_F_TEACH_FINISHED; - } /* !TEACH_FINISHED */ - } /* TEACH_PROCESS */ - - if (!(ps->flags & PEER_F_LEARN_ASSIGN) && - (int)(ps->pushed - ps->table->table->localupdate) < 0) { - /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */ - struct eb32_node *eb; - - eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1); while (1) { - int msglen; - struct stksess *ts; + if (!st) + st = curpeer->tables; - /* push local updates */ - if (!eb) { - eb = eb32_first(&ps->table->table->updates); - if (!eb || ((int)(eb->key - ps->pushed) <= 0)) { - ps->pushed = ps->table->table->localupdate; - break; + /* It remains some updates to ack */ + if (st->last_get != st->last_acked) { + int msglen; + + msglen = peer_prepare_ackmsg(st, trash.str, trash.size); + if (!msglen) { + /* internal error: message does not fit in trash */ + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; } - } - if ((int)(eb->key - ps->table->table->localupdate) > 0) { - ps->pushed = ps->table->table->localupdate; - break; - } - - ts = eb32_entry(eb, struct stksess, upd); - msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size); - if (msglen) { /* message to buffer */ repl = bi_putblk(si_ic(si), trash.str, msglen); if (repl <= 0) { /* no more write possible */ - if (repl == -1) + if (repl == -1) { goto full; + } appctx->st0 = PEER_SESS_ST_END; goto switchstate; } - ps->lastpush = ps->pushed = ts->upd.key; + st->last_acked = st->last_get; } - eb = eb32_next(eb); + + if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) { + if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) && + ((int)(st->last_pushed - st->table->localupdate) < 0)) { + struct eb32_node *eb; + int new_pushed; + + if (st != curpeer->last_local_table) { + int msglen; + + msglen = peer_prepare_switchmsg(st, trash.str, trash.size); + if (!msglen) { + /* internal error: message does not fit in trash */ + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + + /* message to buffer */ + repl = bi_putblk(si_ic(si), trash.str, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) { + goto full; + } + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + curpeer->last_local_table = st; + } + + /* We force new pushed to 1 to force identifier in update message */ + new_pushed = 1; + eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); + while (1) { + uint32_t msglen; + struct stksess *ts; + + /* push local updates */ + if (!eb) { + eb = eb32_first(&st->table->updates); + if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) { + st->last_pushed = st->table->localupdate; + break; + } + } + + if ((int)(eb->key - st->table->localupdate) > 0) { + st->last_pushed = st->table->localupdate; + break; + } + + ts = eb32_entry(eb, struct stksess, upd); + msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed); + if (!msglen) { + /* internal error: message does not fit in trash */ + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + + /* message to buffer */ + repl = bi_putblk(si_ic(si), trash.str, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) { + goto full; + } + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + st->last_pushed = ts->upd.key; + /* identifier may not needed in next update message */ + new_pushed = 0; + + eb = eb32_next(eb); + } + } + } + else { + if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) { + struct eb32_node *eb; + int new_pushed; + + if (st != curpeer->last_local_table) { + int msglen; + + msglen = peer_prepare_switchmsg(st, trash.str, trash.size); + if (!msglen) { + /* internal error: message does not fit in trash */ + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + + /* message to buffer */ + repl = bi_putblk(si_ic(si), trash.str, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) { + goto full; + } + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + curpeer->last_local_table = st; + } + + /* We force new pushed to 1 to force identifier in update message */ + new_pushed = 1; + eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); + while (1) { + uint32_t msglen; + struct stksess *ts; + + /* push local updates */ + if (!eb) { + st->flags |= SHTABLE_F_TEACH_STAGE1; + eb = eb32_first(&st->table->updates); + if (eb) + st->last_pushed = eb->key - 1; + break; + } + + ts = eb32_entry(eb, struct stksess, upd); + msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed); + if (!msglen) { + /* internal error: message does not fit in trash */ + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + + /* message to buffer */ + repl = bi_putblk(si_ic(si), trash.str, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) { + goto full; + } + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + st->last_pushed = ts->upd.key; + /* identifier may not needed in next update message */ + new_pushed = 0; + + eb = eb32_next(eb); + } + } + + if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) { + struct eb32_node *eb; + int new_pushed; + + if (st != curpeer->last_local_table) { + int msglen; + + msglen = peer_prepare_switchmsg(st, trash.str, trash.size); + if (!msglen) { + /* internal error: message does not fit in trash */ + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + + /* message to buffer */ + repl = bi_putblk(si_ic(si), trash.str, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) { + goto full; + } + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + curpeer->last_local_table = st; + } + + /* We force new pushed to 1 to force identifier in update message */ + new_pushed = 1; + eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); + while (1) { + uint32_t msglen; + struct stksess *ts; + + /* push local updates */ + if (!eb || eb->key > st->teaching_origin) { + st->flags |= SHTABLE_F_TEACH_STAGE2; + eb = eb32_first(&st->table->updates); + if (eb) + st->last_pushed = eb->key - 1; + break; + } + + ts = eb32_entry(eb, struct stksess, upd); + msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed); + if (!msglen) { + /* internal error: message does not fit in trash */ + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + + /* message to buffer */ + repl = bi_putblk(si_ic(si), trash.str, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) { + goto full; + } + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + st->last_pushed = ts->upd.key; + /* identifier may not needed in next update message */ + new_pushed = 0; + + eb = eb32_next(eb); + } + } + } + + if (st == last_local_table) + break; + st = st->next; } - } /* ! LEARN_ASSIGN */ + } + + + if ((curpeer->flags & PEER_F_TEACH_PROCESS) && !(curpeer->flags & PEER_F_TEACH_FINISHED)) { + unsigned char msg[2]; + + /* Current peer was elected to request a resync */ + msg[0] = PEER_MSG_CLASS_CONTROL; + msg[1] = ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED) ? PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL; + /* process final lesson message */ + repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg)); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto full; + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + /* flag finished message sent */ + curpeer->flags |= PEER_F_TEACH_FINISHED; + } + /* noting more to do */ goto out; } @@ -1035,7 +1444,29 @@ static void peer_io_handler(struct appctx *appctx) if (bi_putblk(si_ic(si), trash.str, repl) == -1) goto full; appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + case PEER_SESS_ST_ERRSIZE: { + unsigned char msg[2]; + + msg[0] = PEER_MSG_CLASS_ERROR; + msg[1] = PEER_MSG_ERR_SIZELIMIT; + + if (bi_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1) + goto full; + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + case PEER_SESS_ST_ERRPROTO: { + unsigned char msg[2]; + + msg[0] = PEER_MSG_CLASS_ERROR; + msg[1] = PEER_MSG_ERR_PROTOCOL; + + if (bi_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1) + goto full; + appctx->st0 = PEER_SESS_ST_END; /* fall through */ + } case PEER_SESS_ST_END: { si_shutw(si); si_shutr(si); @@ -1065,7 +1496,8 @@ static struct applet peer_applet = { static void peer_session_forceshutdown(struct stream * stream) { struct appctx *appctx = NULL; - struct peer_session *ps; + struct peer *ps; + int i; for (i = 0; i <= 1; i++) { @@ -1080,7 +1512,7 @@ static void peer_session_forceshutdown(struct stream * stream) if (!appctx) return; - ps = (struct peer_session *)appctx->ctx.peers.ptr; + ps = (struct peer *)appctx->ctx.peers.ptr; /* we're killing a connection, we must apply a random delay before * retrying otherwise the other end will do the same and we can loop * for a while. @@ -1112,9 +1544,9 @@ void peers_setup_frontend(struct proxy *fe) /* * Create a new peer session in assigned state (connect will start automatically) */ -static struct stream *peer_session_create(struct peer *peer, struct peer_session *ps) +static struct stream *peer_session_create(struct peers *peers, struct peer *peer) { - struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe); + struct listener *l = LIST_NEXT(&peers->peers_fe->conf.listeners, struct listener *, by_fe); struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */ struct appctx *appctx; struct session *sess; @@ -1122,8 +1554,8 @@ static struct stream *peer_session_create(struct peer *peer, struct peer_session struct task *t; struct connection *conn; - ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); - ps->statuscode = PEER_SESS_SC_CONNECTCODE; + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); + peer->statuscode = PEER_SESS_SC_CONNECTCODE; s = NULL; appctx = appctx_new(&peer_applet); @@ -1131,7 +1563,7 @@ static struct stream *peer_session_create(struct peer *peer, struct peer_session goto out_close; appctx->st0 = PEER_SESS_ST_CONNECT; - appctx->ctx.peers.ptr = (void *)ps; + appctx->ctx.peers.ptr = (void *)peer; sess = session_new(p, l, &appctx->obj_type); if (!sess) { @@ -1185,8 +1617,8 @@ static struct stream *peer_session_create(struct peer *peer, struct peer_session actconn++; totalconn++; - ps->appctx = appctx; - ps->stream = s; + peer->appctx = appctx; + peer->stream = s; return s; /* Error unrolling */ @@ -1209,41 +1641,43 @@ static struct stream *peer_session_create(struct peer *peer, struct peer_session */ static struct task *process_peer_sync(struct task * task) { - struct shared_table *st = (struct shared_table *)task->context; - struct peer_session *ps; + struct peers *peers = (struct peers *)task->context; + struct peer *ps; + struct shared_table *st; task->expire = TICK_ETERNITY; - if (!st->sessions->peer->peers->peers_fe) { + if (!peers->peers_fe) { /* this one was never started, kill it */ - signal_unregister_handler(st->sighandler); - st->table->sync_task = NULL; - task_delete(st->sync_task); - task_free(st->sync_task); + signal_unregister_handler(peers->sighandler); + peers->sync_task = NULL; + task_delete(peers->sync_task); + task_free(peers->sync_task); return NULL; } if (!stopping) { /* Normal case (not soft stop)*/ - if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) && - (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) && - !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) { + + if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) && + (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { /* Resync from local peer needed no peer was assigned for the lesson and no old local peer found or resync timeout expire */ /* flag no more resync from local, to try resync from remotes */ - st->flags |= SHTABLE_F_RESYNC_LOCAL; + peers->flags |= PEERS_F_RESYNC_LOCAL; /* reschedule a resync */ - st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); } /* For each session */ - for (ps = st->sessions; ps; ps = ps->next) { + for (ps = peers->remote; ps; ps = ps->next) { /* For each remote peers */ - if (!ps->peer->local) { + if (!ps->local) { if (!ps->stream) { /* no active stream */ if (ps->statuscode == 0 || @@ -1257,7 +1691,7 @@ static struct task *process_peer_sync(struct task * task) * and reconnection timer is expired */ /* retry a connect */ - ps->stream = peer_session_create(ps->peer, ps); + ps->stream = peer_session_create(peers, ps); } else if (!tick_is_expired(ps->reconnect, now_ms)) { /* If previous session failed during connection @@ -1270,8 +1704,8 @@ static struct task *process_peer_sync(struct task * task) } /* !ps->stream */ else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) { /* current stream is active and established */ - if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) && - !(st->flags & SHTABLE_F_RESYNC_ASSIGN) && + if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && + !(peers->flags & PEERS_F_RESYNC_ASSIGN) && !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { /* Resync from a remote is needed * and no peer was assigned for lesson @@ -1279,14 +1713,20 @@ static struct task *process_peer_sync(struct task * task) /* assign peer for the lesson */ ps->flags |= PEER_F_LEARN_ASSIGN; - st->flags |= SHTABLE_F_RESYNC_ASSIGN; + peers->flags |= PEERS_F_RESYNC_ASSIGN; /* awake peer stream task to handle a request of resync */ appctx_wakeup(ps->appctx); } - else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) { - /* awake peer stream task to push local updates */ - appctx_wakeup(ps->appctx); + else { + /* Awake session if there is data to push */ + for (st = ps->tables; st ; st = st->next) { + if ((int)(st->last_pushed - st->table->localupdate) < 0) { + /* awake peer stream task to push local updates */ + appctx_wakeup(ps->appctx); + break; + } + } } /* else do nothing */ } /* SUCCESSCODE */ @@ -1294,36 +1734,38 @@ static struct task *process_peer_sync(struct task * task) } /* for */ /* Resync from remotes expired: consider resync is finished */ - if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) && - !(st->flags & SHTABLE_F_RESYNC_ASSIGN) && - tick_is_expired(st->resync_timeout, now_ms)) { + if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && + !(peers->flags & PEERS_F_RESYNC_ASSIGN) && + tick_is_expired(peers->resync_timeout, now_ms)) { /* Resync from remote peer needed * no peer was assigned for the lesson * and resync timeout expire */ /* flag no more resync from remote, consider resync is finished */ - st->flags |= SHTABLE_F_RESYNC_REMOTE; + peers->flags |= PEERS_F_RESYNC_REMOTE; } - if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) { + if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) { /* Resync not finished*/ /* reschedule task to resync timeout, to ended resync if needed */ - task->expire = tick_first(task->expire, st->resync_timeout); + task->expire = tick_first(task->expire, peers->resync_timeout); } } /* !stopping */ else { /* soft stop case */ if (task->state & TASK_WOKEN_SIGNAL) { /* We've just recieved the signal */ - if (!(st->flags & SHTABLE_F_DONOTSTOP)) { + if (!(peers->flags & PEERS_F_DONOTSTOP)) { /* add DO NOT STOP flag if not present */ jobs++; - st->flags |= SHTABLE_F_DONOTSTOP; - st->table->syncing++; + peers->flags |= PEERS_F_DONOTSTOP; + ps = peers->local; + for (st = ps->tables; st ; st = st->next) + st->table->syncing++; } /* disconnect all connected peers */ - for (ps = st->sessions; ps; ps = ps->next) { + for (ps = peers->remote; ps; ps = ps->next) { if (ps->stream) { peer_session_forceshutdown(ps->stream); ps->stream = NULL; @@ -1331,14 +1773,15 @@ static struct task *process_peer_sync(struct task * task) } } } - ps = st->local_session; + ps = peers->local; if (ps->flags & PEER_F_TEACH_COMPLETE) { - if (st->flags & SHTABLE_F_DONOTSTOP) { + if (peers->flags & PEERS_F_DONOTSTOP) { /* resync of new process was complete, current process can die now */ jobs--; - st->flags &= ~SHTABLE_F_DONOTSTOP; - st->table->syncing--; + peers->flags &= ~PEERS_F_DONOTSTOP; + for (st = ps->tables; st ; st = st->next) + st->table->syncing--; } } else if (!ps->stream) { @@ -1353,29 +1796,60 @@ static struct task *process_peer_sync(struct task * task) * or during previous connect, peer replies a try again statuscode */ /* connect to the peer */ - peer_session_create(ps->peer, ps); + peer_session_create(peers, ps); } else { /* Other error cases */ - if (st->flags & SHTABLE_F_DONOTSTOP) { + if (peers->flags & PEERS_F_DONOTSTOP) { /* unable to resync new process, current process can die now */ jobs--; - st->flags &= ~SHTABLE_F_DONOTSTOP; - st->table->syncing--; + peers->flags &= ~PEERS_F_DONOTSTOP; + for (st = ps->tables; st ; st = st->next) + st->table->syncing--; } } } - else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE && - (int)(ps->pushed - ps->table->table->localupdate) < 0) { + else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) { /* current stream active and established awake stream to push remaining local updates */ - appctx_wakeup(ps->appctx); + for (st = ps->tables; st ; st = st->next) { + if ((int)(st->last_pushed - st->table->localupdate) < 0) { + /* awake peer stream task to push local updates */ + appctx_wakeup(ps->appctx); + break; + } + } } } /* stopping */ /* Wakeup for re-connect */ return task; } + +/* + * + */ +void peers_init_sync(struct peers *peers) +{ + struct peer * curpeer; + struct listener *listener; + + for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) { + peers->peers_fe->maxconn += 3; + } + + list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe) + listener->maxconn = peers->peers_fe->maxconn; + peers->sync_task = task_new(); + peers->sync_task->process = process_peer_sync; + peers->sync_task->expire = TICK_ETERNITY; + peers->sync_task->context = (void *)peers; + peers->sighandler = signal_register_task(0, peers->sync_task, 0); + task_wakeup(peers->sync_task, TASK_WOKEN_INIT); +} + + + /* * Function used to register a table for sync on a group of peers * @@ -1384,35 +1858,19 @@ void peers_register_table(struct peers *peers, struct stktable *table) { struct shared_table *st; struct peer * curpeer; - struct peer_session *ps; - struct listener *listener; - - st = (struct shared_table *)calloc(1,sizeof(struct shared_table)); - st->table = table; - st->next = peers->tables; - st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); - peers->tables = st; + int id = 0; for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) { - ps = (struct peer_session *)calloc(1,sizeof(struct peer_session)); - ps->table = st; - ps->peer = curpeer; - if (curpeer->local) - st->local_session = ps; - ps->next = st->sessions; - ps->reconnect = now_ms; - st->sessions = ps; - peers->peers_fe->maxconn += 3; + st = (struct shared_table *)calloc(1,sizeof(struct shared_table)); + st->table = table; + st->next = curpeer->tables; + if (curpeer->tables) + id = curpeer->tables->local_id; + st->local_id = id + 1; + + curpeer->tables = st; } - list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe) - listener->maxconn = peers->peers_fe->maxconn; - st->sync_task = task_new(); - st->sync_task->process = process_peer_sync; - st->sync_task->expire = TICK_ETERNITY; - st->sync_task->context = (void *)st; - table->sync_task = st->sync_task; - st->sighandler = signal_register_task(0, table->sync_task, 0); - task_wakeup(st->sync_task, TASK_WOKEN_INIT); + table->sync_task = peers->sync_task; }