“ 假设我们有四个Go routine组件,分别是运行一个状态机、一个http服务器、定时任务读取状态机状态、和一个运行信号监听器,每个组件相互独立运行,问题在于我们如何将各个组件做为一个整体运行,并有序结束?”
oklog/run是一个通用的管理协程生命周期的库,它比较简单,核心代码大概只有60多行,相似功能的有WaitGroup和errgroup,都是可以管理多个协程生命周期的工具。
01
—
运行中Go routine的关闭
在go语言中,大家都很熟悉如何使用channel来shutdown一个正在运行中的协程,本质就是向运行中的goroutine传递关闭的消息,无论是使用共享内存还是channel都可以传递,这块的区别更像csp和actor的区别,一个关注点在处理者上,另一个在消息上。
// 通过cancel传递消息来shutdown协程cancel := make(chan struct{})go func() {select {case <-cancel:break}return}()
02
—
控制权反转
oklog/run 使用来名为actor的结构体,actor包含主要业务逻辑的执行函数以及相对应的一个shutdown函数,多个actor为一个group,每个actor提供自己的运行和关闭接口(就是两个函数),并将控制的权力交给group,来实现多个协程的统一管理。
// oklog/run的使用interrupt := errors.New("interrupt")var g run.Groupg.Add(func() error { return interrupt }, func(error) {})cancel := make(chan struct{})g.Add(func() error { <-cancel; return nil }, func(error) { close(cancel) })res := make(chan error)go func() { res <- g.Run() }()select {case err := <-res:if want, have := interrupt, err; want != have {t.Errorf("want %v, have %v", want, have)}case <-time.After(100 * time.Millisecond):t.Errorf("timeout")}
03
—
源码及注释
package run// 一个actor的集合type Group struct {actors []actor}// 向集合中添加actorfunc (g *Group) Add(execute func() error, interrupt func(error)) {g.actors = append(g.actors, actor{execute, interrupt})}// 并发运行集合中所有的actorfunc (g *Group) Run() error {if len(g.actors) == 0 {return nil}errors := make(chan error, len(g.actors))for _, a := range g.actors {go func(a actor) {errors <- a.execute()}(a)}// 等待actor的报错err := <-errors// 一旦任意actor报错,使用每个actor的关闭接口,关闭所有的actorfor _, a := range g.actors {a.interrupt(err)}// 等待所有的函数返回再交出控制权for i := 1; i < cap(errors); i++ {<-errors}// 返回初识错误return err}// actor,为协程提供了关闭接口type actor struct {execute func() errorinterrupt func(error)}
04
—
扩展
现在的run包是按照add的顺序依次关闭,能否完成更复杂的顺序控制,以及能否实现更复杂的执行逻辑控制
文章转载自小张的科学日记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




