暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Consul 核心原理解析之-Raft 数据同步

云中间件技术 2018-12-11
1348

2 日志拷贝

继上一章节,当集群中选出了leader,接下来关键的工作是将收到数据同步到Follower,即,leader接受到客户端的cmd请求,构造raft log (类似mysql binlog),并且log拷贝到Follower,得到集群中过半数节点存储成功确认后,给客户端返回成功。

首先看一下runLeader的处理逻辑

raft库中日志拷贝有两种方式: 1)对每个follower会单独开一个协程进行拷贝操作,并且是串行拷贝的,每条日志项都是严格按照index的顺序进行拷贝的,并且每次拷贝请求都要等待follower的ack,才会执行下一次拷贝。 2) 并行拷贝(pipeline),一个协程里面会并行进行拷贝操作,不等对端的follower返回,继续处理其他的拷贝请求

2.1 leader端主循环

  1. func (r *Raft) runLeader() {

  2.    ......

  3.    // Start a replication routine for each peer

  4.    r.startStopReplication()  //leader节点根据当前的configuration,为集群中其他节点开启一个复制日志的goroutine

  5.    ......

  6.    noop := &logFuture{

  7.        log: Log{

  8.            Type: LogNoop,

  9.        },

  10.    }

  11.    r.dispatchLogs([]*logFuture{noop})  //成为leader,raft安全性要求,必须马上复制一条noop的日志

  12.    // Sit in the leader loop until we step down

  13.    r.leaderLoop() //进入leader主循环,接受新来的日志等

  14. }

关于noop日志类型:

Leader一旦当选,立即向其它节点同步一个心跳消息(no-op)。这是为了确保当前没有提交的日志也能尽快得到提交。Leader只会追加日志序列,刚当选时已经存在的日志序列,Leader会努力将它们同步到所有节点,如果Follower存在的日志和Leader有冲突,就会被抹平。最终Leader和Follower的日志就会完全一致。 在raft算法的论文中,&5.4.2章节有详细说明该日志的作用

从上面分析可以看出,成为leader后,主要是有三个大步骤:首先是新建goroutine来接受复制日志的请求,然后通过函数dispatchLogs发送一条noop的日志,最后进入r.leaderLoop()主循环,接受其他goroutine的信号请求。

可以看到,日志拷贝中,函数dispatchLogs是关键性函数,下面看看该函数的实现。

  1. func (r *Raft) dispatchLogs(applyLogs []*logFuture) {//分发日志,第一次会增加一条noop日志,index=2

  2.    term := r.getCurrentTerm()  //当前的leader任期,第一次成为leader的时候,这里应该是2

  3.    lastIndex := r.getLastIndex() //最后index,初始化的时候应该为1

  4.    logs := make([]*Log, len(applyLogs))


  5.    for idx, applyLog := range applyLogs {

  6.        applyLog.dispatch = now

  7.        lastIndex++

  8.        applyLog.log.Index = lastIndex

  9.        applyLog.log.Term = term

  10.        logs[idx] = &applyLog.log

  11.        r.leaderState.inflight.PushBack(applyLog)   //这里保存了将要apply的日志

  12.    }


  13.    if err := r.logs.StoreLogs(logs); err != nil {   //leader本地写数据失败,那么马上通知调用方失败

  14.        for _, applyLog := range applyLogs {

  15.        {

  16.            //applylog里面有一个channel,外部调用者通过这个channel来获取反馈。respond就往该channel写入操作结果

  17.            applyLog.respond(err)

  18.        }

  19.        //写日志失败,leader直接变成follower,每次runLeader的时候,会自动把inflight清空,新建一个链表

  20.        r.setState(Follower)

  21.        return

  22.    }

  23.    /**

  24.     * 这里是计算集群中大多数节点已经写到磁盘的日志的最大index,称作quorumMatchIndex;

  25.     * 如果quorumMatchIndex > commitIndex(已知的最大的已经被提交的日志条目的索引值);

  26.     * 会通过commitCh发起一个apply请求,触发更新本地的fsm

  27.     */

  28.    r.leaderState.commitment.match(r.localID, lastIndex)


  29.    // Update the last log since it's on disk now

  30.    r.setLastLog(lastIndex, term)  //保存本地最后一条日志的index和term


  31.    // Notify the replicators of the new log

  32.    for _, f := range r.leaderState.replState {  //通知所有节点,已经写入了新的日志,需要拷贝日志

  33.        //这里会通知到r.startStopReplication()里面创建的goroutine

  34.        //goroutine中,通过s.triggerCh来接受日志拷贝请求,然后给对端发起AppendEntriesRequest的rpc请求

  35.        asyncNotifyCh(f.triggerCh)

  36.    }

  37. }

2.2 leader接受客户端的日志请求

前面介绍的是,candidate 转换为-> leader的时候,在leader主循环中同步一条noop日志和往期日志的逻辑。在raft集群正常运行的情况,大多数还是leader收到客户端的日志请求,然后分发处理的场景。该处理逻辑放到前面提高的leader主循环函数leaderloop里,对应的raft库中,对于调用提供了提交日志的api函数func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture,调用者可以使用这个函数apply一条日志。 Apply函数:

  1. func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {  //写入一个命令到日志里

  2. ......

  3. logFuture := &logFuture{

  4.     log: Log{

  5.         Type: LogCommand,

  6.         Data: cmd,

  7.     },

  8. }

  9. logFuture.init()

  10. select {

  11. ......

  12. case r.applyCh <- logFuture:  //这里可以看到,是把日志信息写到channel里面

  13.     return logFuture //返回一个future结构体,调用者可以根据结构体的里的channel,阻塞等待日志apply的结果

  14. }

  15. }

leader处理的主循环的leaderLoop:

  1. func (r *Raft) leaderLoop() {  //主循环

  2. for r.getState() == Leader {

  3.     select {

  4.     case newLog := <-r.applyCh://主循环从channel中拿到日志请求 收到apply日志请求,applyCh是一个无缓冲的channel。必须处理完了这个请求,才能收到下一个applyCh的内容

  5.         // Group commit, gather all the ready commits

  6.         ready := []*logFuture{newLog}

  7.         for i := 0; i < r.conf.MaxAppendEntries; i++ { //如果是批量apply,则把另外的entry也拿到

  8.             select {

  9.             case newLog := <-r.applyCh:

  10.                 ready = append(ready, newLog)

  11.             ......

  12.             }

  13.         }

  14.         ......

  15.         r.dispatchLogs(ready)  //分发日志,把日志拷贝到其他节点,并且触发一次match操作,查看是否需要触发fsm的更新

  16.         }

  17.     }

  18. }

  19. ......

2.3 follower和candidate的处理

leader发送了AppendEntriesRequest的rpc请求,其他的节点肯定会收到leader发过来的增加日志请求,并进行逻辑处理和回包。前面有提到,所有的raft节点都会有函数r.processRPC(rpc)处理收到其他节点的rpc请求。因此处理AppendEntriesRequest也是在该函数进行分发处理的。

  1. r.processRPC(rpc) -> case *AppendEntriesRequest: -> r.appendEntries(rpc, cmd)

函数appendEntries:

  1. func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {

  2.    resp := &AppendEntriesResponse{ //日志拷贝的回包

  3.        RPCHeader:      r.getRPCHeader(),

  4.        Term:           r.getCurrentTerm(),

  5.        LastLog:        r.getLastIndex(), //当前Follower拥有的logIndex

  6.        Success:        false,

  7.        NoRetryBackoff: false,

  8.    }

  9.    var rpcErr error

  10.    defer func() {

  11.        rpc.Respond(resp, rpcErr)   //退出的时候,给其他节点发送一个回包,当前log已经append成功与否

  12.    }()


  13.    /**

  14.     * 如果同步请求的任期号比Follower的任期号还小,那就直接拒绝,并带上自己的任期号,以便Leader进行跟上时代(更新自己的任期号)。

  15.     */

  16.    if a.Term < r.getCurrentTerm() {

  17.        return

  18.    }


  19.    //如果leader的term比自己的大或者candidate收到一个appendEntry请求,立即变成Follower并刷新本地的term

  20.    if a.Term > r.getCurrentTerm() || r.getState() != Follower {  

  21.        r.setState(Follower)

  22.        r.setCurrentTerm(a.Term)

  23.        resp.Term = a.Term

  24.    }


  25.    // Save the current leader

  26.    r.setLeader(ServerAddress(r.trans.DecodePeer(a.Leader)))

  27.    /**

  28.     * 检查prevLogIndex与prevLogTerm是否在当前的follower中

  29.     * prevLogIndex: 新的日志条目紧随之前的索引值

  30.     * prevLogTerm: revLogIndex 条目的任期号

  31.     * 如果在Follower中找不到对应的prevlogIndex,那么会返回日志拷贝失败。

  32.     * leader端收到后,根据resp回包带的lastindex,算出应该给Follower拷贝的日志索引从哪个index开始

  33.     */

  34.    if a.PrevLogEntry > 0 {

  35.        lastIdx, lastTerm := r.getLastEntry()


  36.        var prevLogTerm uint64

  37.        if a.PrevLogEntry == lastIdx {

  38.            prevLogTerm = lastTerm


  39.        } else {

  40.            var prevLog Log

  41.            if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {

  42.                ......

  43.                return

  44.            }

  45.            prevLogTerm = prevLog.Term

  46.        }


  47.        if a.PrevLogTerm != prevLogTerm {

  48.            ......

  49.            return

  50.        }

  51.    }


  52.    .......

  53.    /**

  54.     * 上面对比完日志项,找到Follower与leader匹配的日之后,follower还要有以下的操作步骤

  55.     * _1) 检查leader的日志与follower的日志的冲突部分,把Follower冲突部分删除掉,日志项以leader为准。

  56.     * 具体的方式,把Follower_lastIndex到leader_newLogEntryFisrtIndex(leader新日志的第一个index)这两个范围的index的日志从Follower中删除掉

  57.     * _2) 通过r.logs.StoreLogs(newEntries)更新本地日志

  58.     * _3) r.processLogs(idx, nil)检查当前leader的commitIndex是否已经更新,如果是,则更新Follower的fsm

  59.     */


  60.    //上面的动作完成之后,给leader回包

  61.    resp.Success = true    //

  62.    r.setLastContact() //更新与leader最后一次通信的时间

  63.    return

  64. }

2.4 leader收到Follower回包,更新fsm

日志条目在leader先写入磁盘,然后通过发送AppendEntriesRequest更新到集群中的Follower节点中,然后在leaderloop监听commitCh查看日志项是否已经提交成功。那日志从leader拷贝到接受Follower回包,是如何发起一个commitCh的操作的呢?下面分析一下leader收到Follower回包的过程。

leader发起拷贝是由主协程发起的,由主循环监听applyCh触发一个增加日志的动作,然后再次触发拷贝的动作。而日志拷贝对于每个Follower而言,leader端是并发进行的,不是由主协程直接与Follower进行通信。而是asyncNotifyCh(f.triggerCh)通知拷贝日志的协程开始拷贝日志的动作。leader会为每个Follower都建立一个goroutine拷贝日志,主要的处理函数是func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool)

  1. //replicateTo函数中,收到Follower的回包,如果是Follower拷贝成功,调用updateLastAppended更新Follower已经拷贝的日志项

  2.    if resp.Success {

  3.        // Update our replication state

  4.        updateLastAppended(s, &req)


  5.        // Clear any failures, allow pipelining

  6.        s.failures = 0

  7.        s.allowPipeline = true

  8.    }

  1. func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) {

  2.    // Mark any inflight logs as committed

  3.    if logs := req.Entries; len(logs) > 0 {

  4.        last := logs[len(logs)-1]

  5.        s.nextIndex = last.Index + 1

  6.        //match函数前面提到,会计算每个节点已经拷贝的日志项

  7.        //然后计算大多数达到的最大index,通过recalculate发起一个commitCh操作,触发leader的fsm更新

  8.        s.commitment.match(s.peer.ID, last.Index)  

  9.    }

  10.    ......

  11. }

leader节点在主协程中,监听到commitCh,会做一下动作

  1. //收到一个commit完成的请求,准备应用日志,更新到状态机,产生函数func (c *commitment) recalculate()

  2.    case <-r.leaderState.commitCh:  

  3.        commitIndex := r.leaderState.commitment.getCommitIndex() //已知的最大的已经被提交的日志条目的索引值

  4.        r.setCommitIndex(commitIndex)

  5.        ......

  6.        for {

  7.            //inflight可以认为这个链表保存了写到磁盘但是没有应用到fsm的log

  8.            //

  9.            e := r.leaderState.inflight.Front() //这是一个链表,包含了log命令。把log加入链表的函数dispatchLogs

  10.            commitLog := e.Value.(*logFuture)

  11.            idx := commitLog.log.Index

  12.            if idx > commitIndex {

  13.                break

  14.            }

  15.            // Measure the commit time

  16.            metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)

  17.            r.processLogs(idx, commitLog) //准备更新本地状态机 //这里会通过commitLog反馈给调用者

  18.            r.leaderState.inflight.Remove(e)  //应用这条log到状态机了,就把log从链表中删除

  19.        }

  20.        ......

至此,一个日志下发,拷贝至Follower,leader统计response,commit状态发起,应用到fsm,全部过程已经完成。下面用以一个简单的流程图描述一下。

2.5 leader定期心跳包

raft中规定,leader需要定期向follower发送心跳包,保证自己的任期的权威性。raft库中心跳包和日志拷贝发送包的是一样的,都是AppendEntriesRequest。区别是: 1) 心跳包不包含任何日志项Entries,日志条数为0 2) follower中有一个专门处理心跳包heartbeat的函数,当收到心跳包的时候,会直接进行快速回复。follower中心跳包处理的协程和日志复制拷贝的协程不是同一个。(目的:当follower阻塞在日志拷贝的IO中时,follower依然可以收到并且处理leader的心跳包) 3) 心跳包中不包含LeaderCommitIndex(leader已经提交的日志index),这里可以优化一下,心跳包也发送CommitIndex,加快apply日志到follower中

2.6 总结

本文重点介绍了consul raft 算法的日志复制流程及核心点,下次将介绍集群中节点的加入移除原理。


文章转载自云中间件技术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论