https://github.com/uber-go/fx是一个依赖注入框架,它是基于依赖注入库dig实现的(参考:dig 源码分析)。它简化了依赖注入,消除了全局状态的维护和 func init()。首先看下如何使用:
最简单的方式:
fx.New().Run()
或者
fx.New(fx.Provide(http.NewServeMux),).Run()
或者
func main(){var module1 = fx.Options(fx.Provide(NewDb),)module := fx.Module("main",fx.Provide(NewService,NewServer,),)fx.New(fx.Provide(http.NewServeMux),log.Module,fx.Invoke(registerHooks),module1,module,).Run()}func registerHooks(lifecycle fx.Lifecycle, mux *http.ServeMux,) {lifecycle.Append(fx.Hook{OnStart: func(ctx context.Context) error {go http.ListenAndServe(":8080", mux)return nil},},)}
可以看到,它将Provide和Invoke 都抽象成options,作为New的参数,来进行容器的初始化,然后通过Run开启应用。在Invoke方法中可以指定程序运行钱的Hook OnSatrt和程序结束前的Hook OnStop。下面通过源码进行分析:
func New(opts ...Option) *App {}app.root = &module{app: app}app.modules = append(app.modules, app.root)for _, opt := range opts {opt.apply(app.root)}app.lifecycle = &lifecycleWrapper{lifecycle.New(appLogger{app}, app.clock),}app.container = dig.New(dig.DeferAcyclicVerification(),dig.DryRun(app.validate),)for _, m := range app.modules {m.build(app, app.container)}for _, m := range app.modules {m.provideAll()}app.root.provide(provide{Target: func() Lifecycle { return app.lifecycle },Stack: frames,})app.root.provide(provide{Target: app.shutdowner, Stack: frames})app.root.provide(provide{Target: app.dotGraph, Stack: frames})err := app.root.executeInvokes(); err != nildig.Visualize(app.container, &b, dig.VisualizeError(err))}
首先看下New的参数
type Option interface {fmt.Stringerapply(*module)}
它通过 加载配置的方式来完成依赖注入的Provide和Invoke,这两个方法只需要实现了apply接口就行。
func Provide(constructors ...interface{}) Option {return provideOption{Targets: constructors,Stack: fxreflect.CallerStack(1, 0),}}
type provideOption struct {Targets []interface{}Stack fxreflect.Stack}
func (o provideOption) apply(mod *module){for _, target := range o.Targets{mod.provides = append(mod.provides, provide{Target: target,Stack: o.Stack,})}
将对应的注入函数append到module的provides里面。其中module的定义如下:
type module struct {parent *modulename stringscope *dig.Scopeprovides []provideinvokes []invokedecorators []decoratormodules []*moduleapp *App}
对应的Invoke,定义如下:
func Invoke(funcs ...interface{}) Option {return invokeOption{Targets: funcs,Stack: fxreflect.CallerStack(1, 0),}}
type invokeOption struct {Targets []interface{}Stack fxreflect.Stack}
func (o invokeOption) apply(mod *module) {for _, target := range o.Targets {mod.invokes = append(mod.invokes, invoke{Target: target,Stack: o.Stack,})}}
同样也是append 到module的invokes里面。
解释完入参,我们回过头来看看,App结构的定义。
type App struct {err errorclock fxclock.Clocklifecycle *lifecycleWrappercontainer *dig.Containerroot *modulemodules []*module// Used to setup logging within fx.log fxevent.LoggerlogConstructor *provide set only if fx.WithLogger was used// Timeouts usedstartTimeout time.DurationstopTimeout time.Duration// Decides how we react to errors when building the graph.errorHooks []ErrorHandlervalidate bool// Used to signal shutdowns.donesMu sync.Mutex guards dones and shutdownSigdones []chan os.SignalshutdownSig os.SignalosExit func(code int) os.Exit override; used for testing only}
可以看到root属性是module类型的指针,modules 是module类型的指针数组,回过头来看构造函数New,它调用option的apply方法的时候传入的正是root属性。可见依赖注入需要的所有的数据都绑定到app的root属性上了。
接着初始化了lifecycle 属性
type lifecycleWrapper struct{ *lifecycle.Lifecycle }
func (l *lifecycleWrapper) Append(h Hook) {l.Lifecycle.Append(lifecycle.Hook{OnStart: h.OnStart,OnStop: h.OnStop,})}
然后通过dig.New初始化了依赖注入的容器,也就是container属性。接着对所有的modules执行了build和provideAll()方法,注意这里的modules绑定了我们前文提到的依赖注入的provider和invoke。然后给app.root绑定了三个全局的provide方法:Lifecycle,shutdowner,dotGraph。最后执行了 app.root.executeInvokes(),如果执行出错回生成dot格式的调用图,方便可视化查看。其实是调用了dig的dig.Visualize
func (m *module) build(app *App, root *dig.Container)if m.parent == nil {m.scope = root.Scope(m.name)// TODO: Once fx.Decorate is in-place,// use the root container instead of subscope.} else {parentScope := m.parent.scopem.scope = parentScope.Scope(m.name)}for _, mod := range m.modules {mod.build(app, root)}
主要是做了scope等属性的赋值操作,对于子孙module进行递归调用build方法。
func (m *module) provideAll()for _, p := range m.provides {m.provide(p)}for _, m := range m.modules {m.provideAll()}
provideAll也是同样进行了递归操作,调用了provide方法
func (m *module) provide(p provide)if err := runProvide(m.scope, p, dig.FillProvideInfo(&info), dig.Export(true)); err != nil
func runProvide(c container, p provide, opts ...dig.ProvideOption) errorconstructor := p.Targetcase annotatedctor, err := constructor.Build()if err := c.Provide(ctor, opts...); err != nil {case Annotated:if err := c.Provide(ann.Target, opts...); err != nil {default:c.Provide(constructor, opts...)
最终调用了容器的Provide方法,也就是dig的Provide流程,实现了依赖的构造函数的注入。
接着看下executeInvokes
func (m *module) executeInvokes() error {for _, invoke := range m.invokes {if err := m.executeInvoke(invoke); err != nil {return err}}for _, m := range m.modules {if err := m.executeInvokes(); err != nil {return err}}return nil}
它同样是对孩子节点实现了递归调用。自己调用了executeInvoke方法
func (m *module) executeInvoke(i invoke) (err error) {fnName := fxreflect.FuncName(i.Target)m.app.log.LogEvent(&fxevent.Invoking{FunctionName: fnName,ModuleName: m.name,})err = runInvoke(m.scope, i)
func runInvoke(c container, i invoke) error {fn := i.Targetswitch fn := fn.(type) {case Option:case annotated:af, err := fn.Build()if err != nil {return err}return c.Invoke(af)default:return c.Invoke(fn)
最终调用了容器的Invoke方法,也就是dig的Invoke方法。至此完成了App的初始化流程,接着开始运行App,也就是调用Run接口。
func (app *App) Run() {}if code := app.run(app.Done()); code != 0 {app.exit(code)}
func (app *App) run(done <-chan os.Signal) (exitCode int)err := app.Start(startCtx)sig := <-doneerr := app.Stop(stopCtx);
内部调用了Start和Stop两个接口,chan 收到done的信号后开始执行Stop
func (app *App) Done() <-chan os.Signalsignal.Notify(c, os.Interrupt, _sigINT, _sigTERM)
func (app *App) Start(ctx context.Context) (err error)withTimeout(ctx, &withTimeoutParams{hook: _onStartHook,callback: app.start,lifecycle: app.lifecycle,log: app.log,})
func (app *App) Stop(ctx context.Context) (err error)withTimeout(ctx, &withTimeoutParams{hook: _onStopHook,callback: app.lifecycle.Stop,lifecycle: app.lifecycle,log: app.log,})
默认运行两个函数执行15s
func withTimeout(ctx context.Context, param *withTimeoutParams) errorc <- param.callback(ctx)控制超时时间,令一个协程里调用if param.hook == _onStartHook {r = param.lifecycle.startHookRecords()} else {r = param.lifecycle.stopHookRecords()}
在函数里面执行力callback方法,对于start来说就是
func (app *App) start(ctx context.Context) errorerr := app.lifecycle.Start(ctx);stopErr := app.lifecycle.Stop(ctx)
对于stop来说就是 app.lifecycle.Stop
lifecycle的定义如下:
type Lifecycle struct {clock fxclock.Clocklogger fxevent.Loggerhooks []HooknumStarted intstartRecords HookRecordsstopRecords HookRecordsrunningHook Hookmu sync.Mutex}
它有一个Append接口,我们可以添加Hook
func (l *Lifecycle) Append(hook Hook) {// Save the caller's stack frame to report file/line number.if f := fxreflect.CallerStack(2, 0); len(f) > 0 {hook.callerFrame = f[0]}l.hooks = append(l.hooks, hook)}
其中通过函数 frames := runtime.CallersFrames(pcs)来获取当前的栈帧
func (l *Lifecycle) Start(ctx context.Context) error {for _, hook := range l.hooks{l.runningHook = hookruntime, err := l.runStartHook(ctx, hook)l.startRecords = append(l.startRecords, HookRecord{
func (l *Lifecycle) Stop(ctx context.Context) error{hook := l.hooks[l.numStarted-1]runtime, err := l.runStopHook(ctx, hook)
相应的函数如下
func (l *Lifecycle) runStartHook(ctx context.Context, hook Hook) (runtime time.Duration, err error)err = hook.OnStart(ctx)
func (l *Lifecycle) runStopHook(ctx context.Context, hook Hook) (runtime time.Duration, err error)err = hook.OnStop(ctx)
内部就是运行了我们注册额开始或者结束的hook
type Hook struct {OnStart func(context.Context) errorOnStop func(context.Context) error}
当然也兼容dig的In写法
type In = dig.In
总结下:fx.New(...)是fx初始化的地方,启动时主要里面包含[]Option,启动后生成根对象app;provide是将多个构造函数放到fx中,支持组合放入,放入之后fx会对每个option展开,然后一一放入fx中,效果和单个放入每个option一样,但是方便了option过多时的管理。fx中还有一个lifecycle的概念,直接隶属于根对象app,用于完成对象启动和停止时的Hook(钩子)操作,主要包含OnStart和OnStop两个方法,用户可以在要初始化的对象中定义自己的OnStart/OnStop方法,然后append到app.lifecycle.Hooks中。它带来一个比较大的便利是我们可以在module里面定义Module变量,在注入的时候直接注入这个Module变量,这个Module是Options的返回值,本质上是Provid的返回值
var Module = fx.Options(fx.Provide(ProvideLogger),)






