暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Golang 每周一库:oklog/run

小张的科学日记 2020-03-29
2092

 假设我们有四个Go routine组件,分别是运行一个状态机、一个http服务器、定时任务读取状态机状态、和一个运行信号监听器,每个组件相互独立运行,问题在于我们如何将各个组件做为一个整体运行,并有序结束?


oklog/run个通用的管理协程生命周期的库,它比较简单,核心代码大概只有60多行,相似功能的有WaitGroup和errgroup,都是可以管理多个协程生命周期的工具。


01

运行中Go routine的关闭



在go语言中,大家都很熟悉如何使用channel来shutdown一个正在运行中的协程,本质就是向运行中的goroutine传递关闭的消息,无论是使用共享内存还是channel都可以传递,这块的区别更像csp和actor的区别,一个关注点在处理者上,另一个在消息上。


      // 通过cancel传递消息来shutdown协程
      cancel := make(chan struct{})
    go func() {
    select {
    case <-cancel:
    break
    }
    return
    }()



    02


    控制权反转


    oklog/run 使用来名为actor的结构体,actor包含主要业务逻辑的执行函数以及相对应的一个shutdown函数,多个actor为一个group,每个actor提供自己的运行和关闭接口(就是两个函数),并将控制的权力交给group,来实现多个协程的统一管理。

        // oklog/run的使用
      interrupt := errors.New("interrupt")
        var g run.Group
      g.Add(func() error { return interrupt }, func(error) {})
      cancel := make(chan struct{})
      g.Add(func() error { <-cancel; return nil }, func(error) { close(cancel) })
      res := make(chan error)
      go func() { res <- g.Run() }()
      select {
      case err := <-res:
      if want, have := interrupt, err; want != have {
      t.Errorf("want %v, have %v", want, have)
      }
      case <-time.After(100 * time.Millisecond):
      t.Errorf("timeout")
      }



      03


      源码及注释


        package run


        // 一个actor的集合
        type Group struct {
        actors []actor
        }


        // 向集合中添加actor
        func (g *Group) Add(execute func() error, interrupt func(error)) {
        g.actors = append(g.actors, actor{execute, interrupt})
        }


        // 并发运行集合中所有的actor
        func (g *Group) Run() error {
        if len(g.actors) == 0 {
        return nil
        }


        errors := make(chan error, len(g.actors))
        for _, a := range g.actors {
        go func(a actor) {
        errors <- a.execute()
        }(a)
        }


        // 等待actor的报错
        err := <-errors


          // 一旦任意actor报错,使用每个actor的关闭接口,关闭所有的actor
        for _, a := range g.actors {
        a.interrupt(err)
        }


        // 等待所有的函数返回再交出控制权
        for i := 1; i < cap(errors); i++ {
        <-errors
        }


        // 返回初识错误
        return err
        }




        // actor,为协程提供了关闭接口
        type actor struct {
        execute func() error
        interrupt func(error)
        }




        04


        扩展

        现在的run包是按照add的顺序依次关闭,能否完成更复杂的顺序控制,以及能否实现更复杂的执行逻辑控制



        文章转载自小张的科学日记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论