前言
1、节点角色
ZK本身的节点主要分为三类:
Leader:主要是负责数据的写入,如果超过半数同意,那么就广播进行写入;
Follower:主要负责查询请求并将写入请求发送给leader,参与选举和写入投票;
Observe:也是负责查询请求并将写入请求发送给leader,不参加投票,只被动接收结果
2、选举过程
2.1 胜出的条件
获取半数投票以上的节点成为leader节点。
2.2 比较的规则
万事万物都有一个准则,好的比较坏的,坏的比较更坏的,世上本没有痛苦,痛苦都是自己寻找的结果,海燕你可长点心吧,哎呀跑偏了。
(1)任期
(2)事务ID(ZK中的事务ID)
(3)节点编号(集群中每个节点的编号)
就这么简单?是的。规则就是这么简单,但是源码还是有那么一丢丢的绕。
3、代码逻辑综述
源码看着相对比较枯燥,但是作为一个手艺人,怎么能不去了解怎么做的呢,我们先来梳理一下代码的流程,方便更好的看第四部分内容。
节点先投自己一票,并进行广播
收到消息后
如果消息为空,就进行重新发送消息或者建立连接
如果消息不为空,且消息接收者和投票的leader都是合法节点就进行下 边步骤。
如果节点为looking节点
4、源码分析
选主的逻辑是在lookForLeader开始的,像金字塔的第一块砖一样,我们先看ZK选主的第一块砖lookForLeader,第一次看源码的时候一定要把握主线,忽略从线,等主线完全理清楚了之后才去处理从线,要不会陷入迷宫之中。
下边就是主要的投票代码,看里边的注释:
public Vote lookForLeader() throws InterruptedException {//省略代码synchronized (this) {//1、投票轮数加一logicalclock.incrementAndGet();//2、更新提议或者叫投票,三个参数:当前节点的id、ZK的最大事务id、任期updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());}LOG.info("New election. My id = {}, proposed zxid=0x{}",self.getId(),Long.toHexString(proposedZxid));//发送通知sendNotifications();//省略后边代码}
更新投票或者投票的方法为:
synchronized void updateProposal(long leader, long zxid, long epoch) {proposedLeader = leader;proposedZxid = zxid;proposedEpoch = epoch;}
发送通知的方法为:
private void sendNotifications() {for (long sid : self.getCurrentAndNextConfigVoters()) {QuorumVerifier qv = self.getQuorumVerifier();//初始化待发送通知或者投票的报文、对象ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader, //投的leaderproposedZxid, //事务IDlogicalclock.get(), //投票或者选举的轮数QuorumPeer.ServerState.LOOKING,sid,proposedEpoch, //任期qv.toString().getBytes(UTF_8));sendqueue.offer(notmsg);}}
待到山花烂漫时,她在丛中笑,消息都已经发完了,肯定就到了接收到选票的时候应该怎么操作了,接收选票的代码也是在lookForLeader中:
//在服务没有停止并且当前节点的状态是LOOKING时进行while循环while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {//200毫秒内有数据就立马取出Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);//当取出的数据为空的时候if (n == null) {//判断是否所有的投票都已经发送,如果发送完毕就重新发送if (manager.haveDelivered()) {sendNotifications();//如果没有发送完毕就建立连接} else {manager.connectAll();}notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);//self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)//上面一行代码默认返回false,所以下边的if判断永远不可能进入,当没有收到选票时就等待200ms后继续循环,即空转一轮。if (self.getQuorumVerifier() instanceof QuorumOracleMaj&& self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {setPeerState(proposedLeader, voteSet);Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);leaveInstance(endVote);return endVote;}LOG.info("Notification time out: {} ms", notTimeout);//当接收到的消息不为空,并且是接收者和待选leader都是有效节点时进入当前逻辑} else if (validVoter(n.sid) && validVoter(n.leader)) {//先查看投票节点的状态switch (n.state) {case LOOKING:case OBSERVING:case FOLLOWING:case LEADING:default:LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);break;}} else {if (!validVoter(n.leader)) {LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);}if (!validVoter(n.sid)) {LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);}}}
case OBSERVING:LOG.debug("Notification from observer: {}", n.sid);break;
case FOLLOWING:Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);if (resultFN == null) {break;} else {return resultFN;}case LEADING:Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);if (resultLN == null) {break;} else {return resultLN;}
我们先把totalOrderPredicate方法放前边,这个其实就是选举leader的规则的实现。
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {if (self.getQuorumVerifier().getWeight(newId) == 0) {return false;}return ((newEpoch > curEpoch)|| ((newEpoch == curEpoch)&& ((newZxid > curZxid)|| ((newZxid == curZxid)&& (newId > curId)))));}
case LOOKING://当前节点事务id为-1或者发送节点的事务id为-1时,忽略通知if (getInitLastLoggedZxid() == -1) {LOG.debug("Ignoring notification as our zxid is -1");break;}if (n.zxid == -1) {LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);break;}//如果发送节点的轮数大于当前节点的轮数 ,证明当前投票落后了if (n.electionEpoch > logicalclock.get()) {//就把当前节点的投票轮数设置成和发送节点轮数一样logicalclock.set(n.electionEpoch);//并且清空收到的选票,落后之后选票就无效了recvset.clear();/***这里是关键:*如果发送节点的选举的leader,按照规则比较后胜出*就把当前节点的选票修改成和发送节点一样的,*反之则把当前节点的选票保留,然后发送通知*/if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);} else {updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());}sendNotifications();//如果当前节点的选举轮数比发送的大,证明接收到的选票无效} else if (n.electionEpoch < logicalclock.get()) {LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",Long.toHexString(n.electionEpoch),Long.toHexString(logicalclock.get()));break;//如果选举轮数一样,就比较发送节点的选票和当前节点的选票//按照规则发送节点的选择的leader胜出,就修改当前节点的选票为//发送节点一样,然后发送通知} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch);sendNotifications();}//该发的通知发送了,该比较的都比较清楚了,//但是注意上边的代码,只有在选票不一致的时候才再次//进行投票,如果一样的话就不再进行投票了。//下边是把投票信息记录下来,为什么?因为要比较HALF,过半就选出//leader了,别一直忙活呀recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//该方法的作用是选的leader的一样的节点加在一样,为了比较是否过半voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));//该方法就是比较选择同样leader的节点是否过半,如果过半就进入//进行状态设置if (voteSet.hasAllQuorums()) {//这块并没有立马设置,而是又等了一轮(200ms),然后在这个200ms//内又接收到新的投票,并且新投票的选择的leader节点胜出,那么就//把这张投票放进队列中,进行下次循环,并跳出当前的计票或者选主流程while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {recvqueue.put(n);break;}}//如果200ms后没有收到新的投票,那么就进行节点状态的设置,//proposedLeader与当前节点id一样,当前节点就是leader,否则依据//是否为PARTICIPANT来判读为fllow或者observe节点if (n == null) {setPeerState(proposedLeader, voteSet);Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);leaveInstance(endVote);return endVote;}}break;
结语
原创不易,肝了三夜才写完,如果文章对你的有帮助,欢迎点赞评论转发,最后如果你对技术有热爱,你对编程有热情,欢迎加入程序猿大家庭,咱们一起成长,一起进步。
如果你喜欢本文
请长按二维码,关注程序猿每日分享

转发至朋友圈,是对我最大的支持
点个在看
小时候有种感觉叫喜欢
长大后有种支持叫再看
💕💕💕💕💕💕




