This article walks through the source code behind the informer mechanism in client-go.
cache
The informer logic itself is concentrated in the cache package, which defines the following core interfaces and types:
k8s.io/client-go/tools/cache$ tree .
.
├── OWNERS
├── controller.go
├── controller_test.go
├── delta_fifo.go
├── delta_fifo_test.go
├── doc.go
├── expiration_cache.go
├── expiration_cache_fakes.go
├── expiration_cache_test.go
├── fake_custom_store.go
├── fifo.go
├── fifo_test.go
├── heap.go
├── heap_test.go
├── index.go
├── index_test.go
├── listers.go
├── listwatch.go
├── main_test.go
├── mutation_cache.go
├── mutation_detector.go
├── mutation_detector_test.go
├── processor_listener_test.go
├── reflector.go
├── reflector_metrics.go
├── reflector_test.go
├── shared_informer.go
├── shared_informer_test.go
├── store.go
├── store_test.go
├── testing
│ ├── fake_controller_source.go
│ └── fake_controller_source_test.go
├── thread_safe_store.go
├── thread_safe_store_test.go
├── undelta_store.go
└── undelta_store_test.go
1 directory, 36 files
In short:
- ListerWatcher wraps the raw client interface and exposes two methods, List and Watch; it is the source of all data.
- Store is the object store used for caching. It provides the basic CRUD operations on objects, such as Add, Update, Delete, List and Get. Store has two extension interfaces, Queue and Indexer.
- Queue extends Store, adding Pop and a few other methods to implement a first-in-first-out (FIFO) queue. In the cache package, Queue is used to buffer changes (Deltas); a change is an event that triggers a state transition. The queue decouples event production from event handling.
- Reflector is the bridge between the ListerWatcher and the Queue: it converts the raw List/Watch data into a uniform queue that a controller can easily consume.
- Indexer also extends Store, adding multi-index support, i.e. objects can be indexed in several ways at once. One role of Store in the cache package is to hold the current resource state; Indexer inherits that role and plays an important part in resync.
- The Controller interface encapsulates a group of control logic. Its most important method is Run(stopCh <-chan struct{}), usually implemented as an endless loop that keeps consuming the queue until stopCh is closed.
- An Informer is a kind of Controller dedicated to implementing the informer mechanism.
- SharedInformer wraps a Controller once more so that multiple handlers can react to events on the same resource; the AddEventHandler method makes the difference between SharedInformer and Informer (a Controller dedicated to the informer mechanism) fairly clear.
- SharedIndexInformer serves the same purpose as SharedInformer; the only difference is that SharedIndexInformer uses an Indexer.
Below we go through the source and implementations of these interfaces to understand how the informer is built inside the cache package.
ListerWatcher
Let's look at the source directly, k8s.io/client-go/tools/cache/listwatch.go:
// Lister is any object that knows how to perform an initial list.
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister
Watcher
}
ListerWatcher is a composite interface made up of Lister and Watcher, which provide the List and Watch methods respectively. Both methods are independent of the concrete resource type and operate on runtime.Object, so in practice the results usually need to be converted to a concrete type via reflection or a type assertion.
In the same file, the ListWatch struct implements the ListerWatcher interface; let's use it to look at the basic usage.
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
Usage example:
package main
import (
"fmt"
"github.com/spongeprojects/magicconch"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
"os"
"os/signal"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func mustClientset() kubernetes.Interface {
kubeconfig := os.Getenv("KUBECONFIG")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
magicconch.Must(err)
clientset, err := kubernetes.NewForConfig(config)
magicconch.Must(err)
return clientset
}
// newConfigMapsListerWatcher creates a ListerWatcher for configmaps in the tmp namespace
func newConfigMapsListerWatcher() cache.ListerWatcher {
clientset := mustClientset() // build the clientset
client := clientset.CoreV1().RESTClient() // the client, i.e. the request builder
resource := "configmaps" // one of the GET request parameters
namespace := "tmp" // one of the GET request parameters
selector := fields.Everything() // one of the GET request parameters
lw := cache.NewListWatchFromClient(client, resource, namespace, selector)
return lw
}
func main() {
//clientset := mustClientset()
//
//namespaces, err := clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
//magicconch.Must(err)
//
//for _, namespace := range namespaces.Items {
// fmt.Println(namespace.Name)
//}
fmt.Println("----- 1-list-watcher -----")
lw := newConfigMapsListerWatcher()
// list has type runtime.Object and needs reflection or a type assertion before use;
// the FieldSelector in the ListOptions passed in is always replaced by the selector above.
list, err := lw.List(metav1.ListOptions{})
magicconch.Must(err)
// The meta package wraps helpers for handling runtime.Object values, hiding the
// reflection and type conversion; the extracted items have type []runtime.Object.
items, err := meta.ExtractList(list)
magicconch.Must(err)
fmt.Println("Initial list:")
for _, item := range items {
configMap, ok := item.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Println(configMap.Name)
// If only the meta information matters, you can skip the type assertion and use meta.Accessor instead:
// accessor, err := meta.Accessor(item)
// magicconch.Must(err)
// fmt.Println(accessor.GetName())
}
listMetaInterface, err := meta.ListAccessor(list)
magicconch.Must(err)
// resourceVersion is essential during synchronization; see how it is used by Watch below
resourceVersion := listMetaInterface.GetResourceVersion()
// w has type watch.Interface and exposes a ResultChan method for reading events;
// as with List, the FieldSelector in the ListOptions is always replaced by the selector above.
// ResourceVersion is a very important parameter for Watch:
// it identifies the resource version of a single client/server interaction,
// and together with ResourceVersionMatch it describes how this request filters on ResourceVersion.
// For example, the request below means: receive events newer than resourceVersion.
// Once connection interruptions and periodic resyncs are taken into account,
// managing ResourceVersion becomes more complicated; we ignore those cases for now.
w, err := lw.Watch(metav1.ListOptions{
ResourceVersion: resourceVersion,
})
magicconch.Must(err)
stopCh := make(chan os.Signal)
signal.Notify(stopCh, os.Interrupt)
fmt.Println("Start watching...")
loop:
for {
select {
case <-stopCh:
fmt.Println("Interrupted")
break loop
case event, ok := <-w.ResultChan():
if !ok {
fmt.Println("Broken channel")
break loop
}
configMap, ok := event.Object.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("%s: %s\n", event.Type, configMap.Name)
}
}
}
Output:
----- 1-list-watcher -----
Initial list:
demo
demo1
Start watching...
DELETED: demo
ADDED: demo
Notes
1. Creating the struct
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(context.TODO()).
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(context.TODO())
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
This gives us a ListWatch struct carrying the listFunc and watchFunc functions.
2. Listing the resources
Calling List on the ListWatch struct simply invokes the listFunc created above, which fetches the given resource in the given namespace through the client; the client's Get method was covered in an earlier article.
3. Getting the resourceVersion
It identifies the resource version of one client/server interaction and is essential for the Watch step below.
4. Watching the resources
Just like List, this calls the watchFunc stored in the ListWatch struct, which ultimately calls:
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.rateLimiter here.
if r.err != nil {
return nil, r.err
}
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
isErrRetryableFunc := func(request *http.Request, err error) bool {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) || net.IsTimeout(err) {
return true
}
return false
}
var retryAfter *RetryAfter
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return nil, err
}
retryAfter = nil
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
if err != nil {
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
}
done, transformErr := func() (bool, error) {
defer readAndCloseResponseBody(resp)
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
return false, nil
}
klog.V(4).Infof("Could not retry request - %v", err)
}
if resp == nil {
// the server must have sent us an error in 'err'
return true, nil
}
if result := r.transformResponse(resp, req); result.err != nil {
return true, result.err
}
return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}()
if done {
if isErrRetryableFunc(req, err) {
return watch.NewEmptyWatch(), nil
}
if err == nil {
// if the server sent us an HTTP Response object,
// we need to return the error object from that.
err = transformErr
}
return nil, err
}
}
}
This returns a watch.Interface, essentially an event interface that receives all the events.
// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
// Stops watching. Will close the channel returned by ResultChan(). Releases
// any resources used by the watch.
Stop()
// Returns a chan which will receive all the events. If an error occurs
// or Stop() is called, the implementation will close this channel and
// release any resources used by the watch.
ResultChan() <-chan Event
}
After that, a for loop keeps reading events from ResultChan.
Using the code this way has two problems:
- Events are handled synchronously, so any expensive processing blocks the watch; a queue is needed to decouple production from processing (a minimal sketch of this decoupling follows the list).
- The code has no reconnection or resync mechanism, so the local view can become inconsistent with the server.
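To illustrate the first point, here is a minimal sketch of decoupling with a buffered channel standing in for the queue. It assumes the "fmt" and "k8s.io/apimachinery/pkg/watch" imports and an already established watch w; the buffer size and the handler body are arbitrary choices for illustration, not part of client-go.
// decoupleEvents reads from the watch and hands events to a separate consumer,
// so slow processing no longer blocks the watch loop (illustration only).
func decoupleEvents(w watch.Interface, stopCh <-chan struct{}) {
	buf := make(chan watch.Event, 100) // buffered stand-in for a queue

	go func() { // consumer: may be slow without stalling the producer
		for ev := range buf {
			fmt.Printf("processing %s event\n", ev.Type)
		}
	}()

	for { // producer: only reads from the watch and enqueues
		select {
		case <-stopCh:
			close(buf)
			return
		case ev, ok := <-w.ResultChan():
			if !ok {
				close(buf)
				return
			}
			buf <- ev
		}
	}
}
This removes the blocking, but the second problem (reconnect and resync) is exactly what Reflector and the Store below are designed for.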
Store
Store is the object store used for caching and provides the basic CRUD operations on objects, as the interface definition shows:
type Store interface {
// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error
// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error
// List returns a list of all the currently non-empty accumulators
List() []interface{}
// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey returns the accumulator associated with the given key
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
}
In the file where the interface is defined, a struct implements it:
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}
We can see that cache consists of two members, of which the ThreadSafeStore interface is the one that actually implements the CRUD operations.
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
// Resync is a no-op and is deprecated
Resync() error
}
It also has an implementing struct:
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
As you can see, the objects are stored in items, and the CRUD operations work on that map; the other member, keyFunc, is the function used to compute an object's key.
The cache struct above is created with NewStore, for example:
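A minimal sketch of using the store directly (the ConfigMap here is made up for illustration):
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// MetaNamespaceKeyFunc computes keys of the form "<namespace>/<name>"
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)

	cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "tmp", Name: "demo"}}
	if err := store.Add(cm); err != nil {
		panic(err)
	}

	fmt.Println(store.ListKeys()) // [tmp/demo]

	if item, exists, err := store.GetByKey("tmp/demo"); err == nil && exists {
		fmt.Println(item.(*corev1.ConfigMap).Name) // demo
	}
}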
Queue
Queue extends Store, adding Pop and a few other methods on top of it to implement a first-in-first-out (FIFO) queue.
type Queue interface {
Store
// Pop blocks until there is at least one key to process or the
// Queue is closed. In the latter case Pop returns with an error.
// In the former case Pop atomically picks one key to process,
// removes that (key, accumulator) association from the Store, and
// processes the accumulator. Pop returns the accumulator that
// was processed and the result of processing. The PopProcessFunc
// may return an ErrRequeue{inner} and in this case Pop will (a)
// return that (key, accumulator) association to the Queue as part
// of the atomic processing and (b) return the inner error from
// Pop.
Pop(PopProcessFunc) (interface{}, error)
// AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
AddIfNotPresent(interface{}) error
// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, AddIfNotPresent,
// Update, or Delete; otherwise the first batch is empty.
HasSynced() bool
// Close the queue
Close()
}
So Queue builds on the Store interface (the Store created earlier is passed in as a parameter at construction time) and adds the Pop, AddIfNotPresent, HasSynced and Close methods.
Several structs in the fifo files implement this Queue; the most commonly used one is DeltaFIFO (a minimal usage sketch follows its definition).
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas
// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known" --- affecting Delete(),
// Replace(), and Resync()
knownObjects KeyListerGetter
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
}
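Here is a minimal sketch of the queue in isolation, assuming the client-go version used in this article (single-argument PopProcessFunc) and a made-up ConfigMap: successive changes to the same key are accumulated into one Deltas slice, and Pop hands the whole slice to the processing function.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction: cache.MetaNamespaceKeyFunc,
	})

	cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "tmp", Name: "demo"}}
	_ = fifo.Add(cm)    // queued as an Added delta
	_ = fifo.Update(cm) // accumulated onto the same key as an Updated delta

	// Pop removes the key and processes all of its accumulated deltas at once.
	_, _ = fifo.Pop(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type) // Added, then Updated
		}
		return nil
	})
}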
Reflector
Reflector is designed to solve the two problems raised in the ListerWatcher section. Let's again start from an example.
package main
import (
"fmt"
"github.com/spongeprojects/magicconch"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"time"
)
// newStore creates a cache.Store object that serves as the object store for the current resource state
func newStore() cache.Store {
return cache.NewStore(cache.MetaNamespaceKeyFunc)
}
// newQueue creates a cache.Queue object, implemented here as a FIFO queue.
// Note that the store is passed in as the KnownObjects parameter at construction time,
// because during a resync the Reflector needs to know the current resource state,
// and computing changes (Deltas) also requires comparing against the current state.
// KnownObjects is read-only for the queue and the Reflector; keeping the store up to date is the caller's responsibility.
func newQueue(store cache.Store) cache.Queue {
return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
KnownObjects: store,
EmitDeltaTypeReplaced: true,
})
}
// newConfigMapsReflector creates a cache.Reflector object.
// Once the Reflector is running (Run), newly received events are pushed into the queue.
func newConfigMapsReflector(queue cache.Queue) *cache.Reflector {
lw := newConfigMapsListerWatcher() // explained earlier
// The 2nd parameter is expectedType, which restricts the events allowed into the queue;
// List and Watch only ever return one type anyway, so this parameter is just a sanity check.
// The 4th parameter is resyncPeriod;
// 0 means never resync (unless the connection times out or is interrupted),
// a non-zero value triggers periodic full resyncs to avoid drifting out of sync with the server,
// and a resync produces events of type SYNC.
return cache.NewReflector(lw, &corev1.ConfigMap{}, queue, 0)
}
func main() {
fmt.Println("----- 2-reflector -----")
store := newStore()
queue := newQueue(store)
reflector := newConfigMapsReflector(queue)
stopCh := make(chan struct{})
defer close(stopCh)
// once the reflector is running, newly received events are pushed into the queue
go reflector.Run(stopCh)
// Note that while handling events we must keep the store state up to date (Add, Update, Delete),
// otherwise it drifts out of sync; Informer encapsulates all of this, but for now we have to take care of it ourselves.
processObj := func(obj interface{}) error {
// the earliest received events are processed first
for _, d := range obj.(cache.Deltas) {
switch d.Type {
case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
if _, exists, err := store.Get(d.Object); err == nil && exists {
if err := store.Update(d.Object); err != nil {
return err
}
} else {
if err := store.Add(d.Object); err != nil {
return err
}
}
case cache.Deleted:
if err := store.Delete(d.Object); err != nil {
return err
}
}
configMap, ok := d.Object.(*corev1.ConfigMap)
if !ok {
return fmt.Errorf("not config: %T", d.Object)
}
fmt.Printf("%s: %s\n", d.Type, configMap.Name)
}
return nil
}
fmt.Println("Start syncing...")
// keep running until stopCh is closed
wait.Until(func() {
for {
_, err := queue.Pop(processObj)
magicconch.Must(err)
}
}, time.Second, stopCh)
}
Output:
----- 2-reflector -----
Start syncing...
Replaced: demo1
Replaced: demo
Deleted: demo
Added: demo
Notes
1. Create the Store; see the Store section above for the details.
2. Create the Queue, which in practice means creating a DeltaFIFO; see the Queue section above.
3. Create the Reflector struct with the NewReflector function.
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and
// `"kind"` must also be right.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch.
initConnBackoffManager wait.BackoffManager
resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
// It is set based on the result of the initial list call.
paginatedResult bool
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from watch cache.
// NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
}
Creating it takes the ListerWatcher and the Queue described above as parameters.
4. Call the Reflector's Run function, which eventually calls the Reflector's ListAndWatch method to watch and fetch the resources.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
// the List part
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
// fetch the resource list
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
//
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
pager.PageSize = 0
}
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
//
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
// get the resourceVersion
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// convert the list into a slice of resource objects
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
// store the resource objects and the resource version into the DeltaFIFO queue
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
// establish a long-lived connection to the apiserver through the client and watch the specified resource for changes
w, err := r.listerWatcher.Watch(options)
if err != nil {
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
// Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
}
return err
}
// handle the resource change events
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
<-r.initConnBackoffManager.Backoff().C()
continue
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
As you can see, ListAndWatch consists of two parts: List and Watch.
The core of the List part:
- call listerWatcher.List to fetch all objects of the resource; this goes through the API to the apiserver to get the resource list;
- call listMetaInterface.GetResourceVersion to obtain the resource version;
- call meta.ExtractList to convert the list into a slice of resource objects;
- store the resource objects and the resource version into the DeltaFIFO queue;
- finally, call setLastSyncResourceVersion to update the resource version.
The first few steps should look familiar: they are exactly the core steps we walked through for ListerWatcher, so this part is a thin wrapper around them.
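For reference, the step that pushes the list result into the queue is syncWith; in the client-go version referenced here it is roughly a Replace call on the reflector's store (which, in our examples, is the DeltaFIFO):
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}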
Now the core of the Watch part:
- in a loop, establish a long-lived connection to the apiserver through the client and watch the specified resource for changes; whenever a change arrives, call watchHandler to process the change event.
So how does watchHandler process events?
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
// receive an event
case event, ok := <-w.ResultChan():
// error handling
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
// get the resourceVersion
newResourceVersion := meta.GetResourceVersion()
// dispatch on the event type
switch event.Type {
// push the added-resource event into the DeltaFIFO queue
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
// push the updated-resource event into the DeltaFIFO queue
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
// push the deleted-resource event into the DeltaFIFO queue
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++
}
}
watchDuration := r.clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
return nil
}
Depending on the event type, watchHandler turns the event into the corresponding Delta and stores it in the DeltaFIFO queue. This removes the blocking problem, but consumption of the queue is still sequential; multiple workers are needed to increase throughput, and that part is not implemented in the cache package.
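Controllers usually fill that gap with the workqueue package plus several worker goroutines: event handlers only enqueue object keys (for example via cache.MetaNamespaceKeyFunc), and the workers pop and process keys in parallel. A minimal sketch under that assumption; the keys, the worker count and the process body are made up for illustration:
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// process handles a single key; a real controller would look the object up
// in the informer's local store here (placeholder logic for illustration).
func process(key string) error {
	fmt.Println("processing", key)
	return nil
}

func worker(queue workqueue.RateLimitingInterface) {
	for {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		if err := process(key.(string)); err != nil {
			queue.AddRateLimited(key) // retry later with backoff
		} else {
			queue.Forget(key) // clear the rate-limiting history for this key
		}
		queue.Done(key)
	}
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// an event handler would enqueue only the object's key
	queue.Add("tmp/demo")
	queue.Add("tmp/demo1")

	for i := 0; i < 3; i++ { // several workers consume the queue concurrently
		go worker(queue)
	}

	time.Sleep(time.Second) // give the workers a moment (demo only)
	queue.ShutDown()
}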
Reflector is the core component behind the Informer's reliability. Handling lost events, anomalous events, failed processing and other corner cases involves many details; the source is worth reading in depth if you are interested.
Controller
Controller wraps the code above. Again let's look at an example; the constructor functions are reused from the previous examples.
package main
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
func newController() cache.Controller {
fmt.Println("----- 3-controller -----")
lw := newConfigMapsListerWatcher()
store := newStore()
queue := newQueue(store)
cfg := &cache.Config{
Queue: queue,
ListerWatcher: lw,
ObjectType: &corev1.ConfigMap{},
FullResyncPeriod: 0,
RetryOnError: false,
Process: func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
switch d.Type {
case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
if _, exists, err := store.Get(d.Object); err == nil && exists {
if err := store.Update(d.Object); err != nil {
return err
}
} else {
if err := store.Add(d.Object); err != nil {
return err
}
}
case cache.Deleted:
if err := store.Delete(d.Object); err != nil {
return err
}
}
configMap, ok := d.Object.(*corev1.ConfigMap)
if !ok {
return fmt.Errorf("not config: %T", d.Object)
}
fmt.Printf("%s: %s\n", d.Type, configMap.Name)
}
return nil
},
}
return cache.New(cfg)
}
func main() {
controller := newController()
stopCh := make(chan struct{})
defer close(stopCh)
fmt.Println("Start syncing....")
go controller.Run(stopCh)
<-stopCh
}
The output is essentially the same as in the Reflector example, since the Process function is identical.
Notes
1. A Config struct is created.
// Config contains all the settings for one of these low-level controllers.
type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function
// should accept the output of this Queue's Pop() method.
Queue
// Something that can list and watch your objects.
ListerWatcher
// Something that can process a popped Deltas.
Process ProcessFunc
// ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
ObjectType runtime.Object
// FullResyncPeriod is the period at which ShouldResync is considered.
FullResyncPeriod time.Duration
// ShouldResync is periodically used by the reflector to determine
// whether to Resync the Queue. If ShouldResync is `nil` or
// returns true, it means the reflector should proceed with the
// resync.
ShouldResync ShouldResyncFunc
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter. This is probably moot
// now that this functionality appears at a higher level.
RetryOnError bool
// Called whenever the ListAndWatch drops the connection with an error.
WatchErrorHandler WatchErrorHandler
// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64
}
As you can see, its members are basically the Queue, ListerWatcher and the other parameters we created earlier.
2. The cache package's New function is called with the config to create the Controller interface.
type Controller interface {
// Run does two things. One is to construct and run a Reflector
// to pump objects/notifications from the Config's ListerWatcher
// to the Config's Queue and possibly invoke the occasional Resync
// on that Queue. The other is to repeatedly Pop from the Queue
// and process with the Config's ProcessFunc. Both of these
// continue until `stopCh` is closed.
Run(stopCh <-chan struct{})
// HasSynced delegates to the Config's Queue
HasSynced() bool
// LastSyncResourceVersion delegates to the Reflector when there
// is one, otherwise returns the empty string
LastSyncResourceVersion() string
}
Now the New function: it is very simple, it just builds a controller struct, which implements the Controller interface, from the config.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
3. Then the controller's Run function is called.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
Does this function look familiar? It creates a Reflector and calls its Run function, exactly the steps we analyzed in the Reflector section, so Controller is essentially a wrapper around Reflector; the consuming side is the processLoop started by wait.Until, sketched below.
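For completeness, in the client-go version referenced here processLoop looks roughly like this: it keeps popping from the queue and hands every item to the Config's Process function.
// processLoop drains the work queue: Pop blocks until an item is available
// and passes it to the Config's Process function.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}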
Informer
An Informer is a kind of Controller dedicated to implementing the informer mechanism. It is in fact another wrapper around controller; what it mainly adds is the conversion logic (obj goes from cache.Deltas to *corev1.ConfigMap).
Let's look at an example again. Informer examples are quite common, although what you normally see in practice is a SharedInformer.
package main
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
func main() {
fmt.Println("----- 4-informer -----")
lw := newConfigMapsListerWatcher()
// the first return value is a cache.Store; it is not needed here, so it is discarded
_, controller := cache.NewInformer(lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
configMap, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("created: %s\n", configMap.Name)
},
UpdateFunc: func(old, new interface{}) {
configMap, ok := old.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("updated: %s\n", configMap.Name)
},
DeleteFunc: func(obj interface{}) {
configMap, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("deleted: %s\n", configMap.Name)
},
})
stopCh := make(chan struct{})
defer close(stopCh)
fmt.Println("Start syncing....")
go controller.Run(stopCh)
<-stopCh
}
Output:
----- 4-informer -----
Start syncing....
created: demo
created: demo1
deleted: demo
created: demo
Notes
1. NewInformer creates a Store and a Controller.
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller) {
// This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}
NewInformer lives in the same file as controller and in the end returns a Controller, so an Informer really is a kind of controller.
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}
In the end it still calls the controller's New function to create a controller, which is exactly the process we used above to build a controller; the informer simply wraps the conversion on top of it, so we only need to care about the handlers.
2. Calling the controller's Run function then follows the same logic as above. The Store returned by NewInformer is also useful on its own, as sketched below.
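A minimal sketch of using that first return value; it assumes the newConfigMapsListerWatcher helper and the tmp/demo ConfigMap from the earlier examples. After the initial sync, the Store holds the current state and can be queried locally instead of hitting the apiserver:
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	lw := newConfigMapsListerWatcher() // from the earlier example

	store, controller := cache.NewInformer(lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{})

	stopCh := make(chan struct{})
	defer close(stopCh)
	go controller.Run(stopCh)

	// wait for the initial list to be synced into the store
	if !cache.WaitForCacheSync(stopCh, controller.HasSynced) {
		panic("timed out waiting for caches to sync")
	}

	// read the local cache instead of calling the apiserver
	if item, exists, err := store.GetByKey("tmp/demo"); err == nil && exists {
		fmt.Println("found in local cache:", item.(*corev1.ConfigMap).Name)
	}
	fmt.Println("all keys:", store.ListKeys())
}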
SharedInformer
In the Informer design above, the handler is passed into NewInformer as a parameter. If another handler needs to process the same resource, a second Informer object has to be created, and the queue cannot be reused, because a queue does not support two consumers at the same time. To solve this, the cache package also provides SharedInformer, which, as the name suggests, is an Informer shared by multiple handlers.
package main
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
func main() {
fmt.Println("----- 5-shared-informer -----")
lw := newConfigMapsListerWatcher()
sharedInformer := cache.NewSharedInformer(lw, &corev1.ConfigMap{}, 0)
// add one handler
sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
configMap, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("created, printing namespace: %s\n", configMap.Namespace)
},
})
// add another handler
sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
configMap, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("created, printing name: %s\n", configMap.Name)
},
})
stopCh := make(chan struct{})
defer close(stopCh)
fmt.Println("Start syncing....")
go sharedInformer.Run(stopCh)
<-stopCh
}
This is the most common way informers are used. Output:
----- 5-shared-informer -----
Start syncing....
created, printing namespace: tmp
created, printing namespace: tmp
created, printing name: demo
created, printing name: demo1
Notes
1. The SharedInformer interface is created.
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler(handler ResourceEventHandler)
// AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means
// this handler does not care about resyncs. The resync operation
// consists of delivering to the handler an update notification
// for every object in the informer's local cache; it does not add
// any interactions with the authoritative storage. Some
// informers do no resyncs at all, not even for handlers added
// with a non-zero resyncPeriod. For an informer that does
// resyncs, and for each handler that requests resyncs, that
// informer develops a nominal resync period that is no shorter
// than the requested period but may be longer. The actual time
// between any two resyncs may be longer than the nominal period
// because the implementation takes time to do work and there may
// be competing load and scheduling noise.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// GetStore returns the informer's local cache as a Store.
GetStore() Store
// GetController is deprecated, it does nothing useful
GetController() Controller
// Run starts and runs the shared informer, returning after it stops.
// The informer will be stopped when stopCh is closed.
Run(stopCh <-chan struct{})
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// There's only one handler, so if you call this multiple times, last one
// wins; calling after the informer has been started returns an error.
//
// The handler is intended for visibility, not to e.g. pause the consumers.
// The handler should return quickly - any expensive processing should be
// offloaded.
SetWatchErrorHandler(handler WatchErrorHandler) error
}
It is created with NewSharedInformer:
// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
This creates a sharedIndexInformer struct, which implements the SharedIndexInformer interface.
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector MutationDetector
listerWatcher ListerWatcher
// objectType is an example object of the type this informer is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
objectType runtime.Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time.Duration
// clock allows for testability
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
}
Looking at the SharedIndexInformer interface, it is the SharedInformer interface plus indexing:
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
2. Then sharedIndexInformer's AddEventHandler is called to register handlers; every event is broadcast to every registered handler (a conceptual sketch of the fan-out follows).
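Conceptually the fan-out looks like the sketch below. This is not the actual sharedProcessor code, only an illustration: each registered handler gets its own listener with its own buffer and goroutine, and every notification is delivered to all of them, so one slow handler does not block the others.
package main

import (
	"fmt"
	"sync"
)

// listener is a stand-in for a registered event handler with its own buffer.
type listener struct {
	name string
	ch   chan string
}

func main() {
	listeners := []*listener{
		{name: "handler-1", ch: make(chan string, 16)},
		{name: "handler-2", ch: make(chan string, 16)},
	}

	var wg sync.WaitGroup
	for _, l := range listeners {
		wg.Add(1)
		go func(l *listener) { // each handler consumes in its own goroutine
			defer wg.Done()
			for ev := range l.ch {
				fmt.Printf("%s got %s\n", l.name, ev)
			}
		}(l)
	}

	// "distribute": every event goes to every listener
	for _, ev := range []string{"ADDED demo", "DELETED demo"} {
		for _, l := range listeners {
			l.ch <- ev
		}
	}

	for _, l := range listeners {
		close(l.ch)
	}
	wg.Wait()
}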
3. Finally sharedIndexInformer's Run function is called:
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
}
In other words, it is a wrapper around the controller's Run we saw above.
SharedIndexInformer
With the Reflector, Informer and SharedInformer used above, besides reacting to events we also maintained a copy of the latest resource state: the Store used in the code. In the Reflector demo the Store was built by us and passed into NewReflector; in the Informer demo the Store object is returned by NewInformer; in the SharedInformer demo the Store object can be obtained through the GetStore method. This up-to-date resource state is very useful: a lot of control logic cares not only about the resources involved in an event but also about the state of other resources, which is exactly what the Store object is for. However, Store only implements basic CRUD; for "read" it offers just four methods, Get, GetByKey, List and ListKeys, and an object's "key" is computed by the KeyFunc specified when the Store was built, which in the vast majority of cases is cache.MetaNamespaceKeyFunc.
This means we can:
- look up a resource by namespace plus name;
- list all resources.
The question is, what if we want to:
- look up resources by a field other than the meta namespace/name, such as a label, an annotation or a status field;
- list only a subset of resources, for example those in one namespace.
None of this is possible through Store; the only option would be calling the API directly via ListerWatcher, with no local cache.
Indexer is designed to solve exactly this. As its name says, it builds indexes. Again, let's go through an example.
package main
import (
"fmt"
"github.com/spongeprojects/magicconch"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
func main() {
fmt.Println("----- 6-indexer -----")
lw := newConfigMapsListerWatcher()
indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
// For demonstration only: we only care about the indexer and do not handle events,
// so an empty set of handler funcs is passed; you would normally not do this.
indexer, informer := cache.NewIndexerInformer(
lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{}, indexers)
stopCh := make(chan struct{})
defer close(stopCh)
fmt.Println("Start syncing....")
go informer.Run(stopCh)
// operate only after the informer's initial sync has completed
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
panic("timed out waiting for caches to sync")
}
// list all keys under the cache.NamespaceIndex index whose index value is "tmp"
keys, err := indexer.IndexKeys(cache.NamespaceIndex, "tmp")
magicconch.Must(err)
for _, k := range keys {
fmt.Println(k)
}
}
Creating the SharedIndexInformer interface was covered above; here is what this run produces:
----- 6-indexer -----
Start syncing....
tmp/demo
tmp/demo1
Notes
1. cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} is the default index used in the informers package; with it, resources can be listed by namespace.
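You can also register your own IndexFunc. A minimal sketch, assuming a hypothetical index named "byApp" over the app label (both the index name and the label are made up for illustration):
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// index objects by their "app" label
	byApp := func(obj interface{}) ([]string, error) {
		cm, ok := obj.(*corev1.ConfigMap)
		if !ok {
			return nil, nil
		}
		return []string{cm.Labels["app"]}, nil
	}

	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byApp": byApp})

	_ = indexer.Add(&corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{
		Namespace: "tmp", Name: "demo", Labels: map[string]string{"app": "nginx"},
	}})
	_ = indexer.Add(&corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{
		Namespace: "tmp", Name: "demo1", Labels: map[string]string{"app": "redis"},
	}})

	// list only the objects whose "app" label is "nginx"
	objs, _ := indexer.ByIndex("byApp", "nginx")
	for _, o := range objs {
		fmt.Println(o.(*corev1.ConfigMap).Name) // demo
	}
}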
informers
From the cache package above we know there are five functions for creating informers:
- New
- NewInformer
- NewIndexerInformer
- NewSharedInformer
- NewSharedIndexInformer
They offer different degrees of abstraction and encapsulation; NewSharedIndexInformer is the least abstract and most encapsulated of them. But even NewSharedIndexInformer does not encapsulate a concrete resource type: it still takes a ListerWatcher and Indexers as parameters, so we have to build the ListWatcher and the Indexers ourselves.
That is clearly not convenient enough, so the informers package uses a factory pattern: for every built-in resource type there is a corresponding informer factory, and when you need an informer for some resource you build it straight from the factory (a sketch follows). This is in fact the way informers are used most often; the details were covered in the previous article on using k8s informers.
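A minimal sketch of the factory pattern, assuming the mustClientset helper from the first example and the tmp/demo ConfigMap used throughout (both assumptions for illustration):
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

func main() {
	clientset := mustClientset() // from the first example

	// one factory per clientset; informers for different resources share it
	factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)

	cmInformer := factory.Core().V1().ConfigMaps()
	informer := cmInformer.Informer() // a SharedIndexInformer, no ListerWatcher needed

	stopCh := make(chan struct{})
	defer close(stopCh)

	factory.Start(stopCh) // starts every informer requested from the factory
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		panic("timed out waiting for caches to sync")
	}

	// the typed lister reads from the informer's local indexer
	cm, err := cmInformer.Lister().ConfigMaps("tmp").Get("demo")
	if err == nil {
		fmt.Println("from local cache:", cm.Name)
	}
}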
Application
Informers are used all over Kubernetes; almost every component relies on the list-watch mechanism. The place we most often use them ourselves is when writing controllers for CRDs. Most of that code is generated automatically, but it still pays to understand what it does and how it works; see the article on Kubernetes controllers for details.