zookeeper 集群中发生选举的场景有以下三种:

  • 集群启动时
  • Leader 节点重启时
  • Follower 节点重启时

本文主要针对集群启动时发生的选举实现进行分析。

ZK 集群中节点在启动时会调用QuorumPeer.start方法

public synchronized void start() {
    /**
     * 加载数据文件,获取 lastProcessedZxid, currentEpoch,acceptedEpoch
     */
    loadDataBase();

    /**
     * 启动主线程 用于处理客户端连接请求
     */
    cnxnFactory.start();

    /**
     * 开始 leader 选举; 会相继创建选举算法的实现,创建当前节点与集群中其他节点选举通信的网络IO,并启动相应工作线程
     */
    startLeaderElection();

    /**
     * 启动 QuorumPeer 线程,监听当前节点服务状态
     */
    super.start();
}

加载数据文件

loadDataBase 方法中,ZK 会通过加载数据文件获取 lastProcessedZxid , 并通过读取 currentEpoch , acceptedEpoch 文件来获取相对应的值;若上述两文件不存在,则以 lastProcessedZxid 的高 32 位作为 currentEpoch , acceptedEpoch 值并写入对应文件中。

初始选举环境

synchronized public void startLeaderElection() {
    try {
        // 创建投票
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch(IOException e) {
    }
    // 从集群中节点列表,查找当前节点与其他进行信息同步的地址
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }

    // electionType == 3
    this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
        // 忽略其他算法的实现
    case 3:
        /**
         * 创建 QuorumCnxManager 实例,并启动 QuorumCnxManager.Listener 线程用于与集群中其他节点进行选举通信;
         */
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            /**
             * 创建选举算法 FastLeaderElection 实例
             */
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

初始节点的相关实例之后,执行 super.start() 方法,因 QuorumPeer 类继承 ZooKeeperThread 故会启动 QuorumPeer 线程

public void run() {
        // 代码省略
        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        // 只读模式下代码省略
                    } else {
                        try {
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                // 忽略其他状态下的处理逻辑
                }
            }
        } finally {

        }
    }

选举

从上述代码可以看出 QuorumPeer 线程在运行过程中轮询监听当前节点的状态并进行相应的逻辑处理,集群启动时节点状态为 LOOKING (也就是选举 Leader 过程),此时会调用 FastLeaderElection.lookForLeader 方法 (也是投票选举算法的核心)简化后源码如下:

public Vote lookForLeader() throws InterruptedException {
        // 忽略
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                // logicalclock 逻辑时钟加一
                logicalclock.incrementAndGet();
                /**
                 * 更新提案信息,用于后续投票;集群启动节点默认选举自身为 Leader
                 */
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            /**
             * 发送选举投票提案
             */
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                /**
                 * 从 recvqueue 队列中获取外部节点的选举投票信息
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    /**
                     * 检查上一次发送的选举投票信息是否全部发送;
                     * 若已发送则重新在发送一遍,反之说明当前节点与集群中其他节点未连接,则执行 connectAll() 建立连接 
                     */
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                else if(self.getVotingView().containsKey(n.sid)) {
                    /**
                     * 只处理同一集群中节点的投票请求
                     */ 
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            /**
                             * 外部投票选举周期大于当前节点选举周期
                             * 
                             * step1 : 更新选举周期值
                             * step2 : 清空已收到的选举投票数据
                             * step3 : 选举投票 PK,选举规则参见 totalOrderPredicate 方法
                             * step4 : 变更选举投票并发送
                             */
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            // 丢弃小于当前选举周期的投票
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            /**
                             * 同一选举周期
                             *                            
                             * step1 : 选举投票 PK,选举规则参见 totalOrderPredicate 方法
                             * step2 : 变更选举投票并发送
                             */
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        /**
                         * 记录外部选举投票信息
                         */
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        /**
                         * 统计选举投票结果,判断是否可以结束此轮选举
                         */
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // ......

                            if (n == null) {
                                /**
                                 * 选举结束判断当前节点状态; 若提案的 leader == myid 则 state = LEADING, 反之为 FOLLOWING 
                                 */
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                // 变更当前投票信息
                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock.get(),
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        // ...... 
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                n.state, n.sid);
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } finally {
            // ......
        }
    }

lookForLeader 方法的实现可以看出,选举流程如下:

  • 发送内部投票

    内部投票发送逻辑参考后续小节

  • 接收外部投票

    接收外部投票逻辑参考后续小节

  • 选举投票 PK

    当接收到外部节点投票信息后会与内部投票信息进行 PK 已确定投票优先权;PK 规则参见 totalOrderPredicate 方法如下

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) || 
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

从其实现可以看出选举投票 PK 规则如下:

* 比较外部投票与内部投票的选举周期值,选举周期大的值优先
* 若选举周期值一致,则比较事务 ID; 事务 ID 最新的优先
* 若选举周期值一致且事务 ID 值相同,则比较投票节点的 server id; server id 最大的优先
  • 统计选举投票

    当接收到外部投票之后,都会统计下此轮选举的投票情况并判断是否可结束选举; 参考 termPredicate 方法

protected boolean termPredicate(
            HashMap<Long, Vote> votes,
            Vote vote) {

    HashSet<Long> set = new HashSet<Long>();

    /**
     * 统计接收的投票中与当前节点所推举 leader 投票一致的个数
     */
    for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())){
            set.add(entry.getKey());
        }
    }

    /**
     * 如果超过一半的投票一致 则说明可以终止本次选举
     */
    return self.getQuorumVerifier().containsQuorum(set);
}
  • 确认节点角色

    当此轮选举结束之后,通过判断所推举的 leader server id 是否与当前节点 server id 相等; 若相等则说明当前节点为 leader, 反之为 follower。

发送接收投票

上文中主要聊了下 ZK 选举算法的核心部分,下面接着看下集群节点在选举过程中是如何发送自己的投票和接收外部的投票及相关处理逻辑。

首先通过 FastLeaderElection.sendNotifications 方法看下发送投票逻辑:

private void sendNotifications() {
    for (QuorumServer server : self.getVotingView().values()) {
        long sid = server.id;

        /**
         * 发送投票通知信息
         *
         * leader : 被推举的服务器 myid
         * zxid : 被推举的服务器 zxid
         * electionEpoch : 当前节点选举周期
         * ServerState state : 当前节点状态
         * sid : 消息接收方 myid
         * peerEpoch : 被推举的服务器 epoch
         */
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch);

        /**
         * 将消息添加到队列 sendqueue 中;
         *
         * @see Messenger.WorkerSender sendqueue 队列会被 WorkerSender 消费
         */
        sendqueue.offer(notmsg);
    }
}

从实现可以看出节点在启动阶段会将自身信息封装为 ToSend 实例(也就是选举自身为 leader)并添加到队列 FastLeaderElection.sendqueue 中;那么此时我们会问到 FastLeaderElection.sendqueue 队列中的消息被谁消费处理呢 ? 让我们回过头看下节点在启动初始化选举环境时创建 QuorumCnxManager, FastLeaderElection 实例的过程。

PS : FastLeaderElection.sendqueue 队列中消息被谁消费 ?

QuorumCnxManager

public QuorumCnxManager(final long mySid,
                            Map<Long,QuorumPeer.QuorumServer> view,
                            QuorumAuthServer authServer,
                            QuorumAuthLearner authLearner,
                            int socketTimeout,
                            boolean listenOnAllIPs,
                            int quorumCnxnThreadsSize,
                            boolean quorumSaslAuthEnabled,
                            ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
    this.senderWorkerMap = senderWorkerMap;
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();

    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);

    listener = new Listener();
}

QuorumCnxManager 实例化后,会启动一个 QuorumCnxManager.Listener 线程;同时在 QuorumCnxManager 实例中存在三个重要的集合容器变量:

  • senderWorkerMap : 发送器集合,Map 类型按 server id 分组;为集群中的每个节点分配一个 SendWorker 负责消息的发送
  • recvQueue : 消息接收队列,用于存放从外部节点接收到的投票消息
  • queueSendMap : 消息发送队列,Map 类型按 server id 分组;为集群中的每个节点分配一个阻塞队列存放待发送的消息,从而保证各个节点之间的消息发送互不影响

下面我们再看下 QuorumCnxManager.Listener 线程启动后,主要做了什么:

public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    while((!shutdown) && (numRetries < 3)){
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);

            /**
             * 获取当前节点的选举地址并 bind 监听等待外部节点连接
             */
            addr = view.get(QuorumCnxManager.this.mySid).electionAddr;
            ss.bind(addr);

            while (!shutdown) {

                /**
                 * 接收外部节点连接并处理
                 */
                Socket client = ss.accept();
                setSockOpts(client);                
                receiveConnection(client);

                numRetries = 0;
            }
        } catch (IOException e) {
            LOG.error("Exception while listening", e);
            numRetries++;
            ss.close();
            Thread.sleep(1000);
        }
    }
}

跟踪代码发现 receiveConnection 方法最终会调用方法 handleConnection 如下

private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
    /**
     * 读取外部节点的 server id 
     * ps : 此时的 server id 是什么时候发送的呢 ?
     */
    Long sid = din.readLong();

    if (sid < this.mySid) {
        /**
         * 若外部节点的 server id 小于当前节点的 server id,则关闭此连接,改为由当前节点发起连接
         * ps : 该限制说明选举过程中,zk 只允许 server id 较大的一方去主动发起连接避免重复连接
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        closeSocket(sock);
        connectOne(sid);
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if(vsw != null)
            vsw.finish();

        /**
         * 按 server id 分组,为外部节点分配 SendWorker, RecvWorker 和一个消息发送队列
         */
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        /**
         * 启动外部节点对应的 SendWorker, RecvWorker 线程
         */
        sw.start();
        rw.start();

        return;
    }
}

至此会发现 QuorumCnxManager.Listener 线程处理逻辑如下:

  • 监听当前节点的 election address 等待接收外部节点连接
  • 读取外部节点的 server id 并与当前节点的 server id 比较;若前者小则关闭连接,改由当前节点发起连接
  • 反之为外部节点分配 SendWorker,RecvWorker 线程及消息发送队列

PS : 此处我们会有个疑问外部节点的 server id 是什么时候发送过来的呢 ?

下面我们在看下为每个外部节点开启了 SendWorkerRecvWorker 线程后做了什么:

  • SendWorker
public void run() {
    // 省略
    try {
        while (running && !shutdown && sock != null) {

            ByteBuffer b = null;
            try {
                /**
                 * 通过 server id 获取待发送给集群中节点的消息队列
                 */
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                        .get(sid);
                if (bq != null) {
                    /**
                     * 从队列中获取待发送的消息
                     */
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for " +
                              "server " + sid);
                    break;
                }

                if(b != null){
                    lastMessageSent.put(sid, b);
                    /**
                     * 写入 socket 的输出流完成消息的发送
                     */
                    send(b);
                }
            } catch (InterruptedException e) {               
            }
        }
    } catch (Exception e) {        
    }
}

synchronized void send(ByteBuffer b) throws IOException {
    byte[] msgBytes = new byte[b.capacity()];
    try {
        b.position(0);
        b.get(msgBytes);
    } catch (BufferUnderflowException be) {
        LOG.error("BufferUnderflowException ", be);
        return;
    }
    /**
     * 发送的报文包括:消息体正文长度和消息体正文
     */
    dout.writeInt(b.capacity());
    dout.write(b.array());
    dout.flush();
}

通过代码实现我们知道 SendWorker 的职责就是从 queueSendMap 队列中获取待发送给远程节点的消息并执行发送。

PS : 此处我们会有个疑问 QuorumCnxManager.queueSendMap 中节点对应队列中待发送的消息是谁生产的呢 ?

  • RecvWorker
public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            /**
             * 读取外部节点发送的消息
             * 由 SendWorker 可知前 4 字节为消息载体有效长度
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: "
                                + length);
            }
            /**
             * 读取消息体正文
             */
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            /**
             * 将读取的消息包装为 Message 对象添加到队列 recvQueue 中
             */
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid + ", my id = "
                 + QuorumCnxManager.this.mySid + ", error = " , e);
    } finally {
        LOG.warn("Interrupting SendWorker");
        sw.finish();
        if (sock != null) {
            closeSocket(sock);
        }
    }
}

public void addToRecvQueue(Message msg) {
    synchronized(recvQLock) {
        // 省略
        try {
            recvQueue.add(msg);
        } catch (IllegalStateException ie) {
            // This should never happen
            LOG.error("Unable to insert element in the recvQueue " + ie);
        }
    }
}

从上面可以看出 RecvWorker 线程在运行期间会接收 server id 对应的外部节点发送的消息,并将其放入 QuorumCnxManager.recvQueue 队列中。
到目前为止我们基本完成对 QuorumCnxManager 核心功能的分析,发现其功能主要是负责集群中当前节点与外部节点进行选举通讯的网络 IO 操作,譬如接收外部节点选举投票和向外部节点发送内部投票。

FastLeaderElection

下面我们在接着回头看下 FastLeaderElection 类实例的过程:

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}
Messenger(QuorumCnxManager manager) {
    /**
     * 启动 WorkerSender 线程用于发送消息
     */
    this.ws = new WorkerSender(manager);

    Thread t = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();

    /**
     * 启动 WorkerReceiver 线程用于接收消息
     */
    this.wr = new WorkerReceiver(manager);

    t = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}

FastLeaderElection 实例化过程我们知道,其内部分别启动了线程 WorkerSenderWorkerReceiver ;那么接下来看下这两个线程具体做什么吧。

WorkerSender
public void run() {
    while (!stop) {
        try {
            /**
             * 从 sendqueue 队列中获取 ToSend 待发送的消息
             */ 
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;

            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

void process(ToSend m) {
    // 将 ToSend 转换为 40字节 ByteBuffer
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), 
                                            m.leader,
                                            m.zxid, 
                                            m.electionEpoch, 
                                            m.peerEpoch);
    // 交由 QuorumCnxManager 执行发送
    manager.toSend(m.sid, requestBuffer);
}

看了 WorkerSender 的实现是不是明白了什么? 还记得上文中 FastLeaderElection.sendNotifications 方法执行发送通知的时候的疑惑吗 ? FastLeaderElection.sendqueue 队列产生的消息就是被 WorkerSender 线程所消费处理, WorkerSender 会将消息转发至 QuorumCnxManager 处理

public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     * 如果是发给自己的投票,则将其添加到接收队列中等待处理
     */
    if (this.mySid == sid) {
         b.position(0);
         addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
         /*
          * Start a new connection if doesn't have one already.
          */
         ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
         ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);

         // 将发送的消息放入对应的队列中,若队列满了则将队列头部元素移除
         if (bqExisting != null) {
             addToSendQueue(bqExisting, b);
         } else {
             addToSendQueue(bq, b);
         }
         connectOne(sid);

    }
}

private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
          ByteBuffer buffer) {
    // 省略
    try {
        // 将消息插入节点对应的队列中
        queue.add(buffer);
    } catch (IllegalStateException ie) {
    }
}

QuorumCnxManager 在收到 FastLeaderElection.WorkerSender 转发的消息时,会判断当前消息是否发给自己的投票,若是则将消息添加到接收队列中,反之会将消息添加到 queueSendMap 对应 server id 的队列中;看到这里的时候是不是就明白了在 QuorumCnxManager.SendWorker 分析时候的疑惑呢 。 这个时候投票消息未必能够发送出去,因为当前节点与外部节点的通道是否已建立还未知,所以继续执行 connectOne

synchronized public void connectOne(long sid){
    /**
     * 判断当前服务节点是否与 sid 外部服务节点建立连接;有可能对方先发起连接
     * 若已连接则等待后续处理,反之发起连接
     */
    if (!connectedToPeer(sid)){
        InetSocketAddress electionAddr;
        if (view.containsKey(sid)) {
            electionAddr = view.get(sid).electionAddr;
        } else {
            LOG.warn("Invalid server id: " + sid);
            return;
        }
        try {

            LOG.debug("Opening channel to server " + sid);
            Socket sock = new Socket();
            setSockOpts(sock);
            sock.connect(view.get(sid).electionAddr, cnxTO);
            LOG.debug("Connected to server " + sid);

            initiateConnection(sock, sid);

        } catch (UnresolvedAddressException e) {

        } catch (IOException e) {

        }
    } else {
        LOG.debug("There is a connection already for server " + sid);
    }
}

public boolean connectedToPeer(long peerSid) {
    return senderWorkerMap.get(peerSid) != null;
}
private boolean startConnection(Socket sock, Long sid)
            throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        /**
         * 发送当前节点的 server id,需告知对方我是哪台节点
         */
        dout = new DataOutputStream(sock.getOutputStream());
        dout.writeLong(this.mySid);
        dout.flush();

        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // 只允许 sid 值大的服务器去主动和其他服务器连接,否则断开连接
    if (sid > this.mySid) {
        LOG.info("Have smaller server identifier, so dropping the " +
                 "connection: (" + sid + ", " + this.mySid + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if(vsw != null)
            vsw.finish();

        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sw.start();
        rw.start();

        return true;    

    }
    return false;
}

从上述代码可以看出节点在与外部节点连接后会先发送 myid 报文告知对方我是哪个节点(这也是为什么 QuorumCnxManager.Listener 线程在接收到一个连接请求时会先执行 getLong 获取 server id 了);同样在连接建立的时候也遵循一个原则(只允许 server id 较大的一方发起连接)。

WorkerReceiver
public void run() {

    Message response;
    while (!stop) {
        // Sleeps on receive
        try{
            /**
             * 从 QuorumCnxManager.recvQueue 队列中获取接收的外部投票
             */
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if(response == null) continue;

            if(!self.getVotingView().containsKey(response.sid)){
                // 忽略对方是观察者的处理
            } else {
                // Instantiate Notification and set its attributes
                Notification n = new Notification();

                   // 将 message 转成 notification 对象

                if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                    // 当前节点状态为 looking,则将外部节点投票添加到 recvqueue 队列中
                    recvqueue.offer(n);

                    if((ackstate == QuorumPeer.ServerState.LOOKING)
                            && (n.electionEpoch < logicalclock.get())){
                        // 若外部节点选举周期小于当前节点选举周期则发送内部投票
                        Vote v = getVote();
                        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                v.getId(),
                                v.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                v.getPeerEpoch());
                        sendqueue.offer(notmsg);
                    }
                } else {
                    // 忽略其他状态时的处理
                }
            }
        } catch (InterruptedException e) {
        }
    }
    LOG.info("WorkerReceiver is down");
}

此时我们明白 WorkerReceiver 线程在运行期间会一直从 QuorumCnxManager.recvQueue 的队列中拉取接收到的外部投票信息,若当前节点为 LOOKING 状态,则将外部投票信息添加到 FastLeaderElection.recvqueue 队列中,等待 FastLeaderElection.lookForLeader 选举算法处理投票信息。

到此我们基本明白了 ZK 集群节点发送和接收投票的处理流程,但是这个时候您是不是又有一种懵的状态呢 笑哭,我们会发现选举过程中依赖了多个线程 WorkerSender, SendWorker, WorkerReceiver, RecvWorker ,多个阻塞队列 sendqueue, recvqueue,queueSendMap,recvQueue 而且名字起的很类似,更让人懵 ; 不过莫慌,我们来通过下面的图来缕下思路

小结

看了这么长时间的代码,也够累的;最后我们就来个小结吧 :

  • QuorumCnxManager 类主要职能是负责集群中节点与外部节点进行通信及投票信息的中转

  • FastLeaderElection 类是选举投票的核心实现

  • 选举投票规则

    • 比较外部投票与内部投票的选举周期值,选举周期大的值优先
    • 若选举周期值一致,则比较事务 ID; 事务 ID 最新的优先
    • 若选举周期值一致且事务 ID 值相同,则比较投票节点的 server id; server id 最大的优先
  • 集群中节点通信时为了避免重复建立连接,遵守一个原则:连接总是由 server id 较大的一方发起



# zookeeper  

tocToc: