暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

errgroup 剖析

老码农空杯修行记 2021-05-15
1261

waitgroup 组织管理一组任务协程运行,同步跟踪它们的结束,其它的逻辑它一概不管。此时若是需要这样一个功能:一个任务的任何一个子任务失败时,应该尽早结束其它任务,防止无效的工作。那么此时就需要额外的编码(chan 通信,context等机制),但是我们不必亲自做,该功能有 errgroup 实现了。

1、errgroup 场景
  • 管理一个任务的一组子任务,每个子任务一个协程

  • 子任务必须保证都成功,一个出现失败应当立马停止所有子任务
  • 想知道子任务失败的原因
2、errgroup 源码分析

2.1、Group 数据结构

    type Group struct {
      // cancel  就是 context 返 回 的
      // cancel  函 数
    cancel func()


      // wg 管 理 一 组 子 任 务 协 程
    wg sync.WaitGroup


      // errOnce 单例模式,保证失败只发
      // 一次,并且由出现错误的子协程最先
      // 发送
    errOnce sync.Once

      // err 存储子协程的错误
      // 同步返回后,通过它取得
      // 失败的原因
    err error
    }

    2.2、Go 方法:子任务如何启动

    Go 包装了 WaitGroup 的 Add 方法的同时启动了任务协程,同时内部采用单例模式,一旦出错,发送 Context 的 Cancel 信号,从而结束 Context 树上的协程。

      // 提供了 Go 接口,用于启动子任务协程
      // f: 子任务, 需要满足接口 func() error
      func (g *Group) Go(f func() error) {
        // WaitGroup 添加一个子任务
      g.wg.Add(1)


      // 启动子任务协程
      go func() {
          // 同步通知,减少 WaitGroup 等待的任务数目
      defer g.wg.Done()


          // 错误处理
      if err := f(); err != nil {
      // 子任务出现错误,单例启动处理
            // 错误,错误处理程序包含:
            // 1. 存储 错误 error
            // 2. cancel 发送信号,结束 context 上下文
      g.errOnce.Do(func() {
      g.err = err
      if g.cancel != nil {
      g.cancel()
      }
      })
      }
      }()
      }

      2.3、Wait方法:同步等待任务正常或异常结束

      Wait 包装了 WaitGroup 的 Wait,同时返回字协程出错的原因
        // Wait 调用后,内部调用 WaitGroup 的
        // Wait 进入同步阻塞等待
        func (g *Group) Wait() error {
          // WaitGroup 同步等待
        g.wg.Wait()

          // 给所有的 context 关联的子协程发送
          // cancel 信号
        if g.cancel != nil {
        g.cancel()
        }

        // 返回协程的错误
        return g.err
        }

        2.4、WithContext 工厂方法:创建带上下文的Group

          // WithContext 接收上下文,一次为上下文创建
          // 可被取消的 context,同时返回 Group 组
          func WithContext(ctx context.Context) (*Group, context.Context) {
          ctx, cancel := context.WithCancel(ctx)
            // Group 的 cancel 由 context 得到
          return &Group{cancel: cancel}, ctx
          }

          3、实战

          • 场景:计算一个目录下所有文件的 MD5 值,任何一个文件都需要正确计算

          • 分析:从场景上讲,总任务目标是计算目录下所有文件md5,所以可以有多个协程去分别计算一部分文件的md5,可以用 WaitGroup 管理,但是要求所有文件结果计算准确,所以要用 ErrGroup

          • 设计
          • G-pro:负责目录遍历协程,分发文件路径到 chan
          • G-w*:负责消费 G-pro 结果的协程,并计算 MD5,并将结果发送到结果 chan

          • G-result:结果收集协程,读取计算结果并存储

          • G-wait:同步等待携程组技术的协程,它调用 wait 等待

          • 代码:

            func main() {
              // 计算当前路径下文件的 md5 值
            m, err := MD5All(context.Background(), ".")
            if err != nil {
            log.Fatal(err)
            }


            for k, sum := range m {
            fmt.Printf("%s:\t%x\n", k, sum)
            }
            }


            // 结果集
            type result struct {
            path string
            sum [md5.Size]byte
            }


            // 计算所有文件的 md5 
            func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {


            // 创建含有上下文信息的 Group
            g, ctx := errgroup.WithContext(ctx)


              // 用来发送读到的文件路径
            paths := make(chan string)


            // G-pro 协程,遍历目录
            g.Go(func() error {
                // 发送方只有一个,发送负责关闭 chan
            defer close(paths)


                // 遍历目录
            return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
            return err
            }


            if !info.Mode().IsRegular() {
            return nil
            }


            // select 监听取消信号
            // 出错时,其它协程会发送
                  // cancel 信号,此时监听
                  // ctx.Done,便于结束
            select {
            case paths <- path:


            // 被取消
            case <-ctx.Done():
            return ctx.Err()
            }


            return nil
            })
            })


              // 结果通道
            c := make(chan result)


            const numDigesters = 20


              // G-w* 子任务组协程,计算一部分文件的 md5
            for i := 0; i < numDigesters; i++ {
            g.Go(func() error {


                  // 监听读到的文件路径
            for path := range paths {


            // 读取文件内容
            data, err := ioutil.ReadFile(path)
            if err != nil {
            return err
            }


            // 计算 md5 并发送计算结果
            select {
            case c <- result{path, md5.Sum(data)}:


                    // 同样等待出错结束任务的通知
            case <-ctx.Done():
            return ctx.Err()
            }
            }
            return nil
            })
            }


            // G-wait 协程,等待 errgroup 的返回
            go func() {
            g.Wait()
            close(c)
            }()


              // G-result 协程,汇总存储结果
            m := make(map[string][md5.Size]byte)
            for r := range c {
            m[r.path] = r.sum
            }
            // Check whether any of the goroutines failed. Since g is accumulating the
            }

            4、总结

            • 场景:总任务分多个子任务,要保证所有子任务全部正确

            • 原理:WaitGroup 组织子任务,Context 保证所有子任务能够监听 Done 信号结束,Once 单例模式保证只有第一个出错协程有资格发送 Context 的 Cancel 信号,最后 Wait 返回时,从全局状态变量 err 读到出错原因
            文章转载自老码农空杯修行记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论