暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

uber-go/fx 源码分析

        https://github.com/uber-go/fx是一个依赖注入框架,它是基于依赖注入库dig实现的(参考:dig 源码分析它简化了依赖注入,消除了全局状态的维护和 func init()。首先看下如何使用:

最简单的方式:

    fx.New().Run()

    或者

          fx.New(
      fx.Provide(http.NewServeMux),
      ).Run()

      或者

        func main(){
        var module1 = fx.Options(
        fx.Provide(NewDb),
        )
        module := fx.Module("main",
            fx.Provide(
        NewService,
        NewServer,
        ),
        )


        fx.New(
            fx.Provide(http.NewServeMux),
        log.Module,
        fx.Invoke(registerHooks),
        module1,
        module,
        ).Run()
        }


        func registerHooks(
        lifecycle fx.Lifecycle, mux *http.ServeMux,
        ) {
        lifecycle.Append(
        fx.Hook{
        OnStart: func(ctx context.Context) error {
        go http.ListenAndServe(":8080", mux)
        return nil
        },
        },
        )
        }

        可以看到,它将Provide和Invoke 都抽象成options,作为New的参数,来进行容器的初始化,然后通过Run开启应用。在Invoke方法中可以指定程序运行钱的Hook OnSatrt和程序结束前的Hook OnStop。下面通过源码进行分析:

          func New(opts ...Option) *App {}
            app.root = &module{app: app}
          app.modules = append(app.modules, app.root)
            for _, opt := range opts {
          opt.apply(app.root)
          }
          app.lifecycle = &lifecycleWrapper{
          lifecycle.New(appLogger{app}, app.clock),
          }
          app.container = dig.New(
          dig.DeferAcyclicVerification(),
          dig.DryRun(app.validate),
          )
          for _, m := range app.modules {
          m.build(app, app.container)
          }
          for _, m := range app.modules {
          m.provideAll()
          }
          app.root.provide(provide{
          Target: func() Lifecycle { return app.lifecycle },
          Stack: frames,
          })
          app.root.provide(provide{Target: app.shutdowner, Stack: frames})
            app.root.provide(provide{Target: app.dotGraph, Stack: frames})
            err := app.root.executeInvokes(); err != nil 
            dig.Visualize(app.container, &b, dig.VisualizeError(err))
          }

          首先看下New的参数

            type Option interface {
              fmt.Stringer
            apply(*module)
            }

            它通过 加载配置的方式来完成依赖注入的Provide和Invoke,这两个方法只需要实现了apply接口就行。

              func Provide(constructors ...interface{}) Option {
              return provideOption{
              Targets: constructors,
              Stack: fxreflect.CallerStack(1, 0),
              }
              }
                type provideOption struct {
                Targets []interface{}
                Stack fxreflect.Stack
                }
                   func (o provideOption) apply(mod *module){
                     for _, target := range o.Targets
                  {
                  mod.provides = append(mod.provides, provide{
                  Target: target,
                  Stack: o.Stack,
                  })
                  }

                    将对应的注入函数append到module的provides里面。其中module的定义如下:

                    type module struct {
                    parent *module
                    name string
                    scope *dig.Scope
                    provides []provide
                    invokes []invoke
                    decorators []decorator
                    modules []*module
                    app *App
                    }

                    对应的Invoke,定义如下:

                      func Invoke(funcs ...interface{}) Option {
                      return invokeOption{
                      Targets: funcs,
                      Stack: fxreflect.CallerStack(1, 0),
                      }
                      }
                        type invokeOption struct {
                        Targets []interface{}
                        Stack fxreflect.Stack
                        }
                          func (o invokeOption) apply(mod *module) {
                          for _, target := range o.Targets {
                          mod.invokes = append(mod.invokes, invoke{
                          Target: target,
                          Stack: o.Stack,
                          })
                          }
                          }

                          同样也是append 到module的invokes里面。

                                      解释完入参,我们回过头来看看,App结构的定义。

                            type App struct {
                            err error
                            clock fxclock.Clock
                            lifecycle *lifecycleWrapper


                            container *dig.Container
                            root *module
                            modules []*module


                            // Used to setup logging within fx.
                            log fxevent.Logger
                            logConstructor *provide set only if fx.WithLogger was used
                            // Timeouts used
                            startTimeout time.Duration
                            stopTimeout time.Duration
                            // Decides how we react to errors when building the graph.
                            errorHooks []ErrorHandler
                            validate bool
                            // Used to signal shutdowns.
                            donesMu sync.Mutex guards dones and shutdownSig
                            dones []chan os.Signal
                            shutdownSig os.Signal


                            osExit func(code int) os.Exit override; used for testing only
                            }

                            可以看到root属性是module类型的指针,modules 是module类型的指针数组,回过头来看构造函数New,它调用option的apply方法的时候传入的正是root属性。可见依赖注入需要的所有的数据都绑定到app的root属性上了。

                                    接着初始化了lifecycle 属性

                              type lifecycleWrapper struct{ *lifecycle.Lifecycle }
                                func (l *lifecycleWrapper) Append(h Hook) {
                                l.Lifecycle.Append(lifecycle.Hook{
                                OnStart: h.OnStart,
                                OnStop: h.OnStop,
                                })
                                }

                                然后通过dig.New初始化了依赖注入的容器,也就是container属性。接着对所有的modules执行了build和provideAll()方法,注意这里的modules绑定了我们前文提到的依赖注入的provider和invoke。然后给app.root绑定了三个全局的provide方法Lifecycle,shutdowner,dotGraph。最后执行了 app.root.executeInvokes(),如果执行出错回生成dot格式的调用图,方便可视化查看。其实是调用了dig的dig.Visualize

                                  func (m *module) build(app *App, root *dig.Container) 
                                  if m.parent == nil {
                                  m.scope = root.Scope(m.name)
                                  // TODO: Once fx.Decorate is in-place,
                                  // use the root container instead of subscope.
                                  } else {
                                  parentScope := m.parent.scope
                                  m.scope = parentScope.Scope(m.name)
                                  }
                                  for _, mod := range m.modules {
                                  mod.build(app, root)
                                    }

                                  主要是做了scope等属性的赋值操作,对于子孙module进行递归调用build方法。

                                    func (m *module) provideAll()
                                    for _, p := range m.provides {
                                    m.provide(p)
                                    }
                                    for _, m := range m.modules {
                                    m.provideAll()
                                      }

                                    provideAll也是同样进行了递归操作,调用了provide方法

                                      func (m *module) provide(p provide)
                                      if err := runProvide(m.scope, p, dig.FillProvideInfo(&info), dig.Export(true)); err != nil
                                        func runProvide(c container, p provide, opts ...dig.ProvideOption) error 
                                        constructor := p.Target
                                        case annotated
                                        ctor, err := constructor.Build()
                                        if err := c.Provide(ctor, opts...); err != nil {
                                        case Annotated:
                                        if err := c.Provide(ann.Target, opts...); err != nil {
                                          default:
                                            c.Provide(constructor, opts...)

                                                    最终调用了容器的Provide方法,也就是dig的Provide流程,实现了依赖的构造函数的注入。

                                                接着看下executeInvokes

                                          func (m *module) executeInvokes() error {
                                          for _, invoke := range m.invokes {
                                          if err := m.executeInvoke(invoke); err != nil {
                                          return err
                                          }
                                          }


                                          for _, m := range m.modules {
                                          if err := m.executeInvokes(); err != nil {
                                          return err
                                          }
                                          }
                                          return nil
                                          }

                                          它同样是对孩子节点实现了递归调用。自己调用了executeInvoke方法

                                            func (m *moduleexecuteInvoke(i invoke) (err error) {
                                            fnName := fxreflect.FuncName(i.Target)
                                            m.app.log.LogEvent(&fxevent.Invoking{
                                            FunctionName: fnName,
                                            ModuleName: m.name,
                                            })
                                            err = runInvoke(m.scope, i)
                                              func runInvoke(c container, i invoke) error {
                                              fn := i.Target
                                              switch fn := fn.(type) {
                                               case Option:
                                               case annotated:
                                              af, err := fn.Build()
                                              if err != nil {
                                              return err
                                              }


                                              return c.Invoke(af)
                                              default:
                                              return c.Invoke(fn)

                                              最终调用了容器的Invoke方法,也就是dig的Invoke方法。至此完成了App的初始化流程,接着开始运行App,也就是调用Run接口。

                                                func (app *App) Run() {}
                                                if code := app.run(app.Done()); code != 0 {
                                                app.exit(code)
                                                }
                                                  func (app *App) run(done <-chan os.Signal) (exitCode int)
                                                  err := app.Start(startCtx)
                                                  sig := <-done
                                                  err := app.Stop(stopCtx);

                                                  内部调用了Start和Stop两个接口,chan 收到done的信号后开始执行Stop

                                                    func (app *App) Done() <-chan os.Signal
                                                    signal.Notify(c, os.Interrupt, _sigINT, _sigTERM)
                                                      func (app *App) Start(ctx context.Context) (err error)
                                                      withTimeout(ctx, &withTimeoutParams{
                                                      hook: _onStartHook,
                                                      callback: app.start,
                                                      lifecycle: app.lifecycle,
                                                      log: app.log,
                                                      })
                                                        func (app *App) Stop(ctx context.Context) (err error) 
                                                        withTimeout(ctx, &withTimeoutParams{
                                                        hook: _onStopHook,
                                                        callback: app.lifecycle.Stop,
                                                        lifecycle: app.lifecycle,
                                                        log: app.log,
                                                        })

                                                        默认运行两个函数执行15s

                                                          func withTimeout(ctx context.Context, param *withTimeoutParams) error
                                                          c <- param.callback(ctx)
                                                          控制超时时间,令一个协程里调用
                                                          if param.hook == _onStartHook {
                                                          r = param.lifecycle.startHookRecords()
                                                          } else {
                                                          r = param.lifecycle.stopHookRecords()
                                                          }

                                                          在函数里面执行力callback方法,对于start来说就是

                                                            func (app *App) start(ctx context.Context) error
                                                            err := app.lifecycle.Start(ctx);
                                                            stopErr := app.lifecycle.Stop(ctx)

                                                            对于stop来说就是 app.lifecycle.Stop

                                                            lifecycle的定义如下:

                                                              type Lifecycle struct {
                                                              clock fxclock.Clock
                                                              logger fxevent.Logger
                                                              hooks []Hook
                                                              numStarted int
                                                              startRecords HookRecords
                                                              stopRecords HookRecords
                                                              runningHook Hook
                                                              mu sync.Mutex
                                                              }

                                                              它有一个Append接口,我们可以添加Hook

                                                                  func (l *Lifecycle) Append(hook Hook) {
                                                                // Save the caller's stack frame to report file/line number.
                                                                if f := fxreflect.CallerStack(2, 0); len(f) > 0 {
                                                                hook.callerFrame = f[0]
                                                                }
                                                                l.hooks = append(l.hooks, hook)
                                                                }

                                                                其中通过函数 frames := runtime.CallersFrames(pcs)来获取当前的栈帧

                                                                    func (l *Lifecycle) Start(ctx context.Context) error {
                                                                  for _, hook := range l.hooks
                                                                  {
                                                                  l.runningHook = hook
                                                                  runtime, err := l.runStartHook(ctx, hook)
                                                                  l.startRecords = append(l.startRecords, HookRecord{
                                                                      func (l *Lifecycle) Stop(ctx context.Context) error{
                                                                    hook := l.hooks[l.numStarted-1]
                                                                    runtime, err := l.runStopHook(ctx, hook)

                                                                    相应的函数如下

                                                                        func (l *Lifecycle) runStartHook(ctx context.Context, hook Hook) (runtime time.Duration, err error) 
                                                                      err = hook.OnStart(ctx)
                                                                        func (l *Lifecycle) runStopHook(ctx context.Context, hook Hook) (runtime time.Duration, err error) 
                                                                            err = hook.OnStop(ctx)

                                                                        内部就是运行了我们注册额开始或者结束的hook

                                                                          type Hook struct {
                                                                          OnStart func(context.Context) error
                                                                          OnStop func(context.Context) error
                                                                          }

                                                                          当然也兼容dig的In写法

                                                                            type In = dig.In

                                                                            总结下:fx.New(...)是fx初始化的地方,启动时主要里面包含[]Option,启动后生成根对象app;provide是将多个构造函数放到fx中,支持组合放入,放入之后fx会对每个option展开,然后一一放入fx中,效果和单个放入每个option一样,但是方便了option过多时的管理。fx中还有一个lifecycle的概念,直接隶属于根对象app,用于完成对象启动和停止时的Hook(钩子)操作,主要包含OnStart和OnStop两个方法,用户可以在要初始化的对象中定义自己的OnStart/OnStop方法,然后append到app.lifecycle.Hooks中。它带来一个比较大的便利是我们可以在module里面定义Module变量,在注入的时候直接注入这个Module变量,这个Module是Options的返回值,本质上是Provid的返回值

                                                                              var Module = fx.Options(
                                                                              fx.Provide(ProvideLogger),
                                                                              )

                                                                              文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                              评论