The informer mechanism in client-go is a rather distinctive way for a client to interact with the API server, and it underpins the various controller patterns used in Kubernetes.

informer

Having covered direct use of the clients in client-go, let's look at a usage pattern that is very common in Kubernetes yet quite different from the plain clients: the informer. One design that sets client-go apart from clients in many other languages is the informer mechanism. Many other clients simply talk to Kubernetes through the REST API, which works fine at small scale, but with many watchers it runs into problems such as too many connections and state falling out of sync. client-go, which is used extensively inside the Kubernetes source itself, addresses these problems with the informer mechanism.

When an informer starts, its reflector first issues a List against the Kubernetes API to fetch all objects of the resource and caches them, then opens a Watch on the resource and pushes changes into a DeltaFIFO. Subsequent reads no longer have to hit Kubernetes in real time; objects are served from the local indexer cache, which noticeably reduces connection count and overall resource usage.

The informer also provides a handler mechanism: you supply an implementation of the ResourceEventHandler interface to react to OnAdd/OnUpdate/OnDelete events. This is described in detail below.

Informers are provided out of the box for the built-in resources. For custom CRD resources you have to implement the corresponding controller yourself; the controllers we write also rely on the informer mechanism.

Architecture diagram

Let's first look at the architecture diagram of the informer implementation.

From the architecture diagram we can see that the informer serves two main purposes:

  1. Through a method called ListAndWatch, it caches the API objects from the APIServer locally and keeps that cache up to date. ListAndWatch first "fetches" the latest version of all API objects through the APIServer's LIST API, and then "listens" for changes to those objects through the WATCH API (a small read-from-cache sketch follows this list);
  2. It registers events, so that whenever a watched change arrives, the corresponding EventHandler is invoked as a callback.
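
To make the first point concrete, here is a small sketch of reading Pods from the informer's local cache through a Lister. It assumes a SharedInformerFactory named sharedInformers, like the one created in the example further below, that has already been started and whose caches have synced; the namespace is only illustrative.

// labels is "k8s.io/apimachinery/pkg/labels".
// This read is served from the local indexer, not from the API server.
podLister := sharedInformers.Core().V1().Pods().Lister()
pods, err := podLister.Pods("default").List(labels.Everything())
if err == nil {
	log.Printf("%d pods cached for namespace default", len(pods))
}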

Basic usage and source code walkthrough

Let's first look at the directory layout of the informer source code; client-go has a dedicated informers directory for the implementation.

[root@dev1 informers]# pwd
/root/go/pkg/mod/k8s.io/client-go@v0.21.3/informers
[root@dev1 informers]# tree .
.
├── admissionregistration
│   ├── interface.go
│   ├── v1
│   │   ├── interface.go
│   │   ├── mutatingwebhookconfiguration.go
│   │   └── validatingwebhookconfiguration.go
│   └── v1beta1
│       ├── interface.go
│       ├── mutatingwebhookconfiguration.go
│       └── validatingwebhookconfiguration.go
├── apiserverinternal
│   ├── interface.go
│   └── v1alpha1
│       ├── interface.go
│       └── storageversion.go
├── apps
│   ├── interface.go
│   ├── v1
│   │   ├── controllerrevision.go
│   │   ├── daemonset.go
│   │   ├── deployment.go
│   │   ├── interface.go
│   │   ├── replicaset.go
│   │   └── statefulset.go
│   ├── v1beta1
│   │   ├── controllerrevision.go
│   │   ├── deployment.go
│   │   ├── interface.go
│   │   └── statefulset.go
│   └── v1beta2
│       ├── controllerrevision.go
│       ├── daemonset.go
│       ├── deployment.go
│       ├── interface.go
│       ├── replicaset.go
│       └── statefulset.go
├── autoscaling
│   ├── interface.go
│   ├── v1
│   │   ├── horizontalpodautoscaler.go
│   │   └── interface.go
│   ├── v2beta1
│   │   ├── horizontalpodautoscaler.go
│   │   └── interface.go
│   └── v2beta2
│       ├── horizontalpodautoscaler.go
│       └── interface.go
├── batch
│   ├── interface.go
│   ├── v1
│   │   ├── cronjob.go
│   │   ├── interface.go
│   │   └── job.go
│   └── v1beta1
│       ├── cronjob.go
│       └── interface.go
├── certificates
│   ├── interface.go
│   ├── v1
│   │   ├── certificatesigningrequest.go
│   │   └── interface.go
│   └── v1beta1
│       ├── certificatesigningrequest.go
│       └── interface.go
├── coordination
│   ├── interface.go
│   ├── v1
│   │   ├── interface.go
│   │   └── lease.go
│   └── v1beta1
│       ├── interface.go
│       └── lease.go
├── core
│   ├── interface.go
│   └── v1
│       ├── componentstatus.go
│       ├── configmap.go
│       ├── endpoints.go
│       ├── event.go
│       ├── interface.go
│       ├── limitrange.go
│       ├── namespace.go
│       ├── node.go
│       ├── persistentvolumeclaim.go
│       ├── persistentvolume.go
│       ├── pod.go
│       ├── podtemplate.go
│       ├── replicationcontroller.go
│       ├── resourcequota.go
│       ├── secret.go
│       ├── serviceaccount.go
│       └── service.go
├── discovery
│   ├── interface.go
│   ├── v1
│   │   ├── endpointslice.go
│   │   └── interface.go
│   └── v1beta1
│       ├── endpointslice.go
│       └── interface.go
├── events
│   ├── interface.go
│   ├── v1
│   │   ├── event.go
│   │   └── interface.go
│   └── v1beta1
│       ├── event.go
│       └── interface.go
├── extensions
│   ├── interface.go
│   └── v1beta1
│       ├── daemonset.go
│       ├── deployment.go
│       ├── ingress.go
│       ├── interface.go
│       ├── networkpolicy.go
│       ├── podsecuritypolicy.go
│       └── replicaset.go
├── factory.go
├── flowcontrol
│   ├── interface.go
│   ├── v1alpha1
│   │   ├── flowschema.go
│   │   ├── interface.go
│   │   └── prioritylevelconfiguration.go
│   └── v1beta1
│       ├── flowschema.go
│       ├── interface.go
│       └── prioritylevelconfiguration.go
├── generic.go
├── internalinterfaces
│   └── factory_interfaces.go
├── networking
│   ├── interface.go
│   ├── v1
│   │   ├── ingressclass.go
│   │   ├── ingress.go
│   │   ├── interface.go
│   │   └── networkpolicy.go
│   └── v1beta1
│       ├── ingressclass.go
│       ├── ingress.go
│       └── interface.go
├── node
│   ├── interface.go
│   ├── v1
│   │   ├── interface.go
│   │   └── runtimeclass.go
│   ├── v1alpha1
│   │   ├── interface.go
│   │   └── runtimeclass.go
│   └── v1beta1
│       ├── interface.go
│       └── runtimeclass.go
├── policy
│   ├── interface.go
│   ├── v1
│   │   ├── interface.go
│   │   └── poddisruptionbudget.go
│   └── v1beta1
│       ├── interface.go
│       ├── poddisruptionbudget.go
│       └── podsecuritypolicy.go
├── rbac
│   ├── interface.go
│   ├── v1
│   │   ├── clusterrolebinding.go
│   │   ├── clusterrole.go
│   │   ├── interface.go
│   │   ├── rolebinding.go
│   │   └── role.go
│   ├── v1alpha1
│   │   ├── clusterrolebinding.go
│   │   ├── clusterrole.go
│   │   ├── interface.go
│   │   ├── rolebinding.go
│   │   └── role.go
│   └── v1beta1
│       ├── clusterrolebinding.go
│       ├── clusterrole.go
│       ├── interface.go
│       ├── rolebinding.go
│       └── role.go
├── scheduling
│   ├── interface.go
│   ├── v1
│   │   ├── interface.go
│   │   └── priorityclass.go
│   ├── v1alpha1
│   │   ├── interface.go
│   │   └── priorityclass.go
│   └── v1beta1
│       ├── interface.go
│       └── priorityclass.go
└── storage
    ├── interface.go
    ├── v1
    │   ├── csidriver.go
    │   ├── csinode.go
    │   ├── interface.go
    │   ├── storageclass.go
    │   └── volumeattachment.go
    ├── v1alpha1
    │   ├── csistoragecapacity.go
    │   ├── interface.go
    │   └── volumeattachment.go
    └── v1beta1
        ├── csidriver.go
        ├── csinode.go
        ├── csistoragecapacity.go
        ├── interface.go
        ├── storageclass.go
        └── volumeattachment.go

58 directories, 155 files

As you can see, every resource has its own directory, and each directory defines the interface for that resource.

Let's look at how the informer mechanism works through an example.

package main

import (
	"flag"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"log"
	"path/filepath"
	"time"
)

func main() {
	var kubeconfig *string
	// On Windows this reads the config file under C:\Users\xxx\.kube\config
	// On Linux this reads the config file under ~/.kube/config
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)
	// Resync every minute; a resync periodically re-delivers the objects in the local cache as Sync events
	sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)

	informer := sharedInformers.Core().V1().Pods().Informer()

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New Pod Added to Store: %s", mObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("Pod Deleted from Store: %s", mObj.GetName())
		},
	})

	informer.Run(stopCh)
}

Running it produces output like:

2020/10/17 15:13:10 New Pod Added to Store: dns-test
2020/10/17 15:13:10 New Pod Added to Store: web-1
2020/10/17 15:13:10 New Pod Added to Store: fluentd-elasticsearch-nwqph
2020/10/17 15:13:10 New Pod Added to Store: kube-flannel-ds-amd64-bjmt2
2020/10/17 15:13:10 New Pod Added to Store: kubernetes-dashboard-65665f84db-jrw6k
2020/10/17 15:13:10 New Pod Added to Store: mongodb
2020/10/17 15:13:10 New Pod Added to Store: web-0
....

Analysis

1. Initialization

We covered the clientset in the previous article, and as you can see the informer is built on top of a clientset as well. Let's go straight to the informer's initialization, NewSharedInformerFactory:

// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

NewSharedInformerFactory ends up calling NewSharedInformerFactoryWithOptions to build a sharedInformerFactory. During initialization it creates the informers map, which caches informers of the different resource types.
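
The variadic options are how the factory gets scoped or tuned. A brief usage sketch; the namespace and label selector values are only illustrative:

// metav1 is "k8s.io/apimachinery/pkg/apis/meta/v1".
// A factory restricted to one namespace, with a label selector applied to
// every List/Watch request it issues.
factory := informers.NewSharedInformerFactoryWithOptions(
	clientset,
	10*time.Minute,
	informers.WithNamespace("kube-system"),
	informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
		opts.LabelSelector = "k8s-app=kube-dns"
	}),
)
_ = factory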

2. Creating an informer for a specific type

Let's look at how informers for different types are created:

podInformer := sharedInformers.Core().V1().Pods().Informer()
//nodeInformer := sharedInformers.Node().V1beta1().RuntimeClasses().Informer()

Different accessor chains return informers for different types, such as the Pod and Node informers above. Let's trace the corresponding functions:

func (f *sharedInformerFactory) Core() core.Interface {
	return core.New(f, f.namespace, f.tweakListOptions)
}

type group struct {
	factory          internalinterfaces.SharedInformerFactory
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
	return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {
	return v1.New(g.factory, g.namespace, g.tweakListOptions)
}
type version struct {
	factory          internalinterfaces.SharedInformerFactory
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
}
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
	return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
	return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
type podInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}

func (f *podInformer) Informer() cache.SharedIndexInformer {
	// Create the informer using the Pod struct and the defaultInformer constructor
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()
	// Get the informer's type; here it is Pod
	informerType := reflect.TypeOf(obj)
	// Look up the map cache; if an informer already exists, return it directly
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}
	// Look up the resync period for this type
	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}
	// Call newFunc (defaultInformer) to create the informer and cache it
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

InformerFor first looks up the informer by type in the sharedInformerFactory's map cache; if it exists it is returned directly, otherwise newFunc is called to create the informer, which is then stored in the informers cache.
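
A small sketch of that caching behavior, assuming the sharedInformers factory from the example above (fmt is only used to show the comparison):

// Both calls use the same &corev1.Pod{} type key inside InformerFor, so the
// second one is served from the factory's informers map and returns the very
// same SharedIndexInformer instance.
a := sharedInformers.Core().V1().Pods().Informer()
b := sharedInformers.Core().V1().Pods().Informer()
fmt.Println(a == b) // true

// The Lister for the same resource is backed by that informer's indexer.
podLister := sharedInformers.Core().V1().Pods().Lister()
_ = podLister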

3. Creating the informer

When InformerFor is called, the defaultInformer function is passed in to construct the informer. Let's look at defaultInformer:

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				// Use the client to list Pods
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				// Use the client to watch Pods
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

This is where the informer is actually created, with the List and Watch callbacks registered.
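
NewFilteredPodInformer can also be called directly when a standalone, filtered informer is needed, although the factory-built shared informer is preferred, as the comment above says. A sketch, where corev1informers stands for the k8s.io/client-go/informers/core/v1 package and the field selector is illustrative:

// A standalone Pod informer restricted to a single node via a field selector.
podInformer := corev1informers.NewFilteredPodInformer(
	clientset,
	metav1.NamespaceAll, // all namespaces
	30*time.Second,
	cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	func(opts *metav1.ListOptions) {
		opts.FieldSelector = "spec.nodeName=node-1" // illustrative value
	},
)
_ = podInformer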

4. Indexer

Inside sharedIndexInformer a sharedProcessor is created, the List and Watch callbacks are set, and an indexer is created. Let's see how NewIndexer builds the indexer:

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}

func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return MetaNamespaceKeyFunc(obj)
}

NewIndexer creates a cache whose keyFunc is DeletionHandlingMetaNamespaceKeyFunc, i.e. given an object it produces its namespace/name string. The cache's data is stored in cacheStorage, a threadSafeMap: a local store that holds resource objects and comes with built-in indexing.
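
Because of this keyFunc, cached objects can be fetched straight from the indexer by their namespace/name key, or grouped by the namespace index registered above. A sketch against the informer from the example; the keys are illustrative, and corev1 stands for k8s.io/api/core/v1:

// Look up one object by its "namespace/name" key.
obj, exists, err := informer.GetIndexer().GetByKey("default/mongodb")
if err == nil && exists {
	pod := obj.(*corev1.Pod)
	log.Printf("found %s/%s in the local cache", pod.Namespace, pod.Name)
}

// List everything cached for one namespace through the namespace index.
cached, err := informer.GetIndexer().ByIndex(cache.NamespaceIndex, "default")
if err == nil {
	log.Printf("%d objects cached for namespace default", len(cached))
}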

5. Registering event handlers

With initialization done, let's return to the example. Event handlers are registered through the AddEventHandler function of the informer we created above. Let's look at AddEventHandler first:

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}


func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	// Initialize the listener
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	// If the informer has not started yet, just add the listener to the processor's listener list
	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	// If the informer has already started, we need to take the lock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	// Then replay the data cached in the indexer to the new listener
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}


func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

newProcessListener is called first to initialize the listener, then the code checks whether the informer has already started. If not, the listener is simply added to the processor's listener list and the function returns; if the informer is already running, the lock is taken, the listener is added to the processor's listener list, and the data cached in the indexer is replayed to the new listener as Add notifications.
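
A handler can also ask for its own resync period instead of the factory default. A brief sketch; the five-minute value is only illustrative and, as the code above shows, it gets clamped to minimumResyncPeriod and to the informer's resyncCheckPeriod once the informer is running:

// Register a handler with a per-handler resync of 5 minutes. During a
// resync the objects already in the indexer are re-delivered to this
// listener as update notifications.
informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
	UpdateFunc: func(oldObj, newObj interface{}) {
		log.Printf("update or resync: %s", newObj.(v1.Object).GetName())
	},
}, 5*time.Minute)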

6. Run

After all this setup, the last thing the example does is call the informer's Run function to start the informer:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Initialize the DeltaFIFO queue
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		// Use the DeltaFIFO as the queue
		Queue:            fifo,
		// Set the List and Watch callbacks
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		// Set the resync period
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		// Decide which listeners are due for a resync
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		// Create the controller (under startedLock)
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	// Start the processor by calling its run method
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	// Start the controller
	s.controller.Run(stopCh)
}

The core flow is as follows; a sketch of how Run is typically invoked follows the list.

  • Call NewDeltaFIFOWithOptions to initialize the DeltaFIFO queue;
  • Build the Config struct used as the parameter for creating the controller;
  • Create the controller (under startedLock);
  • Start the processor by calling its run method;
  • Start the controller by calling its Run method;
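
The example at the top calls informer.Run(stopCh) directly and blocks; a common alternative is to let the factory run every registered informer and wait for the caches to sync before doing any work. A sketch, reusing the sharedInformers and informer variables from the example:

// Start every informer registered with the factory in its own goroutine,
// then block until the initial List has landed in the local caches.
sharedInformers.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
	log.Fatal("informer cache did not sync")
}
// From this point the indexer is populated and event handlers are firing.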

7. The processor's run method

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		// Iterate over the listeners
		for _, listener := range p.listeners {
			// The next two calls drive the core event callback machinery
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

As you can see, each processorListener's run and pop methods are started together.

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

Every notification enters through this add method, which writes it to the addCh channel. The pop method then reads from addCh in its select block: on the first iteration notification is nil, so nextCh is set to p.nextCh; on the next iteration the notification is written into nextCh.

When notification is already non-nil, incoming data is instead buffered in pendingNotifications, and later notifications are read back out of pendingNotifications.

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil {
				notification = notificationToAdd
				nextCh = p.nextCh
			} else {
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

Now let's look at run:

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

run keeps ranging over nextCh (the one-second interval in wait.Until only matters if the loop exits, e.g. after a recovered panic) and dispatches each notification to the matching callback based on its type; this is where the eventHandler we registered in main gets invoked.

8. The controller's Run method

// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	// Create the Reflector
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	// Start the Reflector
	wg.StartWithChannel(stopCh, r.Run)

	// Run processLoop, which keeps popping items off the DeltaFIFO queue (wait.Until restarts it after one second if it ever returns)
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

So Run creates the Reflector to list and watch, and runs a loop that pops items off the DeltaFIFO queue for dispatching.

9. Starting the Reflector

The Reflector's Run method is called first:

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

This ends up calling the Reflector's ListAndWatch method to fetch and watch the resource. ListAndWatch is fairly long and splits into two parts: List and Watch.

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	//list
	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var paginatedResult bool
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicCh <- r
				}
			}()
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				// List the pods with the given options
				return r.listerWatcher.List(opts)
			}))
			switch {
			case r.WatchListPageSize != 0:
				pager.PageSize = r.WatchListPageSize
			case r.paginatedResult:
				// We got a paginated result initially. Assume this resource and server honor
				// paging requests (i.e. watch cache is probably disabled) and leave the default
				// pager size set.
			case options.ResourceVersion != "" && options.ResourceVersion != "0":
				// User didn't explicitly request pagination.
				//
				// With ResourceVersion != "", we have a possibility to list from watch cache,
				// but we do that (for ResourceVersion != "0") only if Limit is unset.
				// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
				// switch off pagination to force listing from watch cache (if enabled).
				// With the existing semantic of RV (result is at least as fresh as provided RV),
				// this is correct and doesn't lead to going back in time.
				//
				// We also don't turn off pagination for ResourceVersion="0", since watch cache
				// is ignoring Limit in that case anyway, and if watch cache is not enabled
				// we don't introduce regression.
				pager.PageSize = 0
			}

			list, paginatedResult, err = pager.List(context.Background(), options)
			if isExpiredError(err) || isTooLargeResourceVersionError(err) {
				r.setIsLastSyncResourceVersionUnavailable(true)
				// Retry immediately if the resource version used to list is unavailable.
				// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
				// continuation pages, but the pager might not be enabled, the full list might fail because the
				// resource version it is listing at is expired or the cache may not yet be synced to the provided
				// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
				// the reflector makes forward progress.
				list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
			}
			close(listCh)
		}()
		select {
		case <-stopCh:
			return nil
		case r := <-panicCh:
			panic(r)
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
		}

		// We check if the list was paginated and if so set the paginatedResult based on that.
		// However, we want to do that only for the initial list (which is the only case
		// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
		// situations we may force listing directly from etcd (by setting ResourceVersion="")
		// which will return paginated result, even if watch cache is enabled. However, in
		// that case, we still want to prefer sending requests to watch cache if possible.
		//
		// Paginated result returned for request with ResourceVersion="0" mean that watch
		// cache is disabled and there are a lot of objects of a given type. In such case,
		// there is no need to prefer listing from watch cache.
		if options.ResourceVersion == "0" && paginatedResult {
			r.paginatedResult = true
		}

		r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
		initTrace.Step("Objects listed")
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v: %v", list, err)
		}
		// Get the resource version
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		// Convert the list result into a slice of resource objects
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
		}
		initTrace.Step("Objects extracted")
		// Store the resource objects and the resource version into the DeltaFIFO queue
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("unable to sync list result: %v", err)
		}
		initTrace.Step("SyncWith done")
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

	//watch
	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
		// Use the clientset to open a long-lived connection to the apiserver and watch the given resource for changes
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case begin exponentially backing off and resend watch request.
			if utilnet.IsConnectionRefused(err) {
				<-r.initConnBackoffManager.Backoff().C()
				continue
			}
			return err
		}

		// Handle resource change events
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
					// has a semantic that it returns data at least as fresh as provided RV.
					// So first try to LIST with setting RV to resource version of last observed object.
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}

The main logic of the List part:

  • Call listerWatcher.List to fetch all objects of the resource; this goes through the API to the apiserver to get the resource list.
  • Call listMetaInterface.GetResourceVersion to get the resource version;
  • Call meta.ExtractList to convert the list result into a slice of resource objects;
  • Store the resource objects and the resource version into the DeltaFIFO queue;
  • Finally, call setLastSyncResourceVersion to update the resource version;

The main logic of the Watch part:

  • In a loop, use the clientset to keep a long-lived connection to the apiserver and watch the given resource; whenever a change is observed, watchHandler is called to handle the change event.

watchHandler dispatches on the event type and calls the corresponding store method, which turns the event into the matching Delta and stores it in the DeltaFIFO queue.

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			// Get the resource version
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			// Add the resource-added event to the DeltaFIFO queue
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			// Add the resource-updated event to the DeltaFIFO queue
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			// Add the resource-deleted event to the DeltaFIFO queue
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			if rvu, ok := r.store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(newResourceVersion)
			}
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}

10. Dispatching from the DeltaFIFO queue

Once the Reflector is running, the events it lists and watches are pushed into the DeltaFIFO queue. The remaining part of the controller's Run logic dispatches items off that queue, which is what the final wait.Until call does.

wait.Until(c.processLoop, time.Second, stopCh)

processLoop is driven by wait.Until with a one-second interval; processLoop itself loops continuously popping items, and the interval only comes into play if it ever returns.

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

This loop pops items off the DeltaFIFO queue and hands them to the Process function. Process was set when sharedIndexInformer's Run method built the Config above, and it points to sharedIndexInformer's HandleDeltas method.

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	// Dispatch on the Delta type
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			// If the object already exists in the cache
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				// Update the indexer
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					// Compare the resource versions of the old and new objects
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			// If the object does not exist in the cache
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

HandleDeltas updates the indexer cache with what was popped from the DeltaFIFO and then distributes the notification to listeners via s.processor.distribute().

In distribute, the sharedProcessor hands the object to every listener via listener.add(obj), and add in turn executes p.addCh <- notification:

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

Put this together with the p.wg.Start(listener.run) and p.wg.Start(listener.pop) calls above: once the notification is pushed into the addCh channel it eventually triggers the EventHandler callbacks, closing the loop that makes up the informer mechanism.
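
As noted at the beginning, custom controllers are built on this same loop: the registered handlers typically just push an object key onto a work queue, and worker goroutines read the authoritative object back from the local cache when they process the key. A minimal, illustrative sketch, where workqueue is k8s.io/client-go/util/workqueue:

// The handler only records "namespace/name" keys; workers later re-read the
// object from the indexer, so the queue stays small and deduplicated.
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
			queue.Add(key)
		}
	},
})

// Worker side (sketch):
//   key, shutdown := queue.Get()
//   obj, exists, err := informer.GetIndexer().GetByKey(key.(string))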

Summary