暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

WaitGroup 剖析

老码农空杯修行记 2021-05-15
343

1、源码分析

1.1、WaitGroup 结构

    type WaitGroup struct {
      // 嵌入 noCopy 表示该结构不能被复制
      // 不允许通过复制产生一个新的 WaitGroup
    noCopy noCopy


      // 记录调用 Add() 后累加的数目,记作 Counter
      // 记录调用 Wait() 的协程数目,记作 Waiter
      // 记录信号量,用来同步 Add 和 Wait,记作 state
    state1 [3]uint32
    }
    WaitGroup 数据结构本身简单,关键一点,就是不能复制生成新的 WaitGroup,另外state1 的结构因硬件而异,具体如下:
    • Counter:调用 wg.Add 或 wg.Done 增加或减少它,代表人物协程数

    • Waiter:调用 wg.Wait 的协程数目,表示有多少个协程在等待该 WaitGroup 的结束
    • state:存储信号量,同步 Add/Done 和 Wait,本质来讲就是信号量
    1.2、Done 方法

    Done 方法很简单,内部调用了 Add(-1),它是在那些归 WaitGroup 的任务协程结束任务后调用的方法。

      func (wg *WaitGroup) Done() {
      比较简单,内部调用了 Add
      wg.Add(-1)
      }
      1.3、Add 方法
      Add 方法调用后增加 counter 计数,这些计数代表 WaitGroup 将来要管理执行的任务数,但是有一点要注意 counter ≥ 0,不允许出现 counter < 0。当所有的任务数目的协程结束后,最后一个调用 Done() (Done 内部调用 Add) 的协程会通知所有阻塞在 Wait() 上的协程解除阻塞,而通知的本质就是让 Wait 协程有足够多的信号量去消费。

      需要注意的两点:

      • 注意 counter ≥ 0,不允许出现 counter < 0

      • 注意  counter = 0,water > 0,也就是任务协程刚结束时刻,不能有其它协程并发调用 Add

        func (wg *WaitGroup) Add(delta int) {
          // 解析 state1 结构
          // statep: 高32位为 counter 数量
          // 低32位为 waiter 数量
          // semap: 信号,用来同步
        statep, semap := wg.state()
          
          // 由于高 32 位是counter数量,所以加之前左移32位
        state := atomic.AddUint64(statep, uint64(delta)<<32)


          // v: counter 数量
        v := int32(state >> 32)


        w: 调用 Wait() 的协程数量
        w := uint32(state)


          // 不允许 counter 的数量为负数
        if v < 0 {
        panic("sync: negative WaitGroup counter")
        }


        不允许并发调用 Add 和 wait
        if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }


          // 调用 Add 发生在调用 Wait 之前
          // 因此正常返回
        if v > 0 || w == 0 {
        return
        }


          // 到此,说明 v=0, w>0, 此时不能并发的修改状态
          // 1. Add 绝对不能和 Wait 并发发生
          // 2. Wait 在 counter=0 的时候,wait 不能增加waiter的数量
        if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }


          // v=0, w>0,此时说明所有 goroutine 结束了
          // 所有 goroutine 结束后,给 w 发送信号,便于
          // 唤醒因调用 Wait 而阻塞的协程
        *statep = 0


          // 有多少个 Wait 等待,就给出多少个信号量
          // 为的就是喂饱所有的 Wait 保证这些阻塞的
          // Wait 协程能够正常解除
        for ; w != 0; w-- {
            // 增加 sema 资源,每调用一次 +1
        runtime_Semrelease(semap, false, 0)
        }
        }

        1.4、Wait 方法

        正常调用 Wait 的协程会被阻塞,阻塞是由于信号量不足引起的,一旦最后一个调用 Done(内部调用add)方法的协程增加了信号量资源,则阻塞解除,需要注意的是:

        • 未调用 Add,直接调用 Wait,由于没有任务协程,所以不会阻塞

        • Wait 从阻塞解除时刻,应当是 waiter 和 counter 的记录值为0,不为0,说明在此 WaitGroup 上并发调用了 Wait() 或 Add,就是说上一个Wait 结束时刻不能有其它协程并发操作该 WaitGroup

          // Wait blocks until the WaitGroup counter is zero.
          func (wg *WaitGroup) Wait() {
          // statep: 计数器,高32位,协程任务数目; 低32位 wait 数目
          statep, semap := wg.state()


          for {
              // 读取当前 wainter 和 counter 的值
          state := atomic.LoadUint64(statep)


          // v 计数器,也就是协程数量
          v := int32(state >> 32)


          // w 调用了Wait() 的协程数量
          w := uint32(state)


              // counter 为 0,说明没有 Add 的情况下直接调用了
              // Wait, 此时直接返回,不需要阻塞等待
          if v == 0 {       
          return
          }


          // 原子操作,增加 wait() 数量
          if atomic.CompareAndSwapUint64(statep, state, state+1) {
                // 阻塞等待信号量,当观察到 semap > 0
                // 是立马对 semap-1,然后返回
          runtime_Semacquire(semap)


                // 解除阻塞    
                // wait 返回之后,如果发现 counter 数不为 0,说明wg
                // 被重用了,就是说 Wait 刚从阻塞中唤醒,唤醒是因为
                // 之前 Add 过的协程都 Done了,结束了,此时 counter应当
                // 为 0,但是此刻又有其它协程在此 wg 上调用了 Add
          if *statep != 0 {
          panic("sync: WaitGroup is reused before previous Wait has returned")
                }
          return
          }
          }
          }
          2、总结
          • WaitGroup 作用

            管理一组协程,同步等待他们结束
          • WaitGroup 同步原理

            本质来讲就是信号量,初始化时信号量资源是0,所以在有任务协程运行的情况下,调用Wait会则塞协程等待信号量资源大于0。最后一个结束的任务协程会根据当前等待Wait协程数量增加信号量,从而便于Wait中的协程消费,从而解除阻塞。

          • 注意几点
          • WaitGroup 计数值 counter 必须大于等于0;

          • 上一组 Wait 结束时刻,不能并发调用 Add;
          • 对于没有Add 的WaitGroup进行 Wait不会阻塞;
          • 对于没有 Add 的WaitGroup,直接Done将异常
          • 与Errgroup的异同:
          • 它们都能管理一组协程;
          • WaitGroup 管理的目的纯粹为了同步跟踪结束,至于是否逻辑运行正常它不关心;ErrGroup 本质来讲管理的协程有逻辑关系,它们是一个总任务的任务分支,任何一个分支失败都将结束所有协程运行,为的就是保证任务正常完成;
          文章转载自老码农空杯修行记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论