The informer mechanism in client-go is a rather distinctive approach to client interaction, well suited to the various controller patterns that Kubernetes is built on.
informer
Having covered the direct use of the clients in client-go, let's look at a usage pattern that is very common in Kubernetes yet quite different from the plain client: the informer. One design that sets client-go apart from clients in other languages is precisely its informer mechanism. Many other clients interact with Kubernetes directly through the REST API, which is fine for small workloads but breaks down at scale: too many connections, state drifting out of sync, and so on. client-go, which is used heavily inside the Kubernetes source itself, solves these problems elegantly with the informer mechanism.
At initialization, the informer first performs a List through the reflector to fetch all objects of a resource from the Kubernetes API and caches them; it then starts a Watch to monitor the resource and pushes changes into a DeltaFIFO. After that, fetching a resource no longer requires a real-time call to Kubernetes: objects are read from the local indexer cache, which noticeably reduces both connection count and overall resource usage.
Informer also provides a handler mechanism: you supply an implementation of the ResourceEventHandler interface to respond to OnAdd/OnUpdate/OnDelete events. This is explained in detail below.
Informers ship with built-in support for the native resource types. For custom CRD resources you need to implement a corresponding controller yourself; such hand-written controllers use the informer mechanism as well.
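The generated typed informers only cover built-in resources, but the same mechanism is available for CRDs through the dynamic informer factory. Below is a minimal sketch, assuming client-go v0.21.x and a hypothetical example.com/v1 "foos" CRD:
// Assumed imports: k8s.io/client-go/dynamic, k8s.io/client-go/dynamic/dynamicinformer,
// k8s.io/apimachinery/pkg/runtime/schema
dynClient, err := dynamic.NewForConfig(config)
if err != nil {
    panic(err)
}
dynFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynClient, time.Minute)
// hypothetical CRD coordinates, for illustration only
gvr := schema.GroupVersionResource{Group: "example.com", Version: "v1", Resource: "foos"}
crdInformer := dynFactory.ForResource(gvr).Informer()
// crdInformer behaves like any typed informer: AddEventHandler, Run, and so on.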
Architecture diagram
Let's first look at the architecture diagram of the informer implementation.
From the diagram we can see that the informer serves two main purposes:
- Through a method called ListAndWatch, it caches the API objects from the APIServer locally and keeps that cache up to date. ListAndWatch first fetches the latest version of all API objects through the APIServer's LIST API, then monitors changes to those objects through the WATCH API (the Lister sketch after this list reads from exactly that cache);
- It registers events: whenever a watched change arrives, the matching EventHandler is invoked as a callback.
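To illustrate the first point, once the cache has synced, reads can be served entirely from the local store through a Lister. A minimal sketch, assuming a clientset and stop channel like those in the full example below:
// Assumed imports: k8s.io/client-go/informers, k8s.io/apimachinery/pkg/labels, log
factory := informers.NewSharedInformerFactory(clientset, time.Minute)
podLister := factory.Core().V1().Pods().Lister()
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)
// served from the local indexer cache; no round trip to the apiserver
pods, err := podLister.Pods("default").List(labels.Everything())
if err != nil {
    log.Fatal(err)
}
log.Printf("%d pods served from cache", len(pods))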
Basic usage and source code walkthrough
Let's first look at the directory layout of the informer source; client-go has a dedicated informers directory.
[root@dev1 informers]# pwd
/root/go/pkg/mod/k8s.io/client-go@v0.21.3/informers
[root@dev1 informers]# tree .
.
├── admissionregistration
│ ├── interface.go
│ ├── v1
│ │ ├── interface.go
│ │ ├── mutatingwebhookconfiguration.go
│ │ └── validatingwebhookconfiguration.go
│ └── v1beta1
│ ├── interface.go
│ ├── mutatingwebhookconfiguration.go
│ └── validatingwebhookconfiguration.go
├── apiserverinternal
│ ├── interface.go
│ └── v1alpha1
│ ├── interface.go
│ └── storageversion.go
├── apps
│ ├── interface.go
│ ├── v1
│ │ ├── controllerrevision.go
│ │ ├── daemonset.go
│ │ ├── deployment.go
│ │ ├── interface.go
│ │ ├── replicaset.go
│ │ └── statefulset.go
│ ├── v1beta1
│ │ ├── controllerrevision.go
│ │ ├── deployment.go
│ │ ├── interface.go
│ │ └── statefulset.go
│ └── v1beta2
│ ├── controllerrevision.go
│ ├── daemonset.go
│ ├── deployment.go
│ ├── interface.go
│ ├── replicaset.go
│ └── statefulset.go
├── autoscaling
│ ├── interface.go
│ ├── v1
│ │ ├── horizontalpodautoscaler.go
│ │ └── interface.go
│ ├── v2beta1
│ │ ├── horizontalpodautoscaler.go
│ │ └── interface.go
│ └── v2beta2
│ ├── horizontalpodautoscaler.go
│ └── interface.go
├── batch
│ ├── interface.go
│ ├── v1
│ │ ├── cronjob.go
│ │ ├── interface.go
│ │ └── job.go
│ └── v1beta1
│ ├── cronjob.go
│ └── interface.go
├── certificates
│ ├── interface.go
│ ├── v1
│ │ ├── certificatesigningrequest.go
│ │ └── interface.go
│ └── v1beta1
│ ├── certificatesigningrequest.go
│ └── interface.go
├── coordination
│ ├── interface.go
│ ├── v1
│ │ ├── interface.go
│ │ └── lease.go
│ └── v1beta1
│ ├── interface.go
│ └── lease.go
├── core
│ ├── interface.go
│ └── v1
│ ├── componentstatus.go
│ ├── configmap.go
│ ├── endpoints.go
│ ├── event.go
│ ├── interface.go
│ ├── limitrange.go
│ ├── namespace.go
│ ├── node.go
│ ├── persistentvolumeclaim.go
│ ├── persistentvolume.go
│ ├── pod.go
│ ├── podtemplate.go
│ ├── replicationcontroller.go
│ ├── resourcequota.go
│ ├── secret.go
│ ├── serviceaccount.go
│ └── service.go
├── discovery
│ ├── interface.go
│ ├── v1
│ │ ├── endpointslice.go
│ │ └── interface.go
│ └── v1beta1
│ ├── endpointslice.go
│ └── interface.go
├── events
│ ├── interface.go
│ ├── v1
│ │ ├── event.go
│ │ └── interface.go
│ └── v1beta1
│ ├── event.go
│ └── interface.go
├── extensions
│ ├── interface.go
│ └── v1beta1
│ ├── daemonset.go
│ ├── deployment.go
│ ├── ingress.go
│ ├── interface.go
│ ├── networkpolicy.go
│ ├── podsecuritypolicy.go
│ └── replicaset.go
├── factory.go
├── flowcontrol
│ ├── interface.go
│ ├── v1alpha1
│ │ ├── flowschema.go
│ │ ├── interface.go
│ │ └── prioritylevelconfiguration.go
│ └── v1beta1
│ ├── flowschema.go
│ ├── interface.go
│ └── prioritylevelconfiguration.go
├── generic.go
├── internalinterfaces
│ └── factory_interfaces.go
├── networking
│ ├── interface.go
│ ├── v1
│ │ ├── ingressclass.go
│ │ ├── ingress.go
│ │ ├── interface.go
│ │ └── networkpolicy.go
│ └── v1beta1
│ ├── ingressclass.go
│ ├── ingress.go
│ └── interface.go
├── node
│ ├── interface.go
│ ├── v1
│ │ ├── interface.go
│ │ └── runtimeclass.go
│ ├── v1alpha1
│ │ ├── interface.go
│ │ └── runtimeclass.go
│ └── v1beta1
│ ├── interface.go
│ └── runtimeclass.go
├── policy
│ ├── interface.go
│ ├── v1
│ │ ├── interface.go
│ │ └── poddisruptionbudget.go
│ └── v1beta1
│ ├── interface.go
│ ├── poddisruptionbudget.go
│ └── podsecuritypolicy.go
├── rbac
│ ├── interface.go
│ ├── v1
│ │ ├── clusterrolebinding.go
│ │ ├── clusterrole.go
│ │ ├── interface.go
│ │ ├── rolebinding.go
│ │ └── role.go
│ ├── v1alpha1
│ │ ├── clusterrolebinding.go
│ │ ├── clusterrole.go
│ │ ├── interface.go
│ │ ├── rolebinding.go
│ │ └── role.go
│ └── v1beta1
│ ├── clusterrolebinding.go
│ ├── clusterrole.go
│ ├── interface.go
│ ├── rolebinding.go
│ └── role.go
├── scheduling
│ ├── interface.go
│ ├── v1
│ │ ├── interface.go
│ │ └── priorityclass.go
│ ├── v1alpha1
│ │ ├── interface.go
│ │ └── priorityclass.go
│ └── v1beta1
│ ├── interface.go
│ └── priorityclass.go
└── storage
├── interface.go
├── v1
│ ├── csidriver.go
│ ├── csinode.go
│ ├── interface.go
│ ├── storageclass.go
│ └── volumeattachment.go
├── v1alpha1
│ ├── csistoragecapacity.go
│ ├── interface.go
│ └── volumeattachment.go
└── v1beta1
├── csidriver.go
├── csinode.go
├── csistoragecapacity.go
├── interface.go
├── storageclass.go
└── volumeattachment.go
58 directories, 155 files
As you can see, each resource has its own directory, and each directory defines the interfaces for that resource.
Let's walk through an example to see the informer mechanism in action.
package main
import (
"flag"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"log"
"path/filepath"
"time"
)
func main() {
var kubeconfig *string
//on Windows this reads the config file under C:\Users\xxx\.kube\config
//on Linux this reads the config file under ~/.kube/config
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
//resync every minute; resync periodically replays the objects in the local cache (it does not re-list from the apiserver)
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
informer := sharedInformers.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New Pod Added to Store: %s", mObj.GetName())
},
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s Pod Updated to %s", oObj.GetName(),nObj.GetName())
},
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("Pod Deleted from Store: %s", mObj.GetName())
},
})
informer.Run(stopCh)
}
Running it produces output like:
2020/10/17 15:13:10 New Pod Added to Store: dns-test
2020/10/17 15:13:10 New Pod Added to Store: web-1
2020/10/17 15:13:10 New Pod Added to Store: fluentd-elasticsearch-nwqph
2020/10/17 15:13:10 New Pod Added to Store: kube-flannel-ds-amd64-bjmt2
2020/10/17 15:13:10 New Pod Added to Store: kubernetes-dashboard-65665f84db-jrw6k
2020/10/17 15:13:10 New Pod Added to Store: mongodb
2020/10/17 15:13:10 New Pod Added to Store: web-0
....
Analysis
1. Initialization
We covered clientset in the previous post; as you can see, the informer is built on top of the clientset. Let's go straight to the informer's initialization, NewSharedInformerFactory.
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
NewSharedInformerFactory ends up calling NewSharedInformerFactoryWithOptions to initialize a sharedInformerFactory. During initialization it creates the informers map, which caches informers of different types.
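The options variadic is how fields like namespace and customResync above get populated. A sketch of two of the available options, assuming client-go v0.21.x:
// Assumed imports: k8s.io/client-go/informers,
// corev1 "k8s.io/api/core/v1", metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
factory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute,
    // scope every informer built by this factory to a single namespace
    informers.WithNamespace("kube-system"),
    // per-type resync period; this fills the customResync map seen above
    informers.WithCustomResyncConfig(map[metav1.Object]time.Duration{
        &corev1.Pod{}: 30 * time.Second,
    }),
)
podInformer := factory.Core().V1().Pods().Informer() // resyncs every 30s per the config above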
2. Creating an informer for a specific type
Let's see how informers for different types are created:
podInformer := sharedInformers.Core().V1().Pods().Informer()
//nodeInformer := sharedInformers.Node().V1beta1().RuntimeClasses().Informer()
Different accessor chains return informers backed by different structs, as the pod and node examples above show. Let's trace the pod path:
func (f *sharedInformerFactory) Core() core.Interface {
return core.New(f, f.namespace, f.tweakListOptions)
}
type group struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {
return v1.New(g.factory, g.namespace, g.tweakListOptions)
}
type version struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
//create using the Pod struct and the defaultInformer constructor
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
//get the informer's type; here it is Pod
informerType := reflect.TypeOf(obj)
//look up the map cache; if the informer exists, return it directly
informer, exists := f.informers[informerType]
if exists {
return informer
}
//look up a per-type resync period
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
//call newFunc (defaultInformer) to create the informer, then cache it
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
InformerFor first looks up the informer for the given type in the sharedInformerFactory's map cache; if found, it is returned directly. Otherwise newFunc creates the informer, which is then stored in the informers cache.
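Because of this map cache, asking the same factory for the same informer type always returns one shared instance, which is exactly what makes it "shared"; a quick check:
a := sharedInformers.Core().V1().Pods().Informer()
b := sharedInformers.Core().V1().Pods().Informer()
fmt.Println(a == b) // true: the second call hits the factory's map cache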
3. Creating the informer
When InformerFor is called, the defaultInformer function is passed in as the constructor. Let's look at defaultInformer:
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
//use the client to list pods
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
//use the client to watch pods
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
This is where an informer is actually created, with the List & Watch callbacks registered.
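The tweakListOptions hook threaded through here lets the caller mutate the ListOptions before every List and Watch call. For example, a sketch that narrows a pod informer to a label selector (the selector value is illustrative):
// Assumed imports: k8s.io/client-go/informers,
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
factory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute,
    informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
        opts.LabelSelector = "app=web" // applied to both ListFunc and WatchFunc above
    }),
)
webPods := factory.Core().V1().Pods().Informer() // only ever sees pods labeled app=web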
4. The indexer
NewSharedIndexInformer creates a sharedProcessor, stores the List & Watch callbacks, and builds an indexer. Let's see how NewIndexer creates it:
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return MetaNamespaceKeyFunc(obj)
}
NewIndexer creates a cache whose keyFunc is DeletionHandlingMetaNamespaceKeyFunc: given an object, it produces its namespace/name string (handling the DeletedFinalStateUnknown tombstone case). The cache's data lives in cacheStorage, a threadSafeMap: a thread-safe local store for resource objects with built-in indexing.
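In practice this means every object in the local store is keyed by its namespace/name string, and the namespace index registered earlier supports per-namespace lookups. A sketch against the informer from our example (object names are illustrative):
// Assumed imports: k8s.io/client-go/tools/cache, log
key, _ := cache.MetaNamespaceKeyFunc(pod) // e.g. "default/web-0"; pod is any cached object
obj, exists, err := informer.GetIndexer().GetByKey(key) // read straight from the local cache
inNs, _ := informer.GetIndexer().ByIndex(cache.NamespaceIndex, "default") // all cached pods in "default"
log.Printf("found=%v err=%v obj=%v; %d pods in default", exists, err, obj, len(inNs))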
5. Registering event handlers
With initialization complete, let's return to the example. Event handler functions are registered on the informer we created through its AddEventHandler function. Let's look at AddEventHandler first:
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.stopped {
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
return
}
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
if s.started {
klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
//initialize the listener
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
//if the informer has not started yet, simply append the listener to the processor's listener list
if !s.started {
s.processor.addListener(listener)
return
}
// in order to safely join, we have to
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
//if the informer is already running, the lock must be taken
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.addListener(listener)
//then replay the data cached in the indexer into the listener
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
newProcessListener initializes the listener first. The code then checks whether the informer has started: if not, the listener is simply appended to the processor's listener list and the function returns; if the informer is already running, the lock is taken before the listener is added, and the data cached in the indexer is then replayed into the listener.
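This replay is why a handler may be registered even after the informer is running: it immediately receives the indexer's current contents as synthetic Add events. A per-handler resync period can also be requested; a short sketch:
informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // also receives replayed objects if the informer was already started
    },
}, 10*time.Minute) // this handler is resynced every 10 minutes (floor: minimumResyncPeriod)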
6. Run
With all that setup done, the example finally calls the informer's Run function to start the informer machinery.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
//initialize the DeltaFIFO queue
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
//use the DeltaFIFO queue as the Queue
Queue: fifo,
//设置List&Watch的回调函数
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
//set the resync period
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
//determines which listeners are due for a resync
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
//create the controller (it is started further down)
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
//start the processor via its run method
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
//start the controller
s.controller.Run(stopCh)
}
The core flow is as follows (a factory-based startup sketch follows the list):
- call NewDeltaFIFOWithOptions to initialize the DeltaFIFO queue;
- build the Config struct that will be used to create the controller;
- create the controller;
- start the processor via its run method;
- start the controller via its Run method.
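As noted, the example at the top calls informer.Run directly; when several informers share one factory, the usual pattern is to start them all through the factory and then wait for their caches to sync. A sketch, assuming the sharedInformers factory and informer from the example:
// Assumed imports: k8s.io/client-go/tools/cache, log
sharedInformers.Start(stopCh) // calls Run on every informer created so far, each in its own goroutine
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
    log.Fatal("timed out waiting for caches to sync")
}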
7. The processor's run method
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
//iterate over the listeners
for _, listener := range p.listeners {
//these two methods drive the core event callbacks
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
As you can see, both the run and pop methods of each processorListener are started.
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
Every watched event enters through this add method and is written into the addCh channel. The pop method then consumes addCh in its select block: on the first iteration notification is nil, so the incoming item becomes the current notification and nextCh is set to p.nextCh; on the next iteration that item is written into nextCh. While a notification is still pending, newly arriving items are buffered in pendingNotifications, and subsequent notifications are read back out of pendingNotifications.
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil {
notification = notificationToAdd
nextCh = p.nextCh
} else {
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
Now let's look at run:
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
run ranges over nextCh and invokes a different callback depending on the notification type; this is where the eventHandler we registered in main gets called. (Note that wait.Until does not poll every second: it merely restarts the loop after a one-second delay if it ever exits, for example after a panic.)
8. The controller's Run method
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
//create the Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
//start the Reflector
wg.StartWithChannel(stopCh, r.Run)
//drive processLoop, which pops from the DeltaFIFO queue (restarted one second after it returns)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
So the controller creates a Reflector to do the listing and watching, and drives a loop that pops items off the DeltaFIFO queue for dispatch.
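The Reflector can also be used standalone; a minimal sketch (assuming client-go v0.21.x) that mirrors what controller.Run wires up, pumping List/Watch results into a store:
// Assumed imports: k8s.io/client-go/tools/cache, k8s.io/apimachinery/pkg/fields,
// corev1 "k8s.io/api/core/v1", metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),
    "pods", metav1.NamespaceAll, fields.Everything())
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
r := cache.NewReflector(lw, &corev1.Pod{}, store, 0) // resyncPeriod 0 disables forced resyncs
go r.Run(stopCh)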
9. Starting the Reflector
The Reflector's Run method is invoked first:
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
This ultimately calls the Reflector's ListAndWatch to fetch and monitor the resource. ListAndWatch is fairly long and splits into two parts: List and Watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
//list
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
//fetch the pod list with the given options
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
//
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
pager.PageSize = 0
}
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
//
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
//get the resource version
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
//convert the list result into a slice of resource objects
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
//store the resource objects and the resource version into the DeltaFIFO queue
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
//watch
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
//open a long-lived connection to the apiserver via the clientset to watch the resource for changes
w, err := r.listerWatcher.Watch(options)
if err != nil {
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
if utilnet.IsConnectionRefused(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
}
return err
}
//handle the resource change events
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
The main logic of the List part:
- call listerWatcher.List to fetch all objects of the resource; this goes through the API to the apiserver to get the resource list;
- call listMetaInterface.GetResourceVersion to get the resource version;
- call meta.ExtractList to convert the result into a list of resource objects;
- store the resource objects and the resource version into the DeltaFIFO queue;
- finally, call setLastSyncResourceVersion to update the resource version.
The main logic of the Watch part:
- in a loop, use the clientset API to hold a long-lived connection to the apiserver and watch the specified resource for changes; whenever a change event arrives, watchHandler is called to process it.
watchHandler calls a different store method depending on each event's type, converting the event into the corresponding Delta and storing it in the DeltaFIFO queue.
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
// get the resource version
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
//add the Added event to the DeltaFIFO queue
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
//add the Modified event to the DeltaFIFO queue
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
//add the Deleted event to the DeltaFIFO queue
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++
}
}
watchDuration := r.clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
return nil
}
10. Dispatching from the DeltaFIFO queue
Once the Reflector is running, the events it lists and watches land in the DeltaFIFO queue. The remaining piece of controller.Run is dispatching tasks from that queue, which is the final wait.Until call.
wait.Until(c.processLoop, time.Second, stopCh)
wait.Until drives processLoop, restarting it one second after it returns (processLoop itself loops indefinitely):
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}
This loop pops items off the DeltaFIFO queue and hands them to the Process function. Process was set when sharedIndexInformer's Run method built the Config above, and it points to sharedIndexInformer's HandleDeltas method.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
//dispatch according to each Delta's Type
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
//if the object already exists in the cache
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
//update the indexer
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
//compare the resource versions of the old and new objects
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
// if the object is not in the cache
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
HandleDeltas updates the indexer cache with the content popped from the DeltaFIFO, then distributes the message through s.processor.distribute(). Inside distribute, the sharedProcessor hands the object to each listener via listener.add(obj), which in turn executes p.addCh <- notification.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
Connect this back to the p.wg.Start(listener.run) and p.wg.Start(listener.pop) calls above: once the notification is pushed into the addCh channel, it flows through pop and run to the registered EventHandler. The loop is closed, and that, end to end, is the informer mechanism.