暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Elasticsearch源码分析之选举

民生运维人 2020-10-12
2582

在Elasticsearch中选举master节点是Elasticsearch集群中所有master-eligible节点需要一起完成的基本任务。要了解选举流程,我们需要回答几个问题,什么情况下会触发选举?具体选举流程是什么样的?与以前的版本相比,7.x都有哪些变化?现在我们来回答下这几个问题。

1.选举条件

Elasticsearch会在以下两种情况触发选举:

  • Elasticsearch集群启动
  • Elasticsearch集群当前的master节点挂掉

2.选举流程

Elasticsearch选举流程是基于Raft协议实现的:

(1)节点启动

从节点启动到进入选举的流程如下所示:

整个选举流程的入口如下:

    clusterService.addStateApplier(transportService.getTaskManager());
    // start after transport service so the local disco is known
    discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
    clusterService.start();
    assert clusterService.localNode().equals(localNodeFactory.getNode())
            : "clusterService has a different local node than the factory provided";
    transportService.acceptIncomingRequests();
    discovery.startInitialJoin();
    final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings());
    configureNodeAndClusterIdStateListener(clusterService);

startInitialJoin()方法如下,该方法中最重要的就是becomeCandidate()方法。

@Override
    public void startInitialJoin() {
        synchronized (mutex) {
            becomeCandidate("startInitialJoin");
        }
        clusterBootstrapService.scheduleUnconfiguredBootstrap();
    }

在becomeCandidate()方法中,它首先先将节点的状态转换为candidate。在activate()方法中调用onFoundPeersUpdated()方法检查集群中的节点是否达到多数,如果达到多数则会发起选举流程startElectionScheduler()。核心代码如下:

if (mode != Mode.CANDIDATE) {
    final Mode prevMode = mode;
    mode = Mode.CANDIDATE;
    cancelActivePublication("become candidate: " + method);
    joinAccumulator.close(mode);
    joinAccumulator = joinHelper.new CandidateJoinAccumulator();

    peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
    clusterFormationFailureHelper.start();

@Override
        protected void onFoundPeersUpdated() {
            synchronized (mutex) {
                final Iterable<DiscoveryNode> foundPeers = getFoundPeers();
                if (mode == Mode.CANDIDATE) {
                    final VoteCollection expectedVotes = new VoteCollection();
                    foundPeers.forEach(expectedVotes::addVote);
                    expectedVotes.addVote(Coordinator.this.getLocalNode());
                    final boolean foundQuorum = coordinationState.get().isElectionQuorum(expectedVotes);

                    if (foundQuorum) {
                        if (electionScheduler == null) {
                            startElectionScheduler();
                        }
                    } else {
                        closePrevotingAndElectionScheduler();
                    }
                }
            }

            clusterBootstrapService.onFoundPeersUpdated();
        }
    }

(2)Prevote

引入prevote的原因是为了防止集群发生毫无意义的反复选举的情况,例如:集群中存在网络故障使得某个节点暂时脱离集群,在该节点重新加入到集群后会干扰到集群的运行,具体分析:当该节点脱离集群后,在timeout时间内没有收到leader的心跳后就会发起新的选举,每次发起选举时term就会加1。由于网络隔离,该节点既不会被选为leader,也收不到leader的心跳,所以它会一直发起选举,使得其term不断增大。网络恢复后,当该节点重新加入集群时,由于其term比集群中其他节点的term大,导致集群中原来的节点更新自己的term并使leader变为follower,进行重新选举。为了防止这种情况发生,在选举开始前先进行prevote,以确认集群中大多数节点认为当前集群中没有leader。详细内容见:https://github.com/elastic/elasticsearch/pull/32847

prevote流程可以用下图来描述:

prevote流程的入口如下所示:

/**
     * Start a new pre-voting round.
     *
     * @param clusterState   the last-accepted cluster state
     * @param broadcastNodes the nodes from whom to request pre-votes
     * @return the pre-voting round, which can be closed to end the round early.
     */
    public Releasable start(final ClusterState clusterState, final Iterable<DiscoveryNode> broadcastNodes) {
        PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm());
        preVotingRound.start(broadcastNodes);
        return preVotingRound;
    }

void start(final Iterable<DiscoveryNode> broadcastNodes) {
            assert StreamSupport.stream(broadcastNodes.spliterator(), false).noneMatch(Coordinator::isZen1Node) : broadcastNodes;
            logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
            broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
                new TransportResponseHandler<PreVoteResponse>() {

在prevote流程中节点会向所有discoveredNodes发送REQUEST_PRE_VOTE_ACTION_NAME请求,等待discoveredNodes的response并调用handlePreVoteResponse()处理。discoveredNodes在收到PreVoteRequest后会调用handlePreVoteRequest来处理。

handlePreVoteRequest()的处理逻辑如下:

  • 更新MaxTermSeen:updateMaxTermSeen.accept(request.getCurrentTerm());
  • 判断leader是否为空,如果为空则直接返回response
  • 判断发起PreVoteRequest的节点是否是leader,如果是则直接返回response
  • 如果集群存在leader,并且这个leader不是发起PreVoteRequest的节点,则抛出异常:拒绝PreVoteRequest请求,因为集群中已存在leader
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
        updateMaxTermSeen.accept(request.getCurrentTerm());

        Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
        assert state != null : "received pre-vote request before fully initialised";

        final DiscoveryNode leader = state.v1();
        final PreVoteResponse response = state.v2();

        if (leader == null) {
            return response;
        }

        if (leader.equals(request.getSourceNode())) {
            // This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible
            // that the leader lost its quorum, but while we'
re still a follower we will not offer joins to any other node so there is no
            // major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
            // leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
            // to also detect its failure.
            return response;
        }

        throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
    }

handlePreVoteResponse()的处理逻辑如下:

  • 更新MaxTermSeen:updateMaxTermSeen.accept(response.getCurrentTerm());
  • 如果满足以下两种情况之一,则忽略该response:
    • response中lastAcceptedTerm大于clusterState中的term
    • response中lastAcceptedTerm 等于clusterState中的term并且response中lastAcceptedVersion大于clusterState中的version
  • 节点接收所有没有被忽略的response
  • 节点根据接收到的response来构造Join(选票)
  • 调用electionStrategy.isElectionQuorum()判断选票是否达到大多数,如果没有则直接返回
  • 根据electionStarted来判断选举是否已经开始,如果已经开始则直接返回
  • 调用startElection.run()来发起选举

handlePreVoteResponse()的核心代码如下:

updateMaxTermSeen.accept(response.getCurrentTerm());
if (response.getLastAcceptedTerm() > clusterState.term()
    || (response.getLastAcceptedTerm() == clusterState.term()
    && response.getLastAcceptedVersion() > clusterState.getVersionOrMetaDataVersion())) {
    logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
    return;
}
preVotesReceived.put(sender, response);
// create a fake VoteCollection based on the pre-votes and check if there is an election quorum
final VoteCollection voteCollection = new VoteCollection();
final DiscoveryNode localNode = clusterState.nodes().getLocalNode();
final PreVoteResponse localPreVoteResponse = getPreVoteResponse();
preVotesReceived.forEach((node, preVoteResponse) -> voteCollection.addJoinVote(
    new Join(node, localNode, preVoteResponse.getCurrentTerm(),
    preVoteResponse.getLastAcceptedTerm(), preVoteResponse.getLastAcceptedVersion())));
if (electionStrategy.isElectionQuorum(clusterState.nodes().getLocalNode(), localPreVoteResponse.getCurrentTerm(),
    localPreVoteResponse.getLastAcceptedTerm(), localPreVoteResponse.getLastAcceptedVersion(),
    clusterState.getLastCommittedConfiguration(), clusterState.getLastAcceptedConfiguration(), voteCollection) == false) {
    logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
    return;
}
if (electionStarted.compareAndSet(falsetrue) == false) {
    logger.debug("{} added {} from {} but election has already started", this, response, sender);
    return;
}
logger.debug("{} added {} from {}, starting election", this, response, sender);
startElection.run();

(3)正式选举

正式选举流程可以用下图来描述:

在进行正式选举时,节点会构造StartJoinRequest,并将该请求发送给discoveredNodes,请求中的term的值取节点当前 term 与maxTermSeen的最大值并加1。该节点的discoveredNodes 在接收到StartJoinRequest后会使用handleStartJoin(StartJoinRequest startJoinRequest)方法来处理该请求:如果StartJoinRequest中的term大于discoveredNodes的currentTerm,就会构造Join来为节点投票。

核心代码如下:

final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest);
getDiscoveredNodes().forEach(node -> {
    if (isZen1Node(node) == false) {
        joinHelper.sendStartJoinRequest(startJoinRequest, node);
    }
});

public Join handleStartJoin(StartJoinRequest startJoinRequest) {
    if (startJoinRequest.getTerm() <= getCurrentTerm()) {
        logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
            startJoinRequest, getCurrentTerm());
        throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() +
            " not greater than current term " + getCurrentTerm());
    }

    logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest);

    if (joinVotes.isEmpty() == false) {
        final String reason;
        if (electionWon == false) {
            reason = "failed election";
        } else if (startJoinRequest.getSourceNode().equals(localNode)) {
            reason = "bumping term";
        } else {
            reason = "standing down as leader";
        }
        logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason);
    }

    persistedState.setCurrentTerm(startJoinRequest.getTerm());
    assert getCurrentTerm() == startJoinRequest.getTerm();
    lastPublishedVersion = 0;
    lastPublishedConfiguration = getLastAcceptedConfiguration();
    startedJoinSinceLastReboot = true;
    electionWon = false;
    joinVotes = new VoteCollection();
    publishVotes = new VoteCollection();

    return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(),
        getLastAcceptedVersionOrMetaDataVersion());
}

当节点收到选票后会使用handleJoin()方法来处理,具体处理逻辑如下:

  • 判断选民返回的投票(Join)中的term与当前节点的term的是否相等,如果不相等则会抛出CoordinationStateRejectedException( “incoming term does not match current term ” ),拒绝将选票添加
  • 判断startedJoinSinceLastReboot是否为false,这个场景是指在节点reboot后term没有增加,当startedJoinSinceLastReboot为false时,抛出CoordinationStateRejectedException(“ignored join as term has not been incremented yet after reboot”),拒绝将选票添加
  • 判断选民返回的投票(Join)中的lastAcceptedTerm与当前节点的lastAcceptedTerm的关系,如果前者大于后者,则抛出异常CoordinationStateRejectedException,拒绝将选票添加
  • 如果选民返回的投票(Join)中的lastAcceptedTerm与当前节点的lastAcceptedTerm相等,但是Join中的lastAcceptedVersion大于当前节点的lastAcceptedVersion,则抛出CoordinationStateRejectedException并拒绝将选票添加
  • 判断节点的lastAcceptedConfiguration是否为空,如果为空则抛出CoordinationStateRejectedException
  • 将该选票添加到joinVotes中
  • 判断是否到达法定人数
public boolean handleJoin(Join join) {
    assert join.targetMatches(localNode) : "handling join " + join + " for the wrong node " + localNode;

    if (join.getTerm() != getCurrentTerm()) {
        logger.debug("handleJoin: ignored join due to term mismatch (expected: [{}], actual: [{}])",
            getCurrentTerm(), join.getTerm());
        throw new CoordinationStateRejectedException(
            "incoming term " + join.getTerm() + " does not match current term " + getCurrentTerm());
    }

    if (startedJoinSinceLastReboot == false) {
        logger.debug("handleJoin: ignored join as term was not incremented yet after reboot");
        throw new CoordinationStateRejectedException("ignored join as term has not been incremented yet after reboot");
    }

    final long lastAcceptedTerm = getLastAcceptedTerm();
    if (join.getLastAcceptedTerm() > lastAcceptedTerm) {
        logger.debug("handleJoin: ignored join as joiner has a better last accepted term (expected: <=[{}], actual: [{}])",
            lastAcceptedTerm, join.getLastAcceptedTerm());
        throw new CoordinationStateRejectedException("incoming last accepted term " + join.getLastAcceptedTerm() +
            " of join higher than current last accepted term " + lastAcceptedTerm);
    }

    if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersionOrMetaDataVersion()) {
        logger.debug(
            "handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}]) in term {}",
            getLastAcceptedVersionOrMetaDataVersion(), join.getLastAcceptedVersion(), lastAcceptedTerm);
        throw new CoordinationStateRejectedException("incoming last accepted version " + join.getLastAcceptedVersion() +
            " of join higher than current last accepted version " + getLastAcceptedVersionOrMetaDataVersion()
            + " in term " + lastAcceptedTerm);
    }

    if (getLastAcceptedConfiguration().isEmpty()) {
        // We do not check for an election won on setting the initial configuration, so it would be possible to end up in a state where
        // we have enough join votes to have won the election immediately on setting the initial configuration. It'd be quite
        // complicated to restore all the appropriate invariants when setting the initial configuration (it'
s not just electionWon)
        // so instead we just reject join votes received prior to receiving the initial configuration.
        logger.debug("handleJoin: rejecting join since this node has not received its initial configuration yet");
        throw new CoordinationStateRejectedException("rejecting join since this node has not received its initial configuration yet");
    }

    boolean added = joinVotes.addJoinVote(join);
    boolean prevElectionWon = electionWon;
    electionWon = isElectionQuorum(joinVotes);
    assert !prevElectionWon || electionWon : // we cannot go from won to not won
        "locaNode= " + localNode + ", join=" + join + ", joinVotes=" + joinVotes;
    logger.debug("handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join,
        join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion());

    if (electionWon && prevElectionWon == false) {
        logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes);
        lastPublishedVersion = getLastAcceptedVersion();
    }
    return added;
}

最后processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback)方法中当该节点收到多数的投票后会调用becomeLeader("handleJoinRequest")方法使得该节点的状态由candidate转换为leader。至此集群的master节点就选出来了。上述的核心代码如下:

private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
    final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
    synchronized (mutex) {
        final CoordinationState coordState = coordinationState.get();
        final boolean prevElectionWon = coordState.electionWon();

        optionalJoin.ifPresent(this::handleJoin);
        joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback);

        if (prevElectionWon == false && coordState.electionWon()) {
            becomeLeader("handleJoinRequest");
        }
    }
}

上述描述的正常的选举流程,我们现在来考虑以下场景中如何选举?

  • 当集群中有一个master-eligible节点已经启动并且已经成为leader后,如果有节点要加入当前的master,具体如何实现?这个主要是靠PeerFinder 实现,PeerFinder如果发现了一个 Master 节点,就会执行 onActiveMasterFound ,调用 joinLeaderInTerm构造一个选票,然后 sendJoinRequest加入现有集群。
  • 整个集群重启与集群第一次启动有些类似,区别在于节点在启动的时候会从data目录加载集群的clusterstate
  • 缩容master-eligible节点分为两种情况,一种是缩容的节点是当前集群的master节点,一种是master-eligible节点。如果是master节点,则当master节点下线后,集群中会有master-eligible节点发现当前没有master,然后该节点会使自己转换为candidate然后发起选举;如果是master-eligible节点则不会对当前集群有太多影响,除了会使得集群的state发生变化。

3.选举相关信息

(1)7.x版本中Elasticsearch节点类型分为以下四种类型,选举流程主要是针对master-eligible node

master-eligible node:节点具有被选为master node的资格并且该节点的配置文件中node.master需要设置为true

(2)在7.x中引入了FollowersChecker和LeaderChecker。FollowersChecker的作用是:Leader检查follower是否可以连接及follower是否healthy。当发现follower failed后会将其从集群中删除。默认检查的频率是1s。LeaderChecker的作用是:Follower检查leader是否可以连接及leader是否healthy。

(3)在7.x版本中在配置文件中新增了cluster.initial_master_nodes配置选项,该选项必须进行配置,集群在第一次集群过程中会检查该项配置。具体如何进行配置可以参考官方链接:

https://www.elastic.co/guide/en/elasticsearch/reference/7.4/discovery-settings.html#initial_master_nodes https://www.elastic.co/guide/en/elasticsearch/reference/7.4/modules-discovery-settings.html

public static boolean discoveryIsConfigured(Settings settings) {
        return Stream.of(DISCOVERY_SEED_PROVIDERS_SETTING, LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING,
            DISCOVERY_SEED_HOSTS_SETTING, LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
            INITIAL_MASTER_NODES_SETTING).anyMatch(s -> s.exists(settings));
    }

(4)在集群的选举过程中会调用isElectionQuoum()方法来判断集群中的节点是否达到多数,其判断依据如下:

  • 判断当前节点收到的投票的节点与lastCommittedConfiguration 中的节点取交集,交集中节点的个数是否大于lastCommittedConfiguration 节点个数的一半
  • 判断 当前节点收到的投票的节点与lastAcceptedConfiguration中的节点取交集,交集中节点的个数是否大于lastAcceptedConfiguration节点个数的一半
  • 如果前两项同样达到多数的要求则认为达到多数

核心代码如下:

/**
     * Whether there is an election quorum from the point of view of the given local node under the provided voting configurations
     */
    public final boolean isElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm, long localAcceptedVersion,
                                          VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
                                          VoteCollection joinVotes) {
        return joinVotes.isQuorum(lastCommittedConfiguration) &&
            joinVotes.isQuorum(lastAcceptedConfiguration) &&
            satisfiesAdditionalQuorumConstraints(localNode, localCurrentTerm, localAcceptedTerm, localAcceptedVersion,
                lastCommittedConfiguration, lastAcceptedConfiguration, joinVotes);
    }

(5)在7.x版本中引入了coordination。cluster state的不一致会导致搜索结果不一致和数据丢失,引入coordination就是为了避免这种不一致。原来在老版本中错误的配置minimum_master_nodes是导致集群状态不一致的最常见原因之一。官方链接:

https://github.com/elastic/elasticsearch/issues/32006

(6)在Elasticsearch中有个很重要的类ElectionScheduler,它在集群中没有master时,负责随机调度选举,在出现故障时及时退出,可以在快速选举主节点以及多个node发起选举中平衡。

    private static final String ELECTION_INITIAL_TIMEOUT_SETTING_KEY = "cluster.election.initial_timeout";
    private static final String ELECTION_BACK_OFF_TIME_SETTING_KEY = "cluster.election.back_off_time";
    private static final String ELECTION_MAX_TIMEOUT_SETTING_KEY = "cluster.election.max_timeout";
    private static final String ELECTION_DURATION_SETTING_KEY = "cluster.election.duration";
/*
     * The first election is scheduled to occur a random number of milliseconds after the scheduler is started, where the random number of
     * milliseconds is chosen uniformly from
     *
     *     (0, min(ELECTION_INITIAL_TIMEOUT_SETTING, ELECTION_MAX_TIMEOUT_SETTING)]
     *
     * For `n > 1`, the `n`th election is scheduled to occur a random number of milliseconds after the `n - 1`th election, where the random
     * number of milliseconds is chosen uniformly from
     *
     *     (0, min(ELECTION_INITIAL_TIMEOUT_SETTING + (n-1) * ELECTION_BACK_OFF_TIME_SETTING, ELECTION_MAX_TIMEOUT_SETTING)]
     *
     * Each election lasts up to ELECTION_DURATION_SETTING.
     */

    public static final Setting<TimeValue> ELECTION_INITIAL_TIMEOUT_SETTING = Setting.timeSetting(ELECTION_INITIAL_TIMEOUT_SETTING_KEY,
        TimeValue.timeValueMillis(100), TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(10), Property.NodeScope);

    public static final Setting<TimeValue> ELECTION_BACK_OFF_TIME_SETTING = Setting.timeSetting(ELECTION_BACK_OFF_TIME_SETTING_KEY,
        TimeValue.timeValueMillis(100), TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(60), Property.NodeScope);

    public static final Setting<TimeValue> ELECTION_MAX_TIMEOUT_SETTING = Setting.timeSetting(ELECTION_MAX_TIMEOUT_SETTING_KEY,
        TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(200), TimeValue.timeValueSeconds(300), Property.NodeScope);

    public static final Setting<TimeValue> ELECTION_DURATION_SETTING = Setting.timeSetting(ELECTION_DURATION_SETTING_KEY,
        TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(300), Property.NodeScope);

(7)在Elasticsearch中Coordinator定义了节点的三种状态:candidate, leader, follower,JoinHelper中包含了有关处理join的方法

(8)在7.x版本中引入了voting configuration,它其实是master-eligible nodes的集合,在选举新的master或者发布新的cluster state需要做出决策时,会对这些节点的响应进行计算,官方链接:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-quorums.html

(9)Elasticsearch中的核心算法的模型可以参照:https://github.com/elastic/elasticsearch-formal-models

孙玺

中国民生银行信息科技部开源软件支持组工程师,目前主要负责与elasticsearch相关的大数据方面的工作。



文章转载自民生运维人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论