

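The Reflector in Kubernetes client-go
Reflector is the event reflector in Kubernetes client-go. Controllers and Informers use it to List and Watch resource objects of a specific type in the apiserver and to push them into a DeltaFIFO and an object cache, so that clients can later read objects from the local cache instead of talking to the apiserver every time.
Before looking at Reflector itself, we first introduce the ListerWatcher interface that it uses.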

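The ListerWatcher interface
The ListerWatcher interface defines the methods used to List and Watch a specific resource type:
List(): fetches a batch of objects of a given type from the apiserver;
Watch(): starting from the ResourceVersion returned by List(), watches for object changes through the apiserver (which in turn watches etcd).
ListWatch talks to the apiserver through the Get() method of a RESTClient; that method is defined by the Getter interface.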
// From k8s.io/client-go/tools/cache/listwatch.go
type ListerWatcher interface {
// List returns the initial list of objects; the ResourceVersion extracted from the result is passed to the subsequent Watch() call.
List(options metav1.ListOptions) (runtime.Object, error)
// Watch starts watching from the resource version given in options.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// Getter interface knows how to access Get method from RESTClient.
type Getter interface {
Get() *restclient.Request
}
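To make the List-then-Watch contract concrete, here is a minimal, self-contained sketch that uses only the standard library. The types Event, listerWatcher and fakeServer are hypothetical stand-ins invented for this illustration; they are not client-go types, and integer resource versions are a simplification.

```go
package main

import (
	"fmt"
	"sort"
)

// Event is a hypothetical stand-in for watch.Event.
type Event struct {
	Type            string // "ADDED", "MODIFIED" or "DELETED"
	Name            string
	ResourceVersion int
}

// listerWatcher mirrors the shape of ListerWatcher with simplified types.
type listerWatcher interface {
	List() (names []string, resourceVersion int)
	Watch(fromVersion int) <-chan Event
}

// fakeServer is a hypothetical in-memory "apiserver" that keeps an event log.
type fakeServer struct {
	log []Event
}

// List replays the log to find the objects that currently exist and returns
// them together with the version of the newest event.
func (s *fakeServer) List() ([]string, int) {
	alive := map[string]bool{}
	rv := 0
	for _, e := range s.log {
		alive[e.Name] = e.Type != "DELETED"
		rv = e.ResourceVersion
	}
	names := []string{}
	for name, ok := range alive {
		if ok {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names, rv
}

// Watch delivers only the events newer than fromVersion, then closes the channel.
func (s *fakeServer) Watch(fromVersion int) <-chan Event {
	ch := make(chan Event, len(s.log))
	for _, e := range s.log {
		if e.ResourceVersion > fromVersion {
			ch <- e
		}
	}
	close(ch)
	return ch
}

func main() {
	srv := &fakeServer{log: []Event{
		{"ADDED", "nginx", 1},
		{"ADDED", "redis", 2},
	}}
	var lw listerWatcher = srv
	names, rv := lw.List()
	fmt.Println(names, rv)

	// Changes made after the List are only visible through Watch.
	srv.log = append(srv.log, Event{"DELETED", "redis", 3})
	for e := range lw.Watch(rv) {
		fmt.Println(e.Type, e.Name, e.ResourceVersion)
	}
}
```

The caller passes the version returned by List to Watch, so the watch neither replays what the list already contained nor misses what happened in between.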
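ListWatch implements the ListerWatcher interface
The ListWatch type implements the ListerWatcher interface. It is used by NewReflector() and by the Informer constructors for the built-in Kubernetes resource objects, for example NewFilteredDeploymentInformer():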
// From k8s.io/client-go/tools/cache/listwatch.go
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
DisableChunking bool
}
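The functions NewListWatchFromClient() and NewFilteredListWatchFromClient() return a ListWatch object.
The Getter argument is a REST client that has already been configured for a specific Kubernetes API Group/Version. "Configured" here means that the rest.Config used to create the RESTClient already has GroupVersion, APIPath, NegotiatedSerializer and similar parameters set.
For built-in Kubernetes objects, the configured REST client lives in k8s.io/client-go/kubernetes/typed/<group>/<version>/<group>_client.go, for example ExtensionsV1beta1Client.
For custom resource types, the configured RESTClient lives in pkg/client/clientset/versioned/typed/<group>/<version>/<group>_client.go.
A RESTClient configured this way serves every resource type under its API Group/Version. When sending a Get() request you therefore also pass a concrete resource name to Resource(), such as Resource("deployments"), so that the resource type is uniquely identified: /<APIPath>/<group>/<version>/namespaces/<namespace>/<resource>/<name>, for example /apis/apps/v1beta1/namespaces/default/deployments/my-nginx-111.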
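As a rough illustration of how such a request path is assembled, the sketch below joins the pieces in the order /<APIPath>/<group>/<version>/namespaces/<namespace>/<resource>/<name>. The helper resourcePath is hypothetical and written only for this example; the real RESTClient builds its URL internally from the Namespace(), Resource() and Name() calls.

```go
package main

import (
	"fmt"
	"path"
)

// resourcePath is a hypothetical helper, not a client-go function.
// path.Join skips empty elements, so an empty name gives the collection path.
func resourcePath(apiPath, group, version, namespace, resource, name string) string {
	return path.Join("/", apiPath, group, version, "namespaces", namespace, resource, name)
}

func main() {
	// Path of a single object.
	fmt.Println(resourcePath("apis", "apps", "v1beta1", "default", "deployments", "my-nginx-111"))
	// Path of the collection, as used by List and Watch.
	fmt.Println(resourcePath("apis", "apps", "v1beta1", "default", "deployments", ""))
}
```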
// From k8s.io/client-go/tools/cache/listwatch.go
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
// NewFilteredListWatchFromClient builds the List and Watch functions for one resource type; both go through the Get() method of the RESTClient.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
// c is a REST Client for an API Group/Version
return c.Get().
Namespace(namespace).
Resource(resource). // Specify a resource type name, such as "deployments"
VersionedParams(&options, metav1.ParameterCodec).
Do().
Get()
}
// When WatchFunc is invoked, options carries the ResourceVersion of the object list that ListFunc returned last time.
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
ListWatch implements List() and Watch() by calling its ListFunc and WatchFunc fields directly. Both methods are straightforward, so we do not analyse them further.
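The delegation pattern, together with the options modifier captured by both closures, can be sketched with the standard library alone. Everything here (listOptions, listWatch, newFilteredListWatch, and the strings returned in place of real requests) is a simplified assumption for illustration, not the client-go code.

```go
package main

import "fmt"

// listOptions is a simplified, hypothetical stand-in for metav1.ListOptions.
type listOptions struct {
	FieldSelector   string
	ResourceVersion string
	Watch           bool
}

// listWatch mirrors the shape of ListWatch: two function fields and
// methods that simply delegate to them.
type listWatch struct {
	ListFunc  func(opts listOptions) string
	WatchFunc func(opts listOptions) string
}

func (lw *listWatch) List(opts listOptions) string { return lw.ListFunc(opts) }

func (lw *listWatch) Watch(opts listOptions) string { return lw.WatchFunc(opts) }

// newFilteredListWatch captures modify in both closures, so every request
// gets the same filter applied before it is "sent".
func newFilteredListWatch(resource string, modify func(*listOptions)) *listWatch {
	return &listWatch{
		ListFunc: func(opts listOptions) string {
			modify(&opts)
			return fmt.Sprintf("LIST %s fieldSelector=%q", resource, opts.FieldSelector)
		},
		WatchFunc: func(opts listOptions) string {
			opts.Watch = true
			modify(&opts)
			return fmt.Sprintf("WATCH %s fieldSelector=%q rv=%s", resource, opts.FieldSelector, opts.ResourceVersion)
		},
	}
}

func main() {
	lw := newFilteredListWatch("deployments", func(o *listOptions) {
		o.FieldSelector = "metadata.name=my-nginx"
	})
	fmt.Println(lw.List(listOptions{}))
	fmt.Println(lw.Watch(listOptions{ResourceVersion: "42"}))
}
```

Passing the options by value and the modifier by pointer means each call works on its own copy, so a filter applied to one request never leaks into the caller's options.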
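Informers that use ListWatch
As we will see later, every resource type has its own Informer (generated by the codegen tools), such as DeploymentInformer. Each one initialises a ListWatch with the ClientSet of its own resource type, so it only returns objects of that type: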
// From k8s.io/client-go/informers/extensions/v1beta1/deployment.go
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
// Creating ListWatch with RESTClient of a specific resource type
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.ExtensionsV1beta1().Deployments(namespace).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.ExtensionsV1beta1().Deployments(namespace).Watch(options)
},
},
&extensionsv1beta1.Deployment{},
resyncPeriod,
indexers,
)
}
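Reflector
With ListWatch covered, we can finally look at how Reflector is implemented.
The Reflector type definition
A Reflector uses a ListerWatcher to sync objects of expectedType, and their events, from the apiserver and caches them in its store (a DeltaFIFO).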
// From: k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
// The name of the Reflector and the default value is file:line
name string
// metrics tracks basic metric information about the reflector
metrics *reflectorMetrics
// expectedType is the type of object this Reflector watches.
expectedType reflect.Type
// store caches object change events; it is usually a DeltaFIFO (see NewInformer/NewIndexerInformer)
store Store
// listerWatcher for List and Watch resource objects
listerWatcher ListerWatcher
// When ListAndWatch() returns (for example after an error or a timeout), wait for period before calling it again
period time.Duration
// The Reflector periodically calls the Resync() method of the store.
// For DeltaFIFO, this re-queues every object in the knownObjects cache into the DeltaFIFO
resyncPeriod time.Duration
// ShouldResync is an optional extra check consulted before each resync
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
}
Functions that create a Reflector
The functions NewNamespaceKeyedIndexerAndReflector(), NewReflector() and NewNamedReflector() return a Reflector object; NewNamedReflector() is the one the other two build on:
// From: k8s.io/client-go/tools/cache/reflector.go
var internalPackages = []string{"client-go/tools/cache/"}
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
// Use Namespace as IndexFunc and KeyFunc
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
return indexer, reflector
}
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
// Name the Reflector file:line after the call site that creates it
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
r := &Reflector{
name: name,
// we need this to be unique per process (some names are still the same) but obvious who it belongs to
metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
period: time.Second, // Interval at which wait.Until() in Run() re-executes ListAndWatch()
resyncPeriod: resyncPeriod, // Interval for periodically calling the Resync() method of store
clock: &clock.RealClock{},
}
return r
}
Reflector objects are usually created by the controller of an Informer:
// Source: k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
...
// Create Reflector with controller's Config parameter
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue, // DeltaFIFO
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
...
// Run Method for Running Reflector
wg.StartWithChannel(stopCh, r.Run)
...
}
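The Run() method
Run() keeps calling ListAndWatch(). Whenever ListAndWatch() returns, for example because something went wrong, Run() waits r.period and calls it again, so Run() only returns once stopCh is closed.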
// From: k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
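The retry behaviour can be approximated in a few lines of plain Go. The until function below is a simplified, hypothetical imitation written for this article; the real wait.Until in k8s.io/apimachinery has more features (crash handling, timer reuse and so on).

```go
package main

import (
	"fmt"
	"time"
)

// until calls f, waits for period, and repeats until stopCh is closed.
// It is a simplified sketch, not the apimachinery implementation.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Do not start another round once a stop was requested.
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	attempts := 0
	until(func() {
		attempts++
		fmt.Println("ListAndWatch attempt", attempts)
		if attempts == 3 {
			close(stopCh) // simulate shutdown after the third attempt
		}
	}, 10*time.Millisecond, stopCh)
	fmt.Println("stopped after", attempts, "attempts")
}
```

Because the wait happens after every return of f, a ListAndWatch() that keeps failing is retried at most once per period instead of spinning.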
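The ListAndWatch() method
This is the core method of Reflector. It does the following:
List all objects of the resource type from the apiserver (with ResourceVersion "0");
take the resourceVersion of the returned object list;
call Replace() on the internal store (the DeltaFIFO) to load the listed objects into it, which generates Sync events, or Deleted events carrying a DeletedFinalStateUnknown object;
every resyncPeriod, call the DeltaFIFO's Resync() method to re-queue the objects in the knownObjects cache into the DeltaFIFO as Sync events, which gives handlers a periodic pass over all objects of the type;
starting from the resourceVersion obtained by List, run a blocking Watch against the apiserver and update the DeltaFIFO according to the type of each event received.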
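The list-replace-watch part of these steps can be sketched as follows. This is a toy model under loud assumptions: the store is a plain map, resource versions are integers, events arrive as a slice, and the periodic resync is left out. The names event, store and listAndWatch are invented for the example.

```go
package main

import "fmt"

// event is a hypothetical, simplified watch event.
type event struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Name string
	RV   int // resource version at which the change happened
}

// store is a toy replacement for the Reflector's store:
// object name -> resource version last seen.
type store map[string]int

// listAndWatch replaces the store contents with the listed objects, then
// applies the watch events that are newer than the list's version.
// It returns the last resource version it observed.
func listAndWatch(s store, listed map[string]int, listRV int, events []event) int {
	// Replace: drop everything and load the freshly listed objects.
	for name := range s {
		delete(s, name)
	}
	for name, rv := range listed {
		s[name] = rv
	}
	lastRV := listRV
	for _, e := range events {
		if e.RV <= lastRV {
			continue // already reflected in what we have
		}
		switch e.Type {
		case "ADDED", "MODIFIED":
			s[e.Name] = e.RV
		case "DELETED":
			delete(s, e.Name)
		}
		lastRV = e.RV // the next Watch would resume from here
	}
	return lastRV
}

func main() {
	s := store{"stale": 1}
	last := listAndWatch(s,
		map[string]int{"nginx": 5, "redis": 7}, 7,
		[]event{{"MODIFIED", "nginx", 8}, {"DELETED", "redis", 9}, {"ADDED", "etcd", 10}},
	)
	fmt.Println(len(s), s["nginx"], s["etcd"], last) // prints "2 8 10 10"
}
```

The object named "stale" disappears because Replace drops anything the list did not return; in the real DeltaFIFO that case produces a Deleted event with a DeletedFinalStateUnknown object.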
// From: k8s.io/client-go/tools/cache/reflector.go
// The actual timeout of each Watch is a random value in [minWatchTimeout, 2*minWatchTimeout)
var minWatchTimeout = 5 * time.Minute
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...
// ResourceVersion "0" lets the apiserver serve the list from its cache at any available version, so this lists all objects of the type without forcing a fresh read from etcd
options := metav1.ListOptions{ResourceVersion: "0"}
...
list, err := r.listerWatcher.List(options)
...
listMetaInterface, err := meta.ListAccessor(list)
...
// Get the resourceVersion of the object list; it is used by the subsequent Watch calls
resourceVersion = listMetaInterface.GetResourceVersion()
...
items, err := meta.ExtractList(list)
...
// Sync items into r.store (the DeltaFIFO) using its Replace() method
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
// Caching resourceVersion
r.setLastSyncResourceVersion(resourceVersion)
...
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Calls whichever cleanup function was assigned last
}()
// Periodically call the Resync() method of r.store to re-queue the objects in the knownObjects cache into the DeltaFIFO.
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
...
// The Watch times out after timeoutSeconds. When r.watchHandler() returns an error, ListAndWatch() returns,
// and the Reflector waits r.period before running ListAndWatch() again.
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
}
w, err := r.listerWatcher.Watch(options)
...
// Blocking Handling Watch Events
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
}
}
}
// syncWith replaces the contents of r.store with the objects in items
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
// watchHandler updates r.store by calling the store method that matches the type of each incoming Watch event.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
...
loop:
for {
select {
...
case event, ok := <-w.ResultChan():
...
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
...
}
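The randomised watch timeout in the excerpt above is easy to check in isolation. The sketch below repeats the same formula with the same 5-minute minimum; the function name watchTimeoutSeconds is made up for this example.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const minWatchTimeout = 5 * time.Minute

// watchTimeoutSeconds applies the formula from ListAndWatch():
// rand.Float64() is in [0, 1), so the result is in [300, 600) seconds.
func watchTimeoutSeconds() int64 {
	return int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(watchTimeoutSeconds()) // some value from 300 to 599
	}
}
```

Spreading the timeouts over an interval keeps many Reflectors that started together from all re-establishing their watches at the same moment.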
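Controllers that use Reflector
The controller wraps a Reflector. The Reflector uses a ListerWatcher to fetch the object list and events from the apiserver and stores them in a DeltaFIFO. The controller keeps popping from that DeltaFIFO, updates its clientState cache with each popped Delta, and calls the OnAdd/OnUpdate/OnDelete callbacks registered through the Informer.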
// Source: k8s.io/client-go/tools/cache/controller.go
type Config struct {
// Queue caches objects of ObjectType and is also the store used by the Reflector.
// NewInformer and NewIndexerInformer create this configuration with a Queue of type DeltaFIFO.
Queue
// ListerWatcher is what the controller's Reflector uses to List and Watch ObjectType;
ListerWatcher
// Process is called for every item popped from the Queue
Process ProcessFunc
// ObjectType is the object type this controller watches and manages
ObjectType runtime.Object
// Interval for periodically calling the Queue's Resync() method
FullResyncPeriod time.Duration
// Optional external check of whether Resync() should run (usually nil)
ShouldResync ShouldResyncFunc
// Whether to put an object back into the Queue when Process fails on it (usually false)
RetryOnError bool
}
// controller implements the Controller interface and is created by New().
// In practice it is created and used through NewInformer and NewIndexerInformer.
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
The Run() method
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
// Close the queue at the end of the controller run
c.config.Queue.Close()
}()
// Initialise the Reflector that watches ObjectType objects from the controller configuration.
// The controller's Queue (a DeltaFIFO) is passed in, so the Reflector writes its updates straight into that Queue.
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
// Start Reflector in another goroutine
wg.StartWithChannel(stopCh, r.Run)
// Run c.processLoop, blocking. It loops forever and only returns when the queue is closed; wait.Until then waits 1s and runs it again until stopCh is closed.
wait.Until(c.processLoop, time.Second, stopCh)
}
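The processLoop() method
processLoop() pops Deltas from the DeltaFIFO and calls the configured PopProcessFunc, which:
updates the knownObjects cache used by the DeltaFIFO (the clientState created by the controller) with the popped object;
calls the callbacks registered by the user.
The DeltaFIFO's Pop() method runs PopProcessFunc while holding its lock, so even if several goroutines call Pop() concurrently they run one after another, and no two goroutines ever process the same resource object at the same time.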
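The serialising effect of running the callback under the queue lock can be demonstrated with a toy queue. The queue type and the drain helper below are hypothetical and much simpler than DeltaFIFO (no blocking on empty, no Deltas, no knownObjects); they only illustrate the locking property.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a toy FIFO whose Pop runs the callback while holding the lock.
type queue struct {
	mu    sync.Mutex
	items []string
}

// Pop removes the oldest item and processes it under the lock.
// It reports false when the queue is empty.
func (q *queue) Pop(process func(item string)) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return false
	}
	item := q.items[0]
	q.items = q.items[1:]
	process(item)
	return true
}

// drain pops from several goroutines and returns how many items were
// processed and the highest number of callbacks seen running at once.
func drain(q *queue, workers int) (processed, maxInFlight int) {
	var wg sync.WaitGroup
	inFlight := 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for q.Pop(func(string) {
				// Safe without extra locking: Pop already holds q.mu here.
				inFlight++
				if inFlight > maxInFlight {
					maxInFlight = inFlight
				}
				processed++
				inFlight--
			}) {
			}
		}()
	}
	wg.Wait()
	return processed, maxInFlight
}

func main() {
	q := &queue{items: []string{"a", "b", "c", "d", "e", "f"}}
	processed, maxInFlight := drain(q, 4)
	fmt.Println(processed, maxInFlight) // prints "6 1"
}
```

Four goroutines pop concurrently, yet at most one callback is ever in flight. The price of this design is that a slow callback blocks every other user of the queue, which is one reason handlers are expected to return quickly.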
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == FIFOClosedError {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
c.config.RetryOnError is usually false (see the NewInformer() function below), so when PopProcessFunc fails, the object is not added back to the DeltaFIFO.
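Informers use the controller
The NewInformer() and NewIndexerInformer() functions use a controller to List/Watch objects of a specific resource type, cache them locally, and call the callbacks supplied by the user (held in a ResourceEventHandler).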
// Source: k8s.io/client-go/tools/cache/controller.go
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller) {
// Store is an in-memory database of objects; it uses KeyFunc to derive the unique key of each object.
// NewStore returns a store backed by a ThreadSafeStore
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
// Do not put an object back into the Queue when Process fails on it
RetryOnError: false,
// DeltaFIFO's Pop() runs Process while holding its internal lock, so the clientState updates and the OnAdd/OnUpdate/OnDelete handler calls are serialised.
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
// Call OnUpdate() or OnAdd() depending on whether the object is already in clientState.
// When the controller starts, clientState is empty, so OnAdd() is called for every object returned by the initial list.
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
// On deletion, remove the object from clientState first, then call the user's handler.
if err := clientState.Delete(d.Object); err != nil {
return err
}
// d.Object may be either a regular resource object or a DeletedFinalStateUnknown object, so the OnDelete() handler must be able to tell them apart.
h.OnDelete(d.Object)
}
}
return nil
},
}
return clientState, New(cfg)
}
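The dispatch rule in Process (pick the handler by looking at the cache, not at the delta type) is worth seeing on a concrete sequence. The sketch below is a stripped-down model: delta and process are hypothetical, clientState is a plain map of names, and handler calls are recorded as strings instead of being invoked.

```go
package main

import "fmt"

// delta is a hypothetical, simplified Delta: a change type plus an object name.
type delta struct {
	Type string // "Sync", "Added", "Updated" or "Deleted"
	Name string
}

// process applies deltas oldest-first to clientState and returns the
// handler calls it would make, in order.
func process(clientState map[string]bool, deltas []delta) []string {
	var calls []string
	for _, d := range deltas {
		switch d.Type {
		case "Sync", "Added", "Updated":
			// The handler depends on the cache, not on the delta type.
			if clientState[d.Name] {
				calls = append(calls, "OnUpdate:"+d.Name)
			} else {
				clientState[d.Name] = true
				calls = append(calls, "OnAdd:"+d.Name)
			}
		case "Deleted":
			delete(clientState, d.Name)
			calls = append(calls, "OnDelete:"+d.Name)
		}
	}
	return calls
}

func main() {
	state := map[string]bool{}
	fmt.Println(process(state, []delta{
		{"Sync", "nginx"},
		{"Updated", "nginx"},
		{"Sync", "nginx"},
		{"Deleted", "nginx"},
		{"Updated", "nginx"},
	}))
	// prints "[OnAdd:nginx OnUpdate:nginx OnUpdate:nginx OnDelete:nginx OnAdd:nginx]"
}
```

Note how the first Sync becomes OnAdd because the cache is still empty, a later Sync (a periodic resync) becomes OnUpdate, and an Updated delta for an object the cache no longer holds becomes OnAdd.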
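The HasSynced() method
It calls the HasSynced() method of the DeltaFIFO.
The method returns true once processLoop() has popped and finished processing the first batch of objects that the Reflector's List put into the DeltaFIFO, and keeps returning true from then on.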
func (c *controller) HasSynced() bool {
return c.config.Queue.HasSynced()
}
The HasSynced() method of sharedInformer/sharedIndexInformer simply calls the controller's HasSynced() method, whose signature matches the InformerSynced function type:
// Source: k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HasSynced() bool {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.controller == nil {
return false
}
return s.controller.HasSynced()
}
type InformerSynced func() bool
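Using HasSynced() in a custom controller
When writing a Kubernetes controller, the convention is to call cache.WaitForCacheSync() and wait until every Informer cache has synced before starting the workers that consume the workqueue: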
// Source: https://github.com/kubernetes/sample-controller/blob/master/controller.go
// Customized Controller
type Controller struct {
...
deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced // InformerSynced function type
...
}
func NewController(
kubeclientset kubernetes.Interface,
sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
fooInformer informers.FooInformer) *Controller {
...
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced, // Informer's HasSynced() method
}
...
}
// Wait until the HasSynced() method of every Informer returns true before starting workers
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
...
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
klog.Info("Starting workers")
// Launch threadiness workers to process Foo resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
...
}
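Why wait for the Informer's HasSynced() to return true before starting the workers?
Because once HasSynced() returns true, the first batch of objects listed by the Reflector has been popped from the DeltaFIFO and written to the clientState cache by the controller, so a worker can fetch an object from the Lister by name. Before that, an object may still be sitting in the DeltaFIFO and not yet in the clientState cache, in which case the worker cannot find it in the Lister by name.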
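The gate itself is a simple polling loop. The following sketch shows the idea with the standard library only; waitForCacheSync and its poll parameter are assumptions made for this example and do not reproduce the actual cache.WaitForCacheSync implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// informerSynced has the same shape as cache.InformerSynced.
type informerSynced func() bool

// waitForCacheSync polls until every function reports true, or gives up
// and returns false when stopCh is closed. It is a simplified sketch.
func waitForCacheSync(stopCh <-chan struct{}, poll time.Duration, synced ...informerSynced) bool {
	for {
		all := true
		for _, s := range synced {
			if !s() {
				all = false
				break
			}
		}
		if all {
			return true
		}
		select {
		case <-stopCh:
			return false
		case <-time.After(poll):
		}
	}
}

func main() {
	var listProcessed int32
	hasSynced := func() bool { return atomic.LoadInt32(&listProcessed) == 1 }

	// Simulate the controller finishing the initial list a little later.
	go func() {
		time.Sleep(20 * time.Millisecond)
		atomic.StoreInt32(&listProcessed, 1)
	}()

	stopCh := make(chan struct{})
	if !waitForCacheSync(stopCh, 5*time.Millisecond, hasSynced) {
		fmt.Println("failed to wait for caches to sync")
		return
	}
	fmt.Println("caches synced, starting workers")
}
```

Workers started after this point can rely on the Lister containing everything the initial List returned.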
References
[1] https://programmer.ink/think/kubernetes-event-reflector.html




