diff --git a/src/peers.c b/src/peers.c index ca858d7da..53eda995f 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1069,21 +1069,11 @@ void __peer_session_deinit(struct peer *peer) /* Re-init current table pointers to force announcement on re-connect */ peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL; peer->appctx = NULL; - if (peer->flags & PEER_F_LEARN_ASSIGN) { - /* unassign current peer for learning */ - peer->flags &= ~(PEER_F_LEARN_ASSIGN); - peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - if (peer->local) - peers->flags |= PEERS_F_RESYNC_LOCALABORT; - else - peers->flags |= PEERS_F_RESYNC_REMOTEABORT; - /* reschedule a resync */ - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); - } - /* reset teaching and learning flags to 0 */ - peer->flags &= PEER_TEACH_RESET; - peer->flags &= PEER_LEARN_RESET; + /* Mark peer as released */ + peer->flags &= PEER_STATE_RESET; + peer->flags |= PEER_F_ST_RELEASED; + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } @@ -2512,64 +2502,19 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) { TRACE_PROTO("received control message", PEERS_EV_CTRLMSG, NULL, &msg_head[1], peers->local->id, peer->id); - if (peer->flags & PEER_F_LEARN_ASSIGN) { - int commit_a_finish = 1; - - peer->flags &= ~PEER_F_LEARN_ASSIGN; - peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - if (peer->srv->shard) { - struct peer *ps; - - peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL; - peer->flags |= PEER_F_LEARN_NOTUP2DATE; - for (ps = peers->remote; ps; ps = ps->next) { - if (ps->srv->shard == peer->srv->shard) { - /* flag all peers from same shard - * notup2date to disable request - * of a resync frm them - */ - ps->flags |= PEER_F_LEARN_NOTUP2DATE; - } - else if (ps->srv->shard && !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { - /* it remains some other shards not requested - * we don't commit a resync finish to request - * the other shards - */ - commit_a_finish = 0; - } - } - - if (!commit_a_finish) { - /* it remains some shard to request, we schedule a new request - */ - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); - task_wakeup(peers->sync_task, TASK_WOKEN_MSG); - } - } - - if (commit_a_finish) { - peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE); - if (peer->local) - peers->flags |= PEERS_F_RESYNC_LOCALFINISHED; - else - peers->flags |= PEERS_F_RESYNC_REMOTEFINISHED; - } + if (peer->flags & PEER_F_LEARN_PROCESS) { + peer->flags &= ~PEER_F_LEARN_PROCESS; + peer->flags |= PEER_F_LEARN_FINISHED; + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } peer->confirm++; } else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) { TRACE_PROTO("received control message", PEERS_EV_CTRLMSG, NULL, &msg_head[1], peers->local->id, peer->id); - if (peer->flags & PEER_F_LEARN_ASSIGN) { - peer->flags &= ~PEER_F_LEARN_ASSIGN; - peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - - if (peer->local) - peers->flags |= PEERS_F_RESYNC_LOCALPARTIAL; - else - peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL; - peer->flags |= PEER_F_LEARN_NOTUP2DATE; - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + if (peer->flags & PEER_F_LEARN_PROCESS) { + peer->flags &= ~PEER_F_LEARN_PROCESS; + peer->flags |= (PEER_F_LEARN_FINISHED|PEER_F_LEARN_NOTUP2DATE); task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } peer->confirm++; @@ -2667,15 +2612,12 @@ static inline int peer_send_msgs(struct appctx *appctx, int repl; /* Need to request a resync */ - if ((peer->flags & PEER_F_LEARN_ASSIGN) && - (peers->flags & PEERS_F_RESYNC_ASSIGN) && - !(peers->flags & PEERS_F_RESYNC_PROCESS)) { - + if ((peer->flags & (PEER_F_LEARN_ASSIGN|PEER_F_LEARN_PROCESS|PEER_F_LEARN_FINISHED)) == PEER_F_LEARN_ASSIGN) { repl = peer_send_resync_reqmsg(appctx, peer, peers); if (repl <= 0) return repl; - peers->flags |= PEERS_F_RESYNC_PROCESS; + peer->flags |= PEER_F_LEARN_PROCESS; } /* Nothing to read, now we start to write */ @@ -2906,6 +2848,9 @@ static inline void init_accepted_peer(struct peer *peer, struct peers *peers) /* Init confirm counter */ peer->confirm = 0; + peer->flags &= PEER_STATE_RESET; + peer->flags |= PEER_F_ST_ACCEPTED; + /* Init cursors */ for (st = peer->tables; st ; st = st->next) { uint commitid, updateid; @@ -2937,30 +2882,6 @@ static inline void init_accepted_peer(struct peer *peer, struct peers *peers) HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock); } - - /* reset teaching and learning flags to 0 */ - peer->flags &= PEER_TEACH_RESET; - peer->flags &= PEER_LEARN_RESET; - - /* if current peer is local */ - if (peer->local) { - /* if current host need resyncfrom local and no process assigned */ - if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL && - !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { - /* assign local peer for a lesson, consider lesson already requested */ - peer->flags |= PEER_F_LEARN_ASSIGN; - peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - peers->flags |= PEERS_F_RESYNC_LOCALASSIGN; - } - - } - else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && - !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { - /* assign peer for a lesson */ - peer->flags |= PEER_F_LEARN_ASSIGN; - peers->flags |= PEERS_F_RESYNC_ASSIGN; - peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN; - } } /* @@ -3003,28 +2924,14 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers) HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock); } + /* Awake main task */ + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); + /* Init confirm counter */ peer->confirm = 0; - /* reset teaching and learning flags to 0 */ - peer->flags &= PEER_TEACH_RESET; - peer->flags &= PEER_LEARN_RESET; - - /* If current peer is local */ - if (peer->local) { - /* flag to start to teach lesson */ - peer->flags |= PEER_F_TEACH_PROCESS; - } - else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && - !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { - /* If peer is remote and resync from remote is needed, - and no peer currently assigned */ - - /* assign peer for a lesson */ - peer->flags |= PEER_F_LEARN_ASSIGN; - peers->flags |= PEERS_F_RESYNC_ASSIGN; - peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN; - } + peer->flags &= PEER_STATE_RESET; + peer->flags |= PEER_F_ST_CONNECTED; } /* @@ -3108,6 +3015,11 @@ switchstate: */ curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); peer_session_forceshutdown(curpeer); + + /* old peer connection was replaced by a new one. */ + curpeer->flags &= PEER_STATE_RESET; + curpeer->flags |= PEER_F_ST_RENEWED; + curpeer->heartbeat = TICK_ETERNITY; curpeer->coll++; } @@ -3413,12 +3325,121 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer static void __process_peer_learn_status(struct peers *peers, struct peer *peer) { + struct peer *ps; + + if (peer->flags & PEER_F_LEARN_PROCESS) + peers->flags |= PEERS_F_RESYNC_PROCESS; + + if (!(peer->flags & PEER_F_LEARN_FINISHED)) + return; + + if (peer->flags & PEER_F_LEARN_NOTUP2DATE) { + /* Partial resync */ + peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALPARTIAL : PEERS_F_RESYNC_REMOTEPARTIAL); + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + } + else { + /* Full resync */ + int commit_a_finish = 1; + + if (peer->srv->shard) { + peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL; + peer->flags |= PEER_F_LEARN_NOTUP2DATE; + for (ps = peers->remote; ps; ps = ps->next) { + if (ps->srv->shard && ps != peer) { + if (ps->srv->shard == peer->srv->shard) { + /* flag all peers from same shard + * notup2date to disable request + * of a resync frm them + */ + ps->flags |= PEER_F_LEARN_NOTUP2DATE; + } + else if (!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { + /* it remains some other shards not requested + * we don't commit a resync finish to request + * the other shards + */ + commit_a_finish = 0; + } + } + } + + if (!commit_a_finish) { + /* it remains some shard to request, we schedule a new request */ + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + } + } + + if (commit_a_finish) { + peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE); + peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALFINISHED : PEERS_F_RESYNC_REMOTEFINISHED); + } + } + peer->flags &= ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_PROCESS|PEER_F_LEARN_FINISHED); + peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); } static void __process_peer_state(struct peers *peers, struct peer *peer) { if (peer->flags & PEER_F_RESYNC_REQUESTED) peers->flags |= PEERS_F_RESYNC_REQUESTED; + + /* Check peer state. Order is important */ + if (peer->flags & (PEER_F_ST_RELEASED|PEER_F_ST_RENEWED)) { + if (peer->flags & PEER_F_LEARN_ASSIGN) { + /* unassign current peer for learning */ + peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); + peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALABORT : PEERS_F_RESYNC_REMOTEABORT); + /* reschedule a resync */ + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + } + peer->flags &= PEER_TEACH_RESET; + peer->flags &= PEER_LEARN_RESET; + } + if (peer->flags & (PEER_F_ST_ACCEPTED|PEER_F_ST_RENEWED)) { + peer->flags &= PEER_TEACH_RESET; + peer->flags &= PEER_LEARN_RESET; + + /* if current peer is local */ + if (peer->local) { + /* if current host need resyncfrom local and no process assigned */ + if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* assign local peer for a lesson, consider lesson already requested */ + peer->flags |= (PEER_F_LEARN_ASSIGN|PEERS_F_RESYNC_PROCESS); + peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); + peers->flags |= PEERS_F_RESYNC_LOCALASSIGN; + } + } + else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* assign peer for a lesson */ + peer->flags |= PEER_F_LEARN_ASSIGN; + peers->flags |= PEERS_F_RESYNC_ASSIGN; + peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN; + } + } + if (peer->flags & PEER_F_ST_CONNECTED) { + peer->flags &= PEER_TEACH_RESET; + peer->flags &= PEER_LEARN_RESET; + + /* If current peer is local */ + if (peer->local) { + /* flag to start to teach lesson */ + peer->flags |= PEER_F_TEACH_PROCESS; + } + else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* If peer is remote and resync from remote is needed, + and no peer currently assigned */ + + /* assign peer for a lesson */ + peer->flags |= PEER_F_LEARN_ASSIGN; + peers->flags |= PEERS_F_RESYNC_ASSIGN; + peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN; + } + } + peer->flags &= PEER_STATE_RESET; } static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)