k8s的
List-Watch和
informer设计原则:
informer
是什么?解决了什么问题?informer
源码解读:k8s
资源同步奥秘项目中如何使用 informer
?
informer是什么?解决了什么问题?
我们知道k8s
有很多组件,频繁和apiserver
交互获取资源数据,这就带来了性能问题,特别是高并发请求下, 性能的损失是难以想象的。informer
组件就是在这种背景下产生的,能有效同步各组件通信的可靠性,并且减缓对apiserver
和etcd
负担。
既然informer
组件作为k8s
同步资源的利器,那么它的工作原理如何呢?
先贴一张图,看图说话, 我们有个感性的认识,后面再进行验证和细化。

根据原理图,大致可以分为以下步骤:
reflector
通过长连接list-watch
资源对象,将资源对象加入一个先进先出的队列;informrer
消费资源对象;右侧,将消费的资源对象加入一个本地索引器 indexer
,通过key-object
方式储存;左侧,消费的资源对象通过分发给事件处理函数处理;
带着这个基本认识,我们一起读源码验证一下,看是否跟图中的流程匹配。
informer
源码解读:k8s
资源同步奥秘
先看看这个package
的入口,定义了哪些方法。
代码地址:
https://github.com/kubernetes/client-go/blob/master/informers/factory.go#L109:6
以下定义的是一个共享informer
工厂方法,具体有哪些属性可以以下的参考代码片段。这里的实现比较有意思,显示定义了默认参数,然后用opt(factory)
实现了个性化参数配置,这部分代码还是很精妙的。

注意上面的informers
属性对应的map
中的cache.SharedIndexInformer
, 顾名思义,不同种类的共享索引informer
,是今天解剖的重点。
这里的Start
方法用协程起了工厂方法的每个informer
函数:

InFormerFor
则返回给定对象资源的共享索引informer
:

而informers
的同步,可以用WaitForCacheSync
来确定,该函数等待所有的启动的informer
同步资源。

Informersynced
可用于确定Informer
是否已同步的bool
类型的函数。wait.PollImmediateUntil
的作用是在等待间隔之前运行条件函数。两者结合起来,可以用来判断informers
同步状态。

自此,我们有了一个大概的认识,sharedInformerFactory
结构体是通过工厂方法来完成informers
的注册的,对应于sharedInformerFactory.informers
。此外,该结构体方法还实现了判断informers
同步状态,获取指定资源的informer
等功能。
下面,我们正式看看informer
为何方神物。
informer
https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go#L368
从informer
的属性,我们就可以知道,它包括了Deltafifo
队列、list-Watcher
、回调函数和其他辅助参数。
sharedIndexInformer.Run
的函数体中,在controller.Run
之前会启动一个处理器s.processor.run
, 这个组件用来完成工作队列中获取资源对象进行回调函数消费。只需要记住,运行controller
之前有个消费者协程运行了,它可以注册linstener
, 用来从工作队列获取消息消费。

我们再来看看controller.Run
的运行逻辑。
controller
运行的时候,创建了一个reflector
, 用它来进行listAndWatch
,同时会启动一个c.processLoop
, c.processLoop
作为消费者从 reflector
的队列中pop
对象资源。
这里先埋一个伏笔。后面会介绍这两个组件具体来干啥的。

refector
refector.Run
的时候会运行ListAndWatch
。

ListAndWatch
干的第一件事是启动一个协程list-watch
, 按照要求把数据同步过来,然后将对象资源进行指定版本提取。
go func() {defer func() {if r := recover(); r != nil {panicCh <- r}}()// 如果listwatcher支持长连接,数据就用块方式获取, 如果不支持就全量返回pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts)}))switch {case r.WatchListPageSize != 0:pager.PageSize = r.WatchListPageSizecase r.paginatedResult:case options.ResourceVersion != "" && options.ResourceVersion != "0":pager.PageSize = 0}list, paginatedResult, err = pager.List(context.Background(), options)if isExpiredError(err) || isTooLargeResourceVersionError(err) {r.setIsLastSyncResourceVersionUnavailable(true)list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})}close(listCh)}----r.setIsLastSyncResourceVersionUnavailable(false) // list was successfulinitTrace.Step("Objects listed")listMetaInterface, err := meta.ListAccessor(list)if err != nil {return fmt.Errorf("unable to understand list result %#v: %v", list, err)}resourceVersion = listMetaInterface.GetResourceVersion()initTrace.Step("Resource version extracted")items, err := meta.ExtractList(list)if err != nil {return fmt.Errorf("unable to understand list result %#v (%v)", list, err)}initTrace.Step("Objects extracted")if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("unable to sync list result: %v", err)}initTrace.Step("SyncWith done")r.setLastSyncResourceVersion(resourceVersion)initTrace.Step("Resource version updated")return nil----
数据处理完了将会怎样处理呢?
注意代码中做了一个refector.syncWith
的操作,调用了store
方法进行了DeletaQueue
队列资源对象更新。这里解释下store
, 它的抽象实现是DeletaQueue
,可以从上下文配置看到。

代码接着往下走,会启动一个的协程去做周期性地强制同步,直到接受超时或者停止信号。
这个逻辑实现主要通过一个定时器Ticker
和同步函数来完成的。这个同步函数也就是r.store.Resync()
,作用是将store
的对象资源的key
同步给queue
,这一步将需要处理的key
进行了入队,便于后续任务分发步骤进行消费。注意这里只是入队了相应的key
,真正需要处理时才会从缓存索引器indexer
中获取。这个索引器有点超纲,莫慌,后面还会解释。

最后,会进入一个死循环,主要逻辑在watchHandler
,用来观察资源变化对detlaqueue
队列资源更新。

watchHandler
方法根据watch
到的不同事件和方法,来调用store
的相关方法进行处理。

这个store
是一种接口,根据上下文,采用的是DeltaFIFO
队列作为接口的实现。
这个结构体提供了很多方法,感兴趣可以看看。比较重要的函数就是queueActionLocked
这个逻辑,主要是做了一个去重和广播,防止事件的重复处理。

至此整个list-watch
就结束了。
如此看来,reflector
生产对象资源过程中,使用list-watch
建立长连接(可选一次性拉取过来),提取版本化对象资源,调用store
方法,来完成deltafifo
队列的消息入队。
对象资源消费
reflector
生产对象资源加入deltafifo
队列,有生产就有消费,那么,这个消费组件是什么?
根据图示,从deltafifo
队列中取出资源对象,会更新本地索引indexer
, 同时也会根据消息事件来完成回调函数处理逻辑。
前文有两个组件sharedIndexInformer.processor.run
和controller.processLoop
没有解释,现在先回顾一下,然后解释他们的作用。
sharedIndexInformer.Run
会启动s.processor.run
协程和controller.Run
, s.processor.run
协程用来捕获消息,加入工作队列,并触发回调函数处理逻辑;其次,controller.Run
的时候, 创建了reflector
和controller.processLoop
协程,这个controller.processLoop
协程就是作为消费者,从deltafifo
队列中消费对象资源,传递给s.processor.run
协程消费。
由于这两个充当的角色相辅相成,按照上下文逻辑关系,先说controller.processLoop
。
controller.processLoop
controller.Run
启动时候,会启动一个c.processLoop
的循环,它作为消费者,从deltafifo
队列中pop
消费。

Pop
函数从deltafifo
队列头部取出一个对象元素进行处理,处理失败则要加回去。如果队列中的元素不足时候会阻塞,直到有对象资源可以消费。

Pop
函数体中有个process(item)
, 这里的process
,其实来源于sharedIndexInformer.HandleDeltas
,具体可以参考sharedIndexInformer.Run
函数体中cfg
的实例化。
通过sharedIndexInformer.HandleDeltas
这个函数, 实现了Deltas
资源对象的分发和索引器indexer
更新。
所谓indexer
是一个线程访问安全的本地存储组件,能实现资源对象key-object
快速访问。感兴趣的话可以研究下它的实现。
值得一提的是,queue
只存储了资源的索引,在真正对资源进行处理时,需要根据资源索引去这个indexer
缓存中获取数据的。

同时,s.processor.distribute
这个函数会将事件消息发送给processorListener
发送到通道processListener.addCh
。


也就是说,controller.processLoop
每次会消费deltafifo
队列的第一个对象元素,转发给processorListener
的addCh
通道。
sharedIndexInformer.processor.run
前面说到sharedIndexInformer.Run
会启动s.processor.run
协程,它会遍历processorListeners
,通过·listener.run
和listener.pop
两个函数,从每一个listener.addCh
获取消息,消费事件消息,完成函数回调处理。具体逻辑参考注册EventHandler
事件。

注册EventHandler
事件
我们再来看注册回调函数这块,EventHandler
事件的注册是通过informer
的AddEventHandler
方法进行的;

在注册的时候,会判断共享索引informer
是否启动,然后通过addListener
注册listener
;

注意到addListener
函数和上面启动的processor.Run
函数一样,都通过waitGroup
启动了两个函数,一个是listener.run
, 一个是listener.pop
。

listener.run
函数负责从p.nextCh
遍历事件消息,通知相应的函数处理。

listener.pop
函数负责从p.addCh
取出事件消息,放入nextCh
, 一旦有消息放入,就会唤醒listener.run
进行处理。

这样,预先启动processor.Run
协,通过注册回调函数,完成了回调函数处理资源对象这个逻辑。
综上所述,图示左侧的对象资源分发任务通过sharedIndexInformer.processor.run
和controller.processLoop
两部分协作完成,前者负责从后者的listener.addCh
取出事件进行消费,执行具体回调函数处理逻辑,后者则负责从deltafifo
队列中取出对象元素,发送给前者。
至此,我们一起读了大部分源码,回头再总结一下:
reflector
通过长连接list-watch
资源对象,将资源对象进行版本化提取,加入deltafifo
队列;controller.processLoop
协程会从deltafifo
队列取出资源对象进行消费,通过sharedIndexInformer.HandleDeltas
这个函数, 实现了Deltas
资源对象的分发和索引器indexer
更新;关于资源分发这块, controller.processLoop
每次会消费deltafifo
队列的第一个对象元素,转发给processorListener
的addCh
通道。当 sharedIndexInformer.processor
协程启动后,存在或注册了listener
的情况下,会收到来自通道addCh
的消息,就开始过分发给事件处理函数处理,形成闭环;
项目中如何使用informer
?
项目中使用informer
,只需要根据资源对象获取informer
, 注册回调函数即可。因为其他逻辑k8s
帮你处理好了,这就是operator
的魅力所在,结合crd
和informer
的优秀设计,让我们快速开发一个满足业务需求的operator
。

至此,本章就这么多啦~
码字不易,如果本篇对你有所帮助,别忘了点个小星星!更多内容请关注公众号:玩转云原生!




