暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深入理解k8s informer设计原则

玩转云原生 2021-06-15
971
本文将从以下四个方面深入理解k8s
List-Watch
informer
设计原则:
  • informer
    是什么?解决了什么问题?
  • informer
    源码解读:k8s
    资源同步奥秘
  • 项目中如何使用informer

informer是什么?解决了什么问题?

我们知道k8s
有很多组件,频繁和apiserver
交互获取资源数据,这就带来了性能问题,特别是高并发请求下, 性能的损失是难以想象的。informer
组件就是在这种背景下产生的,能有效同步各组件通信的可靠性,并且减缓对apiserver
etcd
负担。

既然informer
组件作为k8s
同步资源的利器,那么它的工作原理如何呢?

先贴一张图,看图说话, 我们有个感性的认识,后面再进行验证和细化。

k8s informer 原理图

根据原理图,大致可以分为以下步骤:

  • reflector
    通过长连接list-watch
    资源对象,将资源对象加入一个先进先出的队列;
  • informrer
    消费资源对象;
  • 右侧,将消费的资源对象加入一个本地索引器indexer
    ,通过key-object
    方式储存;
  • 左侧,消费的资源对象通过分发给事件处理函数处理;

带着这个基本认识,我们一起读源码验证一下,看是否跟图中的流程匹配。

informer
源码解读:k8s
资源同步奥秘

先看看这个package
的入口,定义了哪些方法。

代码地址:

https://github.com/kubernetes/client-go/blob/master/informers/factory.go#L109:6

以下定义的是一个共享informer
工厂方法,具体有哪些属性可以以下的参考代码片段。这里的实现比较有意思,显示定义了默认参数,然后用opt(factory)
实现了个性化参数配置,这部分代码还是很精妙的。

image-20210611160229776

注意上面的informers
属性对应的map
中的cache.SharedIndexInformer
, 顾名思义,不同种类的共享索引informer
,是今天解剖的重点。

这里的Start
方法用协程起了工厂方法的每个informer
函数:

image-20210611161234684

InFormerFor
则返回给定对象资源的共享索引informer

informers
的同步,可以用WaitForCacheSync
来确定,该函数等待所有的启动的informer
同步资源。

image-20210611161422565

Informersynced
可用于确定Informer
是否已同步的bool
类型的函数。wait.PollImmediateUntil
的作用是在等待间隔之前运行条件函数。两者结合起来,可以用来判断informers
同步状态。

image-20210611170714471

自此,我们有了一个大概的认识,sharedInformerFactory
结构体是通过工厂方法来完成informers
的注册的,对应于sharedInformerFactory.informers
。此外,该结构体方法还实现了判断informers
同步状态,获取指定资源的informer
等功能。

下面,我们正式看看informer
为何方神物。

informer

https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go#L368

informer
的属性,我们就可以知道,它包括了Deltafifo
队列、list-Watcher
、回调函数和其他辅助参数。

sharedIndexInformer.Run
的函数体中,在controller.Run
之前会启动一个处理器s.processor.run
, 这个组件用来完成工作队列中获取资源对象进行回调函数消费。只需要记住,运行controller
之前有个消费者协程运行了,它可以注册linstener
,  用来从工作队列获取消息消费。

image-20210611173851940

我们再来看看controller.Run
的运行逻辑。

controller
运行的时候,创建了一个reflector
, 用它来进行listAndWatch
,同时会启动一个c.processLoop
, c.processLoop
作为消费者从    reflector
的队列中pop
对象资源。

这里先埋一个伏笔。后面会介绍这两个组件具体来干啥的。

image-20210611175129052
refector

refector.Run
的时候会运行ListAndWatch

image-20210611175336283

ListAndWatch
干的第一件事是启动一个协程list-watch
, 按照要求把数据同步过来,然后将对象资源进行指定版本提取。

go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// 如果listwatcher支持长连接,数据就用块方式获取, 如果不支持就全量返回
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
case options.ResourceVersion != "" && options.ResourceVersion != "0":
pager.PageSize = 0
}


list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}
----
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
----

数据处理完了将会怎样处理呢?

注意代码中做了一个refector.syncWith
的操作,调用了store
方法进行了DeletaQueue
队列资源对象更新。这里解释下store
, 它的抽象实现是DeletaQueue
,可以从上下文配置看到。

image-20210614131820188

代码接着往下走,会启动一个的协程去做周期性地强制同步,直到接受超时或者停止信号。

这个逻辑实现主要通过一个定时器Ticker
和同步函数来完成的。这个同步函数也就是r.store.Resync()
,作用是将store
的对象资源的key
同步给queue
,这一步将需要处理的key
进行了入队,便于后续任务分发步骤进行消费。注意这里只是入队了相应的key
,真正需要处理时才会从缓存索引器indexer
中获取。这个索引器有点超纲,莫慌,后面还会解释。

image-20210614132028745

最后,会进入一个死循环,主要逻辑在watchHandler
,用来观察资源变化对detlaqueue
队列资源更新。

image-20210612203101684

watchHandler
方法根据watch
到的不同事件和方法,来调用store
的相关方法进行处理。

image-20210612203252457

这个store
是一种接口,根据上下文,采用的是DeltaFIFO
队列作为接口的实现。

这个结构体提供了很多方法,感兴趣可以看看。比较重要的函数就是queueActionLocked
这个逻辑,主要是做了一个去重和广播,防止事件的重复处理。

image-20210614141929247

至此整个list-watch
就结束了。

如此看来,reflector
生产对象资源过程中,使用list-watch
建立长连接(可选一次性拉取过来),提取版本化对象资源,调用store
方法,来完成deltafifo
队列的消息入队。

对象资源消费

reflector
生产对象资源加入deltafifo
队列,有生产就有消费,那么,这个消费组件是什么?

根据图示,从deltafifo
队列中取出资源对象,会更新本地索引indexer
,  同时也会根据消息事件来完成回调函数处理逻辑。

前文有两个组件sharedIndexInformer.processor.run
controller.processLoop
没有解释,现在先回顾一下,然后解释他们的作用。

sharedIndexInformer.Run
会启动s.processor.run
协程和controller.Run
s.processor.run
协程用来捕获消息,加入工作队列,并触发回调函数处理逻辑;其次,controller.Run
的时候, 创建了reflector
controller.processLoop
协程,这个controller.processLoop
协程就是作为消费者,从deltafifo
队列中消费对象资源,传递给s.processor.run
协程消费。

由于这两个充当的角色相辅相成,按照上下文逻辑关系,先说controller.processLoop

controller.processLoop

controller.Run
启动时候,会启动一个c.processLoop
的循环,它作为消费者,从deltafifo
队列中pop
消费。

image-20210613183712412

Pop
函数从deltafifo
队列头部取出一个对象元素进行处理,处理失败则要加回去。如果队列中的元素不足时候会阻塞,直到有对象资源可以消费。

image-20210614145027121

Pop
函数体中有个process(item)
, 这里的process
,其实来源于sharedIndexInformer.HandleDeltas
,具体可以参考sharedIndexInformer.Run
函数体中cfg
的实例化。

通过sharedIndexInformer.HandleDeltas
这个函数,  实现了Deltas
资源对象的分发和索引器indexer
更新。

所谓indexer
是一个线程访问安全的本地存储组件,能实现资源对象key-object
快速访问。感兴趣的话可以研究下它的实现。

值得一提的是,queue
 只存储了资源的索引,在真正对资源进行处理时,需要根据资源索引去这个indexer
缓存中获取数据的。

image-20210613205518690

同时,s.processor.distribute
这个函数会将事件消息发送给processorListener
发送到通道processListener.addCh

image-20210613210936009
image-20210613183105270

也就是说,controller.processLoop
每次会消费deltafifo
队列的第一个对象元素,转发给processorListener
addCh
通道。

sharedIndexInformer.processor.run

前面说到sharedIndexInformer.Run
会启动s.processor.run
协程,它会遍历processorListeners
,通过·listener.run
listener.pop
两个函数,从每一个listener.addCh
获取消息,消费事件消息,完成函数回调处理。具体逻辑参考注册EventHandler
事件。

image-20210614122834765
注册EventHandler
事件

我们再来看注册回调函数这块,EventHandler
事件的注册是通过informer
AddEventHandler
方法进行的;

image-20210613152316850

在注册的时候,会判断共享索引informer
是否启动,然后通过addListener
注册listener
;

image-20210613152454803

注意到addListener
函数和上面启动的processor.Run
函数一样,都通过waitGroup
启动了两个函数,一个是listener.run
, 一个是listener.pop

image-20210613191252310

listener.run
函数负责从p.nextCh
遍历事件消息,通知相应的函数处理。

image-20210613124753402

listener.pop
函数负责从p.addCh
取出事件消息,放入nextCh
, 一旦有消息放入,就会唤醒listener.run
进行处理。

image-20210613124809257

这样,预先启动processor.Run
协,通过注册回调函数,完成了回调函数处理资源对象这个逻辑。

综上所述,图示左侧的对象资源分发任务通过sharedIndexInformer.processor.run
controller.processLoop
两部分协作完成,前者负责从后者的listener.addCh
取出事件进行消费,执行具体回调函数处理逻辑,后者则负责从deltafifo
队列中取出对象元素,发送给前者。

至此,我们一起读了大部分源码,回头再总结一下:

  • reflector
    通过长连接list-watch
    资源对象,将资源对象进行版本化提取,加入deltafifo
    队列;
  • controller.processLoop
    协程会从deltafifo
    队列取出资源对象进行消费,通过sharedIndexInformer.HandleDeltas
    这个函数,  实现了Deltas
    资源对象的分发和索引器indexer
    更新;
  • 关于资源分发这块,controller.processLoop
    每次会消费deltafifo
    队列的第一个对象元素,转发给processorListener
    addCh
    通道。
  • sharedIndexInformer.processor
    协程启动后,存在或注册了listener
    的情况下,会收到来自通道addCh
    的消息,就开始过分发给事件处理函数处理,形成闭环;

项目中如何使用informer

项目中使用informer
,只需要根据资源对象获取informer
, 注册回调函数即可。因为其他逻辑k8s
帮你处理好了,这就是operator
的魅力所在,结合crd
informer
的优秀设计,让我们快速开发一个满足业务需求的operator

image-20210614153800770

至此,本章就这么多啦~

码字不易,如果本篇对你有所帮助,别忘了点个小星星!更多内容请关注公众号:玩转云原生!



文章转载自玩转云原生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论