waitgroup 组织管理一组任务协程运行,同步跟踪它们的结束,其它的逻辑它一概不管。此时若是需要这样一个功能:一个任务的任何一个子任务失败时,应该尽早结束其它任务,防止无效的工作。那么此时就需要额外的编码(chan 通信,context等机制),但是我们不必亲自做,该功能有 errgroup 实现了。
管理一个任务的一组子任务,每个子任务一个协程
子任务必须保证都成功,一个出现失败应当立马停止所有子任务 想知道子任务失败的原因
2.1、Group 数据结构
type Group struct {// cancel 就是 context 返 回 的// cancel 函 数cancel func()// wg 管 理 一 组 子 任 务 协 程wg sync.WaitGroup// errOnce 单例模式,保证失败只发// 一次,并且由出现错误的子协程最先// 发送errOnce sync.Once// err 存储子协程的错误// 同步返回后,通过它取得// 失败的原因err error}
2.2、Go 方法:子任务如何启动
Go 包装了 WaitGroup 的 Add 方法的同时启动了任务协程,同时内部采用单例模式,一旦出错,发送 Context 的 Cancel 信号,从而结束 Context 树上的协程。
// 提供了 Go 接口,用于启动子任务协程// f: 子任务, 需要满足接口 func() errorfunc (g *Group) Go(f func() error) {// WaitGroup 添加一个子任务g.wg.Add(1)// 启动子任务协程go func() {// 同步通知,减少 WaitGroup 等待的任务数目defer g.wg.Done()// 错误处理if err := f(); err != nil {// 子任务出现错误,单例启动处理// 错误,错误处理程序包含:// 1. 存储 错误 error// 2. cancel 发送信号,结束 context 上下文g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel()}})}}()}
2.3、Wait方法:同步等待任务正常或异常结束
// Wait 调用后,内部调用 WaitGroup 的// Wait 进入同步阻塞等待func (g *Group) Wait() error {// WaitGroup 同步等待g.wg.Wait()// 给所有的 context 关联的子协程发送// cancel 信号if g.cancel != nil {g.cancel()}// 返回协程的错误return g.err}
2.4、WithContext 工厂方法:创建带上下文的Group
// WithContext 接收上下文,一次为上下文创建// 可被取消的 context,同时返回 Group 组func WithContext(ctx context.Context) (*Group, context.Context) {ctx, cancel := context.WithCancel(ctx)// Group 的 cancel 由 context 得到return &Group{cancel: cancel}, ctx}
3、实战
场景:计算一个目录下所有文件的 MD5 值,任何一个文件都需要正确计算
分析:从场景上讲,总任务目标是计算目录下所有文件md5,所以可以有多个协程去分别计算一部分文件的md5,可以用 WaitGroup 管理,但是要求所有文件结果计算准确,所以要用 ErrGroup
设计:

G-pro:负责目录遍历协程,分发文件路径到 chan G-w*:负责消费 G-pro 结果的协程,并计算 MD5,并将结果发送到结果 chan
G-result:结果收集协程,读取计算结果并存储
G-wait:同步等待携程组技术的协程,它调用 wait 等待
代码:
func main() {// 计算当前路径下文件的 md5 值m, err := MD5All(context.Background(), ".")if err != nil {log.Fatal(err)}for k, sum := range m {fmt.Printf("%s:\t%x\n", k, sum)}}// 结果集type result struct {path stringsum [md5.Size]byte}// 计算所有文件的 md5func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {// 创建含有上下文信息的 Groupg, ctx := errgroup.WithContext(ctx)// 用来发送读到的文件路径paths := make(chan string)// G-pro 协程,遍历目录g.Go(func() error {// 发送方只有一个,发送负责关闭 chandefer close(paths)// 遍历目录return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}// select 监听取消信号// 出错时,其它协程会发送// cancel 信号,此时监听// ctx.Done,便于结束select {case paths <- path:// 被取消case <-ctx.Done():return ctx.Err()}return nil})})// 结果通道c := make(chan result)const numDigesters = 20// G-w* 子任务组协程,计算一部分文件的 md5for i := 0; i < numDigesters; i++ {g.Go(func() error {// 监听读到的文件路径for path := range paths {// 读取文件内容data, err := ioutil.ReadFile(path)if err != nil {return err}// 计算 md5 并发送计算结果select {case c <- result{path, md5.Sum(data)}:// 同样等待出错结束任务的通知case <-ctx.Done():return ctx.Err()}}return nil})}// G-wait 协程,等待 errgroup 的返回go func() {g.Wait()close(c)}()// G-result 协程,汇总存储结果m := make(map[string][md5.Size]byte)for r := range c {m[r.path] = r.sum}// Check whether any of the goroutines failed. Since g is accumulating the}
4、总结
场景:总任务分多个子任务,要保证所有子任务全部正确
原理:WaitGroup 组织子任务,Context 保证所有子任务能够监听 Done 信号结束,Once 单例模式保证只有第一个出错协程有资格发送 Context 的 Cancel 信号,最后 Wait 返回时,从全局状态变量 err 读到出错原因
文章转载自老码农空杯修行记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




