/* * Peer synchro management. * * Copyright 2010 EXCELIANCE, Emeric Brun * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version * 2 of the License, or (at your option) any later version. * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /*******************************/ /* Current peer learning state */ /*******************************/ /******************************/ /* Current peers section resync state */ /******************************/ #define PEERS_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */ #define PEERS_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */ #define PEERS_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */ #define PEERS_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */ #define PEERS_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop to push data to new process */ #define PEERS_RESYNC_STATEMASK (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE) #define PEERS_RESYNC_FROMLOCAL 0x00000000 #define PEERS_RESYNC_FROMREMOTE PEERS_F_RESYNC_LOCAL #define PEERS_RESYNC_FINISHED (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE) /***********************************/ /* Current shared table sync state */ /***********************************/ #define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */ #define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */ /******************************/ /* Remote peer teaching state */ /******************************/ #define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */ #define 
PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */ #define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */ #define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */ #define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */ #define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */ #define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE) /*****************************/ /* Sync message class */ /*****************************/ enum { PEER_MSG_CLASS_CONTROL = 0, PEER_MSG_CLASS_ERROR, PEER_MSG_CLASS_STICKTABLE = 10, PEER_MSG_CLASS_RESERVED = 255, }; /*****************************/ /* control message types */ /*****************************/ enum { PEER_MSG_CTRL_RESYNCREQ = 0, PEER_MSG_CTRL_RESYNCFINISHED, PEER_MSG_CTRL_RESYNCPARTIAL, PEER_MSG_CTRL_RESYNCCONFIRM, }; /*****************************/ /* error message types */ /*****************************/ enum { PEER_MSG_ERR_PROTOCOL = 0, PEER_MSG_ERR_SIZELIMIT, }; /*******************************/ /* stick table sync mesg types */ /* Note: ids >= 128 contains */ /* id message cotains data */ /*******************************/ enum { PEER_MSG_STKT_UPDATE = 128, PEER_MSG_STKT_INCUPDATE, PEER_MSG_STKT_DEFINE, PEER_MSG_STKT_SWITCH, PEER_MSG_STKT_ACK, }; /**********************************/ /* Peer Session IO handler states */ /**********************************/ enum { PEER_SESS_ST_ACCEPT = 0, /* Initial state for session create by an accept, must be zero! 
*/ PEER_SESS_ST_GETVERSION, /* Validate supported protocol version */ PEER_SESS_ST_GETHOST, /* Validate host ID correspond to local host id */ PEER_SESS_ST_GETPEER, /* Validate peer ID correspond to a known remote peer id */ /* after this point, data were possibly exchanged */ PEER_SESS_ST_SENDSUCCESS, /* Send ret code 200 (success) and wait for message */ PEER_SESS_ST_CONNECT, /* Initial state for session create on a connect, push presentation into buffer */ PEER_SESS_ST_GETSTATUS, /* Wait for the welcome message */ PEER_SESS_ST_WAITMSG, /* Wait for data messages */ PEER_SESS_ST_EXIT, /* Exit with status code */ PEER_SESS_ST_ERRPROTO, /* Send error proto message before exit */ PEER_SESS_ST_ERRSIZE, /* Send error size message before exit */ PEER_SESS_ST_END, /* Killed session */ }; /***************************************************/ /* Peer Session status code - part of the protocol */ /***************************************************/ #define PEER_SESS_SC_CONNECTCODE 100 /* connect in progress */ #define PEER_SESS_SC_CONNECTEDCODE 110 /* tcp connect success */ #define PEER_SESS_SC_SUCCESSCODE 200 /* accept or connect successful */ #define PEER_SESS_SC_TRYAGAIN 300 /* try again later */ #define PEER_SESS_SC_ERRPROTO 501 /* error protocol */ #define PEER_SESS_SC_ERRVERSION 502 /* unknown protocol version */ #define PEER_SESS_SC_ERRHOST 503 /* bad host name */ #define PEER_SESS_SC_ERRPEER 504 /* unknown peer */ #define PEER_SESSION_PROTO_NAME "HAProxyS" struct peers *peers = NULL; static void peer_session_forceshutdown(struct stream * stream); int intencode(uint64_t i, char **str) { int idx = 0; unsigned char *msg; if (!*str) return 0; msg = (unsigned char *)*str; if (i < 240) { msg[0] = (unsigned char)i; *str = (char *)&msg[idx+1]; return (idx+1); } msg[idx] =(unsigned char)i | 240; i = (i - 240) >> 4; while (i >= 128) { msg[++idx] = (unsigned char)i | 128; i = (i - 128) >> 7; } msg[++idx] = (unsigned char)i; *str = (char *)&msg[idx+1]; return (idx+1); } /* 
This function returns the decoded integer or 0 if decode failed *str point on the beginning of the integer to decode at the end of decoding *str point on the end of the encoded integer or to null if end is reached */ uint64_t intdecode(char **str, char *end) { uint64_t i; int idx = 0; unsigned char *msg; if (!*str) return 0; msg = (unsigned char *)*str; if (msg >= (unsigned char *)end) { *str = NULL; return 0; } if (msg[idx] < 240) { *str = (char *)&msg[idx+1]; return msg[idx]; } i = msg[idx]; do { idx++; if (msg >= (unsigned char *)end) { *str = NULL; return 0; } i += (uint64_t)msg[idx] << (4 + 7*(idx-1)); } while (msg[idx] > 128); *str = (char *)&msg[idx+1]; return i; } /* * This prepare the data update message on the stick session , is the considered * stick table. * is a buffer of to recieve data message content * If function returns 0, the caller should consider we were unable to encode this message (TODO: * check size) */ static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier) { uint32_t netinteger; unsigned short datalen; char *cursor, *datamsg; unsigned int data_type; void *data_ptr; cursor = datamsg = msg + 1 + 5; /* construct message */ /* check if we need to send the update identifer */ if (!st->last_pushed || ts->upd.key < st->last_pushed || ((ts->upd.key - st->last_pushed) != 1)) { use_identifier = 1; } /* encode update identifier if needed */ if (use_identifier) { netinteger = htonl(ts->upd.key); memcpy(cursor, &netinteger, sizeof(netinteger)); cursor += sizeof(netinteger); } /* encode the key */ if (st->table->type == SMP_T_STR) { int stlen = strlen((char *)ts->key.key); intencode(stlen, &cursor); memcpy(cursor, ts->key.key, stlen); cursor += stlen; } else if (st->table->type == SMP_T_SINT) { netinteger = htonl(*((uint32_t *)ts->key.key)); memcpy(cursor, &netinteger, sizeof(netinteger)); cursor += sizeof(netinteger); } else { memcpy(cursor, ts->key.key, st->table->key_size); cursor += 
st->table->key_size; } /* encode values */ for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) { data_ptr = stktable_data_ptr(st->table, ts, data_type); if (data_ptr) { switch (stktable_data_types[data_type].std_type) { case STD_T_SINT: { int data; data = stktable_data_cast(data_ptr, std_t_sint); intencode(data, &cursor); break; } case STD_T_UINT: { unsigned int data; data = stktable_data_cast(data_ptr, std_t_uint); intencode(data, &cursor); break; } case STD_T_ULL: { unsigned long long data; data = stktable_data_cast(data_ptr, std_t_ull); intencode(data, &cursor); break; } case STD_T_FRQP: { struct freq_ctr_period *frqp; frqp = &stktable_data_cast(data_ptr, std_t_frqp); intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor); intencode(frqp->curr_ctr, &cursor); intencode(frqp->prev_ctr, &cursor); break; } } } } /* Compute datalen */ datalen = (cursor - datamsg); /* prepare message header */ msg[0] = PEER_MSG_CLASS_STICKTABLE; if (use_identifier) msg[1] = PEER_MSG_STKT_UPDATE; else msg[1] = PEER_MSG_STKT_INCUPDATE; cursor = &msg[2]; intencode(datalen, &cursor); /* move data after header */ memmove(cursor, datamsg, datalen); /* return header size + data_len */ return (cursor - msg) + datalen; } /* * This prepare the switch table message to targeted share table . 
* is a buffer of to recieve data message content * If function returns 0, the caller should consider we were unable to encode this message (TODO: * check size) */ static int peer_prepare_switchmsg(struct shared_table *st, char *msg, size_t size) { int len; unsigned short datalen; char *cursor, *datamsg; uint64_t data = 0; unsigned int data_type; cursor = datamsg = msg + 2 + 5; /* Encode data */ /* encode local id */ intencode(st->local_id, &cursor); /* encode table name */ len = strlen(st->table->id); intencode(len, &cursor); memcpy(cursor, st->table->id, len); cursor += len; /* encode table type */ intencode(st->table->type, &cursor); /* encode table key size */ intencode(st->table->key_size, &cursor); /* encode available known data types in table */ for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) { if (st->table->data_ofs[data_type]) { switch (stktable_data_types[data_type].std_type) { case STD_T_SINT: case STD_T_UINT: case STD_T_ULL: case STD_T_FRQP: data |= 1 << data_type; break; } } } intencode(data, &cursor); /* Compute datalen */ datalen = (cursor - datamsg); /* prepare message header */ msg[0] = PEER_MSG_CLASS_STICKTABLE; msg[1] = PEER_MSG_STKT_DEFINE; cursor = &msg[2]; intencode(datalen, &cursor); /* move data after header */ memmove(cursor, datamsg, datalen); /* return header size + data_len */ return (cursor - msg) + datalen; } /* * This prepare the acknowledge message on the stick session , is the considered * stick table. 
* is a buffer of to recieve data message content * If function returns 0, the caller should consider we were unable to encode this message (TODO: * check size) */ static int peer_prepare_ackmsg(struct shared_table *st, char *msg, size_t size) { unsigned short datalen; char *cursor, *datamsg; uint32_t netinteger; cursor = datamsg = msg + 2 + 5; intencode(st->remote_id, &cursor); netinteger = htonl(st->last_get); memcpy(cursor, &netinteger, sizeof(netinteger)); cursor += sizeof(netinteger); /* Compute datalen */ datalen = (cursor - datamsg); /* prepare message header */ msg[0] = PEER_MSG_CLASS_STICKTABLE; msg[1] = PEER_MSG_STKT_ACK; cursor = &msg[2]; intencode(datalen, &cursor); /* move data after header */ memmove(cursor, datamsg, datalen); /* return header size + data_len */ return (cursor - msg) + datalen; } /* * Callback to release a session with a peer */ static void peer_session_release(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct stream *s = si_strm(si); struct peer *peer = (struct peer *)appctx->ctx.peers.ptr; struct peers *peers = (struct peers *)strm_fe(s)->parent; /* appctx->ctx.peers.ptr is not a peer session */ if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS) return; /* peer session identified */ if (peer) { if (peer->stream == s) { /* Re-init current table pointers to force announcement on re-connect */ peer->remote_table = peer->last_local_table = NULL; peer->stream = NULL; peer->appctx = NULL; if (peer->flags & PEER_F_LEARN_ASSIGN) { /* unassign current peer for learning */ peer->flags &= ~(PEER_F_LEARN_ASSIGN); peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); /* reschedule a resync */ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); } /* reset teaching and learning flags to 0 */ peer->flags &= PEER_TEACH_RESET; peer->flags &= PEER_LEARN_RESET; } task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } } /* * IO Handler to handle message exchance with a peer */ static void peer_io_handler(struct appctx 
*appctx) { struct stream_interface *si = appctx->owner; struct stream *s = si_strm(si); struct peers *curpeers = (struct peers *)strm_fe(s)->parent; int reql = 0; int repl = 0; while (1) { switchstate: switch(appctx->st0) { case PEER_SESS_ST_ACCEPT: appctx->ctx.peers.ptr = NULL; appctx->st0 = PEER_SESS_ST_GETVERSION; /* fall through */ case PEER_SESS_ST_GETVERSION: reql = bo_getline(si_oc(si), trash.str, trash.size); if (reql <= 0) { /* closed or EOL not found */ if (reql == 0) goto out; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } if (trash.str[reql-1] != '\n') { appctx->st0 = PEER_SESS_ST_END; goto switchstate; } else if (reql > 1 && (trash.str[reql-2] == '\r')) trash.str[reql-2] = 0; else trash.str[reql-1] = 0; bo_skip(si_oc(si), reql); /* test version */ if (strcmp(PEER_SESSION_PROTO_NAME " 2.0", trash.str) != 0) { appctx->st0 = PEER_SESS_ST_EXIT; appctx->st1 = PEER_SESS_SC_ERRVERSION; /* test protocol */ if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0) appctx->st1 = PEER_SESS_SC_ERRPROTO; goto switchstate; } appctx->st0 = PEER_SESS_ST_GETHOST; /* fall through */ case PEER_SESS_ST_GETHOST: reql = bo_getline(si_oc(si), trash.str, trash.size); if (reql <= 0) { /* closed or EOL not found */ if (reql == 0) goto out; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } if (trash.str[reql-1] != '\n') { appctx->st0 = PEER_SESS_ST_END; goto switchstate; } else if (reql > 1 && (trash.str[reql-2] == '\r')) trash.str[reql-2] = 0; else trash.str[reql-1] = 0; bo_skip(si_oc(si), reql); /* test hostname match */ if (strcmp(localpeer, trash.str) != 0) { appctx->st0 = PEER_SESS_ST_EXIT; appctx->st1 = PEER_SESS_SC_ERRHOST; goto switchstate; } appctx->st0 = PEER_SESS_ST_GETPEER; /* fall through */ case PEER_SESS_ST_GETPEER: { struct peer *curpeer; char *p; reql = bo_getline(si_oc(si), trash.str, trash.size); if (reql <= 0) { /* closed or EOL not found */ if (reql == 0) goto out; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } 
if (trash.str[reql-1] != '\n') { /* Incomplete line, we quit */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } else if (reql > 1 && (trash.str[reql-2] == '\r')) trash.str[reql-2] = 0; else trash.str[reql-1] = 0; bo_skip(si_oc(si), reql); /* parse line " " */ p = strchr(trash.str, ' '); if (!p) { appctx->st0 = PEER_SESS_ST_EXIT; appctx->st1 = PEER_SESS_SC_ERRPROTO; goto switchstate; } *p = 0; /* lookup known peer */ for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) { if (strcmp(curpeer->id, trash.str) == 0) break; } /* if unknown peer */ if (!curpeer) { appctx->st0 = PEER_SESS_ST_EXIT; appctx->st1 = PEER_SESS_SC_ERRPEER; goto switchstate; } if (curpeer->stream && curpeer->stream != s) { if (curpeer->local) { /* Local connection, reply a retry */ appctx->st0 = PEER_SESS_ST_EXIT; appctx->st1 = PEER_SESS_SC_TRYAGAIN; goto switchstate; } peer_session_forceshutdown(curpeer->stream); } curpeer->stream = s; curpeer->appctx = appctx; appctx->ctx.peers.ptr = curpeer; appctx->st0 = PEER_SESS_ST_SENDSUCCESS; /* fall through */ } case PEER_SESS_ST_SENDSUCCESS: { struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; struct shared_table *st; repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE); repl = bi_putblk(si_ic(si), trash.str, repl); if (repl <= 0) { if (repl == -1) goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* Register status code */ curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE; /* Awake main task */ task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG); /* Init confirm counter */ curpeer->confirm = 0; /* Init cursors */ for (st = curpeer->tables; st ; st = st->next) { st->last_get = st->last_acked = 0; st->teaching_origin = st->last_pushed = st->update; } /* reset teaching and learning flags to 0 */ curpeer->flags &= PEER_TEACH_RESET; curpeer->flags &= PEER_LEARN_RESET; /* if current peer is local */ if (curpeer->local) { /* if current host need resyncfrom local and no process assined */ if ((peers->flags & 
PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL && !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { /* assign local peer for a lesson, consider lesson already requested */ curpeer->flags |= PEER_F_LEARN_ASSIGN; peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); } } else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { /* assign peer for a lesson */ curpeer->flags |= PEER_F_LEARN_ASSIGN; peers->flags |= PEERS_F_RESYNC_ASSIGN; } /* switch to waiting message state */ appctx->st0 = PEER_SESS_ST_WAITMSG; goto switchstate; } case PEER_SESS_ST_CONNECT: { struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; /* Send headers */ repl = snprintf(trash.str, trash.size, PEER_SESSION_PROTO_NAME " 2.0\n%s\n%s %d %d\n", curpeer->id, localpeer, (int)getpid(), relative_pid); if (repl >= trash.size) { appctx->st0 = PEER_SESS_ST_END; goto switchstate; } repl = bi_putblk(si_ic(si), trash.str, repl); if (repl <= 0) { if (repl == -1) goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* switch to the waiting statuscode state */ appctx->st0 = PEER_SESS_ST_GETSTATUS; /* fall through */ } case PEER_SESS_ST_GETSTATUS: { struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; struct shared_table *st; if (si_ic(si)->flags & CF_WRITE_PARTIAL) curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE; reql = bo_getline(si_oc(si), trash.str, trash.size); if (reql <= 0) { /* closed or EOL not found */ if (reql == 0) goto out; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } if (trash.str[reql-1] != '\n') { /* Incomplete line, we quit */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } else if (reql > 1 && (trash.str[reql-2] == '\r')) trash.str[reql-2] = 0; else trash.str[reql-1] = 0; bo_skip(si_oc(si), reql); /* Register status code */ curpeer->statuscode = atoi(trash.str); /* Awake main task */ task_wakeup(peers->sync_task, TASK_WOKEN_MSG); /* If status code is success */ if (curpeer->statuscode == 
PEER_SESS_SC_SUCCESSCODE) { /* Init cursors */ for (st = curpeer->tables; st ; st = st->next) { st->last_get = st->last_acked = 0; st->teaching_origin = st->last_pushed = st->update; } /* Init confirm counter */ curpeer->confirm = 0; /* reset teaching and learning flags to 0 */ curpeer->flags &= PEER_TEACH_RESET; curpeer->flags &= PEER_LEARN_RESET; /* If current peer is local */ if (curpeer->local) { /* flag to start to teach lesson */ curpeer->flags |= PEER_F_TEACH_PROCESS; } else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { /* If peer is remote and resync from remote is needed, and no peer currently assigned */ /* assign peer for a lesson */ curpeer->flags |= PEER_F_LEARN_ASSIGN; peers->flags |= PEERS_F_RESYNC_ASSIGN; } } else { /* Status code is not success, abort */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } appctx->st0 = PEER_SESS_ST_WAITMSG; /* fall through */ } case PEER_SESS_ST_WAITMSG: { struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr; struct stksess *ts, *newts = NULL; uint32_t msg_len = 0; char *msg_cur = trash.str; char *msg_end = trash.str; unsigned char msg_head[7]; int totl = 0; reql = bo_getblk(si_oc(si), (char *)msg_head, 2*sizeof(unsigned char), totl); if (reql <= 0) /* closed or EOL not found */ goto incomplete; totl += reql; if (msg_head[1] >= 128) { /* Read and Decode message length */ reql = bo_getblk(si_oc(si), (char *)&msg_head[2], sizeof(unsigned char), totl); if (reql <= 0) /* closed */ goto incomplete; totl += reql; if (msg_head[2] < 240) { msg_len = msg_head[2]; } else { int i; char *cur; char *end; for (i = 3 ; i < sizeof(msg_head) ; i++) { reql = bo_getblk(si_oc(si), (char *)&msg_head[i], sizeof(char), totl); if (reql <= 0) /* closed */ goto incomplete; totl += reql; if (!(msg_head[i] & 0x80)) break; } if (i == sizeof(msg_head)) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } end = (char *)msg_head + 
sizeof(msg_head); cur = (char *)&msg_head[2]; msg_len = intdecode(&cur, end); if (!cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } } /* Read message content */ if (msg_len) { if (msg_len > trash.size) { /* Status code is not success, abort */ appctx->st0 = PEER_SESS_ST_ERRSIZE; goto switchstate; } reql = bo_getblk(si_oc(si), trash.str, msg_len, totl); if (reql <= 0) /* closed */ goto incomplete; totl += reql; msg_end += msg_len; } } if (msg_head[0] == PEER_MSG_CLASS_CONTROL) { if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) { struct shared_table *st; /* Reset message: remote need resync */ /* prepare tables fot a global push */ for (st = curpeer->tables; st; st = st->next) { st->teaching_origin = st->last_pushed = st->table->update; st->flags = 0; } /* reset teaching flags to 0 */ curpeer->flags &= PEER_TEACH_RESET; /* flag to start to teach lesson */ curpeer->flags |= PEER_F_TEACH_PROCESS; } else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) { if (curpeer->flags & PEER_F_LEARN_ASSIGN) { curpeer->flags &= ~PEER_F_LEARN_ASSIGN; peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE); } curpeer->confirm++; } else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) { if (curpeer->flags & PEER_F_LEARN_ASSIGN) { curpeer->flags &= ~PEER_F_LEARN_ASSIGN; peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); curpeer->flags |= PEER_F_LEARN_NOTUP2DATE; peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } curpeer->confirm++; } else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) { /* If stopping state */ if (stopping) { /* Close session, push resync no more needed */ curpeer->flags |= PEER_F_TEACH_COMPLETE; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* reset teaching flags to 0 */ curpeer->flags &= PEER_TEACH_RESET; } } else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) { if (msg_head[1] == 
PEER_MSG_STKT_DEFINE) { int table_id_len; struct shared_table *st; int table_type; int table_keylen; int table_id; uint64_t table_data; table_id = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } table_id_len = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } curpeer->remote_table = NULL; if (!table_id_len || (msg_cur + table_id_len) >= msg_end) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } for (st = curpeer->tables; st; st = st->next) { /* Reset IDs */ if (st->remote_id == table_id) st->remote_id = 0; if (!curpeer->remote_table && (table_id_len == strlen(st->table->id)) && (memcmp(st->table->id, msg_cur, table_id_len) == 0)) { curpeer->remote_table = st; } } if (!curpeer->remote_table) { goto ignore_msg; } msg_cur += table_id_len; if (msg_cur >= msg_end) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } table_type = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } table_keylen = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } table_data = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } if (curpeer->remote_table->table->type != table_type || curpeer->remote_table->table->key_size != table_keylen) { curpeer->remote_table = NULL; goto ignore_msg; } curpeer->remote_table->remote_data = table_data; curpeer->remote_table->remote_id = table_id; } else if (msg_head[1] == PEER_MSG_STKT_SWITCH) { struct shared_table *st; int table_id; table_id = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } curpeer->remote_table = NULL; for (st = 
curpeer->tables; st; st = st->next) { if (st->remote_id == table_id) { curpeer->remote_table = st; break; } } } else if (msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_INCUPDATE) { struct shared_table *st = curpeer->remote_table; uint32_t update; unsigned int data_type; void *data_ptr; /* Here we have data message */ if (!st) goto ignore_msg; if (msg_head[1] == PEER_MSG_STKT_UPDATE) { if (msg_len < sizeof(update)) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } memcpy(&update, msg_cur, sizeof(update)); msg_cur += sizeof(update); st->last_get = htonl(update); } else { st->last_get++; } newts = stksess_new(st->table, NULL); if (!newts) goto ignore_msg; if (st->table->type == SMP_T_STR) { unsigned int to_read, to_store; to_read = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ stksess_free(st->table, newts); appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } to_store = MIN(to_read, st->table->key_size - 1); if (msg_cur + to_store > msg_end) { /* malformed message */ stksess_free(st->table, newts); appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } memcpy(newts->key.key, msg_cur, to_store); newts->key.key[to_store] = 0; msg_cur += to_read; } else if (st->table->type == SMP_T_SINT) { unsigned int netinteger; if (msg_cur + sizeof(netinteger) > msg_end) { /* malformed message */ stksess_free(st->table, newts); appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } memcpy(&netinteger, msg_cur, sizeof(netinteger)); netinteger = ntohl(netinteger); memcpy(newts->key.key, &netinteger, sizeof(netinteger)); msg_cur += sizeof(netinteger); } else { if (msg_cur + st->table->key_size > msg_end) { /* malformed message */ stksess_free(st->table, newts); appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } memcpy(newts->key.key, msg_cur, st->table->key_size); msg_cur += st->table->key_size; } /* lookup for existing entry */ ts = stktable_lookup(st->table, newts); if (ts) { /* the entry 
already exist, we can free ours */ stktable_touch(st->table, ts, 0); stksess_free(st->table, newts); newts = NULL; } else { struct eb32_node *eb; /* create new entry */ ts = stktable_store(st->table, newts, 0); newts = NULL; /* don't reuse it */ ts->upd.key= (++st->table->update)+(2147483648U); eb = eb32_insert(&st->table->updates, &ts->upd); if (eb != &ts->upd) { eb32_delete(eb); eb32_insert(&st->table->updates, &ts->upd); } } for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) { if ((1 << data_type) & st->remote_data) { switch (stktable_data_types[data_type].std_type) { case STD_T_SINT: { int data; data = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } data_ptr = stktable_data_ptr(st->table, ts, data_type); if (data_ptr) stktable_data_cast(data_ptr, std_t_sint) = data; break; } case STD_T_UINT: { unsigned int data; data = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } data_ptr = stktable_data_ptr(st->table, ts, data_type); if (data_ptr) stktable_data_cast(data_ptr, std_t_uint) = data; break; } case STD_T_ULL: { unsigned long long data; data = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } data_ptr = stktable_data_ptr(st->table, ts, data_type); if (data_ptr) stktable_data_cast(data_ptr, std_t_ull) = data; break; } case STD_T_FRQP: { struct freq_ctr_period data; data.curr_tick = tick_add(now_ms, intdecode(&msg_cur, msg_end)); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } data.curr_ctr = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } data.prev_ctr = intdecode(&msg_cur, msg_end); if (!msg_cur) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } data_ptr = 
stktable_data_ptr(st->table, ts, data_type); if (data_ptr) stktable_data_cast(data_ptr, std_t_frqp) = data; break; } } } } } else if (msg_head[1] == PEER_MSG_STKT_ACK) { /* ack message */ uint32_t table_id ; uint32_t update; struct shared_table *st; table_id = intdecode(&msg_cur, msg_end); if (!msg_cur || (msg_cur + sizeof(update) > msg_end)) { /* malformed message */ appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } memcpy(&update, msg_cur, sizeof(update)); update = ntohl(update); for (st = curpeer->tables; st; st = st->next) { if (st->local_id == table_id) { st->update = update; break; } } } } else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) { appctx->st0 = PEER_SESS_ST_ERRPROTO; goto switchstate; } ignore_msg: /* skip consumed message */ bo_skip(si_oc(si), totl); /* loop on that state to peek next message */ goto switchstate; incomplete: /* we get here when a bo_getblk() returns <= 0 in reql */ if (reql < 0) { /* there was an error */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* Confirm finished or partial messages */ while (curpeer->confirm) { unsigned char msg[2]; /* There is a confirm messages to send */ msg[0] = PEER_MSG_CLASS_CONTROL; msg[1] = PEER_MSG_CTRL_RESYNCCONFIRM; /* message to buffer */ repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg)); if (repl <= 0) { /* no more write possible */ if (repl == -1) goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } curpeer->confirm--; } /* Need to request a resync */ if ((curpeer->flags & PEER_F_LEARN_ASSIGN) && (peers->flags & PEERS_F_RESYNC_ASSIGN) && !(peers->flags & PEERS_F_RESYNC_PROCESS)) { unsigned char msg[2]; /* Current peer was elected to request a resync */ msg[0] = PEER_MSG_CLASS_CONTROL; msg[1] = PEER_MSG_CTRL_RESYNCREQ; /* message to buffer */ repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg)); if (repl <= 0) { /* no more write possible */ if (repl == -1) goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } peers->flags |= PEERS_F_RESYNC_PROCESS; } /* 
Nothing to read, now we start to write */ if (curpeer->tables) { struct shared_table *st; struct shared_table *last_local_table; last_local_table = curpeer->last_local_table; if (!last_local_table) last_local_table = curpeer->tables; st = last_local_table->next; while (1) { if (!st) st = curpeer->tables; /* It remains some updates to ack */ if (st->last_get != st->last_acked) { int msglen; msglen = peer_prepare_ackmsg(st, trash.str, trash.size); if (!msglen) { /* internal error: message does not fit in trash */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* message to buffer */ repl = bi_putblk(si_ic(si), trash.str, msglen); if (repl <= 0) { /* no more write possible */ if (repl == -1) { goto full; } appctx->st0 = PEER_SESS_ST_END; goto switchstate; } st->last_acked = st->last_get; } if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) { if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) && ((int)(st->last_pushed - st->table->localupdate) < 0)) { struct eb32_node *eb; int new_pushed; if (st != curpeer->last_local_table) { int msglen; msglen = peer_prepare_switchmsg(st, trash.str, trash.size); if (!msglen) { /* internal error: message does not fit in trash */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* message to buffer */ repl = bi_putblk(si_ic(si), trash.str, msglen); if (repl <= 0) { /* no more write possible */ if (repl == -1) { goto full; } appctx->st0 = PEER_SESS_ST_END; goto switchstate; } curpeer->last_local_table = st; } /* We force new pushed to 1 to force identifier in update message */ new_pushed = 1; eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); while (1) { uint32_t msglen; struct stksess *ts; /* push local updates */ if (!eb) { eb = eb32_first(&st->table->updates); if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) { st->table->commitupdate = st->last_pushed = st->table->localupdate; break; } } if ((int)(eb->key - st->table->localupdate) > 0) { st->table->commitupdate = st->last_pushed = st->table->localupdate; break; } ts = 
eb32_entry(eb, struct stksess, upd); msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed); if (!msglen) { /* internal error: message does not fit in trash */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* message to buffer */ repl = bi_putblk(si_ic(si), trash.str, msglen); if (repl <= 0) { /* no more write possible */ if (repl == -1) { goto full; } appctx->st0 = PEER_SESS_ST_END; goto switchstate; } st->last_pushed = ts->upd.key; if ((int)(st->last_pushed - st->table->commitupdate) > 0) st->table->commitupdate = st->last_pushed; /* identifier may not needed in next update message */ new_pushed = 0; eb = eb32_next(eb); } } } else { if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) { struct eb32_node *eb; int new_pushed; if (st != curpeer->last_local_table) { int msglen; msglen = peer_prepare_switchmsg(st, trash.str, trash.size); if (!msglen) { /* internal error: message does not fit in trash */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* message to buffer */ repl = bi_putblk(si_ic(si), trash.str, msglen); if (repl <= 0) { /* no more write possible */ if (repl == -1) { goto full; } appctx->st0 = PEER_SESS_ST_END; goto switchstate; } curpeer->last_local_table = st; } /* We force new pushed to 1 to force identifier in update message */ new_pushed = 1; eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); while (1) { uint32_t msglen; struct stksess *ts; /* push local updates */ if (!eb) { st->flags |= SHTABLE_F_TEACH_STAGE1; eb = eb32_first(&st->table->updates); if (eb) st->last_pushed = eb->key - 1; break; } ts = eb32_entry(eb, struct stksess, upd); msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed); if (!msglen) { /* internal error: message does not fit in trash */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* message to buffer */ repl = bi_putblk(si_ic(si), trash.str, msglen); if (repl <= 0) { /* no more write possible */ if (repl == -1) { goto full; } appctx->st0 = PEER_SESS_ST_END; goto 
switchstate; } st->last_pushed = ts->upd.key; /* identifier may not needed in next update message */ new_pushed = 0; eb = eb32_next(eb); } } if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) { struct eb32_node *eb; int new_pushed; if (st != curpeer->last_local_table) { int msglen; msglen = peer_prepare_switchmsg(st, trash.str, trash.size); if (!msglen) { /* internal error: message does not fit in trash */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* message to buffer */ repl = bi_putblk(si_ic(si), trash.str, msglen); if (repl <= 0) { /* no more write possible */ if (repl == -1) { goto full; } appctx->st0 = PEER_SESS_ST_END; goto switchstate; } curpeer->last_local_table = st; } /* We force new pushed to 1 to force identifier in update message */ new_pushed = 1; eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); while (1) { uint32_t msglen; struct stksess *ts; /* push local updates */ if (!eb || eb->key > st->teaching_origin) { st->flags |= SHTABLE_F_TEACH_STAGE2; eb = eb32_first(&st->table->updates); if (eb) st->last_pushed = eb->key - 1; break; } ts = eb32_entry(eb, struct stksess, upd); msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed); if (!msglen) { /* internal error: message does not fit in trash */ appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* message to buffer */ repl = bi_putblk(si_ic(si), trash.str, msglen); if (repl <= 0) { /* no more write possible */ if (repl == -1) { goto full; } appctx->st0 = PEER_SESS_ST_END; goto switchstate; } st->last_pushed = ts->upd.key; /* identifier may not needed in next update message */ new_pushed = 0; eb = eb32_next(eb); } } } if (st == last_local_table) break; st = st->next; } } if ((curpeer->flags & PEER_F_TEACH_PROCESS) && !(curpeer->flags & PEER_F_TEACH_FINISHED)) { unsigned char msg[2]; /* Current peer was elected to request a resync */ msg[0] = PEER_MSG_CLASS_CONTROL; msg[1] = ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED) ? 
PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL; /* process final lesson message */ repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg)); if (repl <= 0) { /* no more write possible */ if (repl == -1) goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } /* flag finished message sent */ curpeer->flags |= PEER_F_TEACH_FINISHED; } /* noting more to do */ goto out; } case PEER_SESS_ST_EXIT: repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1); if (bi_putblk(si_ic(si), trash.str, repl) == -1) goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; case PEER_SESS_ST_ERRSIZE: { unsigned char msg[2]; msg[0] = PEER_MSG_CLASS_ERROR; msg[1] = PEER_MSG_ERR_SIZELIMIT; if (bi_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1) goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } case PEER_SESS_ST_ERRPROTO: { unsigned char msg[2]; msg[0] = PEER_MSG_CLASS_ERROR; msg[1] = PEER_MSG_ERR_PROTOCOL; if (bi_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1) goto full; appctx->st0 = PEER_SESS_ST_END; /* fall through */ } case PEER_SESS_ST_END: { si_shutw(si); si_shutr(si); si_ic(si)->flags |= CF_READ_NULL; goto out; } } } out: si_oc(si)->flags |= CF_READ_DONTWAIT; return; full: si_applet_cant_put(si); goto out; } static struct applet peer_applet = { .obj_type = OBJ_TYPE_APPLET, .name = "", /* used for logging */ .fct = peer_io_handler, .release = peer_session_release, }; /* * Use this function to force a close of a peer session */ static void peer_session_forceshutdown(struct stream * stream) { struct appctx *appctx = NULL; struct peer *ps; int i; for (i = 0; i <= 1; i++) { appctx = objt_appctx(stream->si[i].end); if (!appctx) continue; if (appctx->applet != &peer_applet) continue; break; } if (!appctx) return; ps = (struct peer *)appctx->ctx.peers.ptr; /* we're killing a connection, we must apply a random delay before * retrying otherwise the other end will do the same and we can loop * for a while. 
*/ if (ps) ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000)); /* call release to reinit resync states if needed */ peer_session_release(appctx); appctx->st0 = PEER_SESS_ST_END; appctx->ctx.peers.ptr = NULL; task_wakeup(stream->task, TASK_WOKEN_MSG); } /* Pre-configures a peers frontend to accept incoming connections */ void peers_setup_frontend(struct proxy *fe) { fe->last_change = now.tv_sec; fe->cap = PR_CAP_FE; fe->maxconn = 0; fe->conn_retries = CONN_RETRIES; fe->timeout.client = MS_TO_TICKS(5000); fe->accept = frontend_accept; fe->default_target = &peer_applet.obj_type; fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC; fe->bind_proc = 0; /* will be filled by users */ } /* * Create a new peer session in assigned state (connect will start automatically) */ static struct stream *peer_session_create(struct peers *peers, struct peer *peer) { struct listener *l = LIST_NEXT(&peers->peers_fe->conf.listeners, struct listener *, by_fe); struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */ struct appctx *appctx; struct session *sess; struct stream *s; struct task *t; struct connection *conn; peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); peer->statuscode = PEER_SESS_SC_CONNECTCODE; s = NULL; appctx = appctx_new(&peer_applet); if (!appctx) goto out_close; appctx->st0 = PEER_SESS_ST_CONNECT; appctx->ctx.peers.ptr = (void *)peer; sess = session_new(p, l, &appctx->obj_type); if (!sess) { Alert("out of memory in peer_session_create().\n"); goto out_free_appctx; } if ((t = task_new()) == NULL) { Alert("out of memory in peer_session_create().\n"); goto out_free_sess; } t->nice = l->nice; if ((s = stream_new(sess, t, &appctx->obj_type)) == NULL) { Alert("Failed to initialize stream in peer_session_create().\n"); goto out_free_task; } /* The tasks below are normally what is supposed to be done by * fe->accept(). 
*/ s->flags = SF_ASSIGNED|SF_ADDR_SET; /* applet is waiting for data */ si_applet_cant_get(&s->si[0]); appctx_wakeup(appctx); /* initiate an outgoing connection */ si_set_state(&s->si[1], SI_ST_ASS); /* automatically prepare the stream interface to connect to the * pre-initialized connection in si->conn. */ if (unlikely((conn = conn_new()) == NULL)) goto out_free_strm; conn_prepare(conn, peer->proto, peer->xprt); si_attach_conn(&s->si[1], conn); conn->target = s->target = &s->be->obj_type; memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to)); s->do_log = NULL; s->uniq_id = 0; s->res.flags |= CF_READ_DONTWAIT; l->nbconn++; /* warning! right now, it's up to the handler to decrease this */ p->feconn++;/* beconn will be increased later */ jobs++; if (!(s->sess->listener->options & LI_O_UNLIMITED)) actconn++; totalconn++; peer->appctx = appctx; peer->stream = s; return s; /* Error unrolling */ out_free_strm: LIST_DEL(&s->list); pool_free2(pool2_stream, s); out_free_task: task_free(t); out_free_sess: session_free(sess); out_free_appctx: appctx_free(appctx); out_close: return s; } /* * Task processing function to manage re-connect and peer session * tasks wakeup on local update. 
*/ static struct task *process_peer_sync(struct task * task) { struct peers *peers = (struct peers *)task->context; struct peer *ps; struct shared_table *st; task->expire = TICK_ETERNITY; if (!peers->peers_fe) { /* this one was never started, kill it */ signal_unregister_handler(peers->sighandler); task_delete(peers->sync_task); task_free(peers->sync_task); peers->sync_task = NULL; return NULL; } if (!stopping) { /* Normal case (not soft stop)*/ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) && (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) && !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { /* Resync from local peer needed no peer was assigned for the lesson and no old local peer found or resync timeout expire */ /* flag no more resync from local, to try resync from remotes */ peers->flags |= PEERS_F_RESYNC_LOCAL; /* reschedule a resync */ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); } /* For each session */ for (ps = peers->remote; ps; ps = ps->next) { /* For each remote peers */ if (!ps->local) { if (!ps->stream) { /* no active stream */ if (ps->statuscode == 0 || ((ps->statuscode == PEER_SESS_SC_CONNECTCODE || ps->statuscode == PEER_SESS_SC_SUCCESSCODE || ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) && tick_is_expired(ps->reconnect, now_ms))) { /* connection never tried * or previous stream established with success * or previous stream failed during connection * and reconnection timer is expired */ /* retry a connect */ ps->stream = peer_session_create(peers, ps); } else if (!tick_is_expired(ps->reconnect, now_ms)) { /* If previous session failed during connection * but reconnection timer is not expired */ /* reschedule task for reconnect */ task->expire = tick_first(task->expire, ps->reconnect); } /* else do nothing */ } /* !ps->stream */ else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) { /* current stream is active and established */ if (((peers->flags & PEERS_RESYNC_STATEMASK) == 
PEERS_RESYNC_FROMREMOTE) && !(peers->flags & PEERS_F_RESYNC_ASSIGN) && !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { /* Resync from a remote is needed * and no peer was assigned for lesson * and current peer may be up2date */ /* assign peer for the lesson */ ps->flags |= PEER_F_LEARN_ASSIGN; peers->flags |= PEERS_F_RESYNC_ASSIGN; /* awake peer stream task to handle a request of resync */ appctx_wakeup(ps->appctx); } else { /* Awake session if there is data to push */ for (st = ps->tables; st ; st = st->next) { if ((int)(st->last_pushed - st->table->localupdate) < 0) { /* awake peer stream task to push local updates */ appctx_wakeup(ps->appctx); break; } } } /* else do nothing */ } /* SUCCESSCODE */ } /* !ps->peer->local */ } /* for */ /* Resync from remotes expired: consider resync is finished */ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && !(peers->flags & PEERS_F_RESYNC_ASSIGN) && tick_is_expired(peers->resync_timeout, now_ms)) { /* Resync from remote peer needed * no peer was assigned for the lesson * and resync timeout expire */ /* flag no more resync from remote, consider resync is finished */ peers->flags |= PEERS_F_RESYNC_REMOTE; } if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) { /* Resync not finished*/ /* reschedule task to resync timeout, to ended resync if needed */ task->expire = tick_first(task->expire, peers->resync_timeout); } } /* !stopping */ else { /* soft stop case */ if (task->state & TASK_WOKEN_SIGNAL) { /* We've just recieved the signal */ if (!(peers->flags & PEERS_F_DONOTSTOP)) { /* add DO NOT STOP flag if not present */ jobs++; peers->flags |= PEERS_F_DONOTSTOP; ps = peers->local; for (st = ps->tables; st ; st = st->next) st->table->syncing++; } /* disconnect all connected peers */ for (ps = peers->remote; ps; ps = ps->next) { if (ps->stream) { peer_session_forceshutdown(ps->stream); ps->stream = NULL; ps->appctx = NULL; } } } ps = peers->local; if (ps->flags & PEER_F_TEACH_COMPLETE) { if 
(peers->flags & PEERS_F_DONOTSTOP) { /* resync of new process was complete, current process can die now */ jobs--; peers->flags &= ~PEERS_F_DONOTSTOP; for (st = ps->tables; st ; st = st->next) st->table->syncing--; } } else if (!ps->stream) { /* If stream is not active */ if (ps->statuscode == 0 || ps->statuscode == PEER_SESS_SC_SUCCESSCODE || ps->statuscode == PEER_SESS_SC_CONNECTEDCODE || ps->statuscode == PEER_SESS_SC_TRYAGAIN) { /* connection never tried * or previous stream was successfully established * or previous stream tcp connect success but init state incomplete * or during previous connect, peer replies a try again statuscode */ /* connect to the peer */ peer_session_create(peers, ps); } else { /* Other error cases */ if (peers->flags & PEERS_F_DONOTSTOP) { /* unable to resync new process, current process can die now */ jobs--; peers->flags &= ~PEERS_F_DONOTSTOP; for (st = ps->tables; st ; st = st->next) st->table->syncing--; } } } else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) { /* current stream active and established awake stream to push remaining local updates */ for (st = ps->tables; st ; st = st->next) { if ((int)(st->last_pushed - st->table->localupdate) < 0) { /* awake peer stream task to push local updates */ appctx_wakeup(ps->appctx); break; } } } } /* stopping */ /* Wakeup for re-connect */ return task; } /* * */ void peers_init_sync(struct peers *peers) { struct peer * curpeer; struct listener *listener; for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) { peers->peers_fe->maxconn += 3; } list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe) listener->maxconn = peers->peers_fe->maxconn; peers->sync_task = task_new(); peers->sync_task->process = process_peer_sync; peers->sync_task->expire = TICK_ETERNITY; peers->sync_task->context = (void *)peers; peers->sighandler = signal_register_task(0, peers->sync_task, 0); task_wakeup(peers->sync_task, TASK_WOKEN_INIT); } /* * Function used to register a table for 
sync on a group of peers * */
/* A new shared_table referencing <table> is pushed at the head of the table
 * list of every peer of <peers>. Its local_id is the id of the previous head
 * plus one. Finally the section's sync task is attached to <table>.
 *
 * When an allocation fails, an alert is emitted and the table is simply not
 * registered for that peer, instead of writing through a NULL pointer.
 */
void peers_register_table(struct peers *peers, struct stktable *table)
{
	struct shared_table *st;
	struct peer * curpeer;
	int id = 0;

	for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
		st = calloc(1, sizeof(*st));
		if (!st) {
			Alert("out of memory in peers_register_table().\n");
			continue;
		}

		st->table = table;
		st->next = curpeer->tables;
		/* <id> deliberately keeps its previous value when this peer
		 * has no table yet, as in the historical behaviour.
		 */
		if (curpeer->tables)
			id = curpeer->tables->local_id;
		st->local_id = id + 1;

		curpeer->tables = st;
	}

	table->sync_task = peers->sync_task;
}