This article walks through the source code behind the informer mechanism in client-go.

cache

The informer logic itself is concentrated in the cache package, which defines several core interfaces and types:

k8s.io/client-go/tools/cache$ tree .
.
├── OWNERS
├── controller.go
├── controller_test.go
├── delta_fifo.go
├── delta_fifo_test.go
├── doc.go
├── expiration_cache.go
├── expiration_cache_fakes.go
├── expiration_cache_test.go
├── fake_custom_store.go
├── fifo.go
├── fifo_test.go
├── heap.go
├── heap_test.go
├── index.go
├── index_test.go
├── listers.go
├── listwatch.go
├── main_test.go
├── mutation_cache.go
├── mutation_detector.go
├── mutation_detector_test.go
├── processor_listener_test.go
├── reflector.go
├── reflector_metrics.go
├── reflector_test.go
├── shared_informer.go
├── shared_informer_test.go
├── store.go
├── store_test.go
├── testing
│   ├── fake_controller_source.go
│   └── fake_controller_source_test.go
├── thread_safe_store.go
├── thread_safe_store_test.go
├── undelta_store.go
└── undelta_store_test.go

1 directory, 36 files

A quick summary:

  • ListerWatcher wraps the raw API with the two methods List and Watch; it is the source of all data.
  • Store is the object store used for caching, with basic CRUD methods on objects such as Add, Update, Delete, List, Get, and so on. Store has two extension interfaces, Queue and Indexer.
  • Queue extends Store, adding Pop and a few other methods to implement a first-in-first-out (FIFO) queue. In the cache package, Queue is designed to buffer changes (Deltas); a change is an event that triggers a state transition. The Queue decouples event production from event processing.
  • Reflector is the bridge between the ListerWatcher and the Queue above, turning the raw data into a uniform queue that is convenient for controllers to consume.
  • Indexer also extends Store, adding a multi-index design, i.e. objects can be indexed in several ways at once. One role of Store in the cache package is holding the current resource state; Indexer inherits that role and plays an important part during resync.
  • The Controller interface encapsulates one set of control logic. Its main method is Run(stopCh <-chan struct{}), usually designed as an endless loop that keeps consuming the queue until stopCh is closed.
  • Informer refers to the kind of Controller dedicated to implementing the informer mechanism.
  • SharedInformer wraps Controller once more, so that multiple handlers can react to events on the same resource. The AddEventHandler method makes the difference between SharedInformer and Informer (the kind of Controller dedicated to the informer mechanism) fairly clear.
  • SharedIndexInformer does the same job as SharedInformer; the only difference is that SharedIndexInformer uses an Indexer.

Below we read the source of these interfaces and their implementations to understand how the informer is built inside the cache package.

ListerWatcher

Let's look at the source directly, k8s.io/client-go/tools/cache/listwatch.go:

// Lister is any object that knows how to perform an initial list.
type Lister interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
    Lister
    Watcher
}

As you can see, ListerWatcher is a composite interface combining Lister and Watcher, which provide the List and Watch methods respectively. Both methods are resource-type-agnostic and operate on runtime.Object, so callers usually need reflection or a type assertion to get concrete types.

In this file the struct ListWatch implements the ListerWatcher interface; let's use it to see the basic usage.

type ListWatch struct {
    ListFunc  ListFunc
    WatchFunc WatchFunc
    // DisableChunking requests no chunking for this list watcher.
    DisableChunking bool
}

// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

Example usage

package main

import (
    "fmt"
    "github.com/spongeprojects/magicconch"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/tools/cache"
    "os"
    "os/signal"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func mustClientset() kubernetes.Interface {
    kubeconfig := os.Getenv("KUBECONFIG")

    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    magicconch.Must(err)

    clientset, err := kubernetes.NewForConfig(config)
    magicconch.Must(err)

    return clientset
}




// newConfigMapsListerWatcher creates a ListerWatcher instance for the configmaps resource in the tmp namespace
func newConfigMapsListerWatcher() cache.ListerWatcher {
    clientset := mustClientset()              // create the clientset
    client := clientset.CoreV1().RESTClient() // the client (request builder)
    resource := "configmaps"                  // one of the GET request parameters
    namespace := "tmp"                        // one of the GET request parameters
    selector := fields.Everything()           // one of the GET request parameters
    lw := cache.NewListWatchFromClient(client, resource, namespace, selector)
    return lw
}

func main() {
    fmt.Println("----- 1-list-watcher -----")

    lw := newConfigMapsListerWatcher()

    // list is a runtime.Object; reflection or a type assertion is needed before use.
    // The FieldSelector in the ListOptions passed here is always overwritten by the selector above.
    list, err := lw.List(metav1.ListOptions{})
    magicconch.Must(err)

    // The meta package wraps helpers for handling runtime.Object, hiding the reflection
    // and type conversion; the extracted items have type []runtime.Object.
    items, err := meta.ExtractList(list)
    magicconch.Must(err)

    fmt.Println("Initial list:")

    for _, item := range items {
        configMap, ok := item.(*corev1.ConfigMap)
        if !ok {
            return
        }
        fmt.Println(configMap.Name)

        // If you only care about the meta information, you can skip the type
        // assertion and use meta.Accessor instead:
        // accessor, err := meta.Accessor(item)
        // magicconch.Must(err)
        // fmt.Println(accessor.GetName())
    }

    listMetaInterface, err := meta.ListAccessor(list)
    magicconch.Must(err)

    // resourceVersion is essential to the sync process; see how it is used by Watch below
    resourceVersion := listMetaInterface.GetResourceVersion()

    // w is a watch.Interface, whose ResultChan method returns a channel of events.
    // As with List, the FieldSelector in the ListOptions is always overwritten by the selector above.
    // ResourceVersion is a very important Watch parameter:
    // it identifies the resource version of one client-server interaction;
    // combined with the ResourceVersionMatch parameter it expresses how the request filters on ResourceVersion.
    // The request below means: receive events newer than resourceVersion.
    // Once connection drops and periodic resyncs are taken into account,
    // managing ResourceVersion becomes more complex; we ignore those cases for now.
    w, err := lw.Watch(metav1.ListOptions{
        ResourceVersion: resourceVersion,
    })
    magicconch.Must(err)

    stopCh := make(chan os.Signal, 1) // signal.Notify requires a buffered channel
    signal.Notify(stopCh, os.Interrupt)

    fmt.Println("Start watching...")

loop:
    for {
        select {
        case <-stopCh:
            fmt.Println("Interrupted")
            break loop
        case event, ok := <-w.ResultChan():
            if !ok {
                fmt.Println("Broken channel")
                break loop
            }
            configMap, ok := event.Object.(*corev1.ConfigMap)
            if !ok {
                return
            }
            fmt.Printf("%s: %s\n", event.Type, configMap.Name)
        }
    }
}

Run output

----- 1-list-watcher -----
Initial list:
demo
demo1
Start watching...
DELETED: demo
ADDED: demo

Notes

1. Creating the struct

// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
    optionsModifier := func(options *metav1.ListOptions) {
        options.FieldSelector = fieldSelector.String()
    }
    return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}

// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
    listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
        optionsModifier(&options)
        return c.Get().
            Namespace(namespace).
            Resource(resource).
            VersionedParams(&options, metav1.ParameterCodec).
            Do(context.TODO()).
            Get()
    }
    watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
        options.Watch = true
        optionsModifier(&options)
        return c.Get().
            Namespace(namespace).
            Resource(resource).
            VersionedParams(&options, metav1.ParameterCodec).
            Watch(context.TODO())
    }
    return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

This yields a ListWatch struct carrying the listFunc and watchFunc functions.
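Equivalently, a ListWatch can be assembled by hand from closures over a typed clientset. The following is a minimal sketch (reusing the mustClientset helper from the example above) that builds the same ListerWatcher for configmaps in the tmp namespace:

package main

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/tools/cache"
)

// newConfigMapsListerWatcherByHand is equivalent to NewListWatchFromClient,
// but spells out the two closures explicitly.
func newConfigMapsListerWatcherByHand() cache.ListerWatcher {
    clientset := mustClientset() // helper defined in the example above
    return &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            return clientset.CoreV1().ConfigMaps("tmp").List(context.TODO(), options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            return clientset.CoreV1().ConfigMaps("tmp").Watch(context.TODO(), options)
        },
    }
}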

2. Listing resources

Calling the ListWatch struct's List method just invokes the listFunc built above, fetching the given resource under the given namespace through the client; the client's Get method was covered earlier.

3. Getting the resourceVersion

It identifies the resource version of one client-server interaction and plays an important role for the Watch below.

4. Watching resources

As with list, this just calls the watchFunc in the ListWatch struct, which in the end calls:

// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
    // We specifically don't want to rate limit watches, so we
    // don't use r.rateLimiter here.
    if r.err != nil {
        return nil, r.err
    }

    client := r.c.Client
    if client == nil {
        client = http.DefaultClient
    }

    isErrRetryableFunc := func(request *http.Request, err error) bool {
        // The watch stream mechanism handles many common partial data errors, so closed
        // connections can be retried in many cases.
        if net.IsProbableEOF(err) || net.IsTimeout(err) {
            return true
        }
        return false
    }
    var retryAfter *RetryAfter
    url := r.URL().String()
    for {
        req, err := r.newHTTPRequest(ctx)
        if err != nil {
            return nil, err
        }

        r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
        if retryAfter != nil {
            // We are retrying the request that we already send to apiserver
            // at least once before.
            // This request should also be throttled with the client-internal rate limiter.
            if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
                return nil, err
            }
            retryAfter = nil
        }

        resp, err := client.Do(req)
        updateURLMetrics(ctx, r, resp, err)
        if r.c.base != nil {
            if err != nil {
                r.backoff.UpdateBackoff(r.c.base, err, 0)
            } else {
                r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
            }
        }
        if err == nil && resp.StatusCode == http.StatusOK {
            return r.newStreamWatcher(resp)
        }

        done, transformErr := func() (bool, error) {
            defer readAndCloseResponseBody(resp)

            var retry bool
            retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
            if retry {
                err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
                if err == nil {
                    return false, nil
                }
                klog.V(4).Infof("Could not retry request - %v", err)
            }

            if resp == nil {
                // the server must have sent us an error in 'err'
                return true, nil
            }
            if result := r.transformResponse(resp, req); result.err != nil {
                return true, result.err
            }
            return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
        }()
        if done {
            if isErrRetryableFunc(req, err) {
                return watch.NewEmptyWatch(), nil
            }
            if err == nil {
                // if the server sent us an HTTP Response object,
                // we need to return the error object from that.
                err = transformErr
            }
            return nil, err
        }
    }
}

It returns a watch.Interface, which is essentially an event interface for receiving all events.

// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
    // Stops watching. Will close the channel returned by ResultChan(). Releases
    // any resources used by the watch.
    Stop()

    // Returns a chan which will receive all the events. If an error occurs
    // or Stop() is called, the implementation will close this channel and
    // release any resources used by the watch.
    ResultChan() <-chan Event
}

Then a for loop keeps reading events from ResultChan.

The usage above has two problems:

  • Events are handled synchronously: a slow handler blocks the stream, so a queue needs to be introduced.
  • The code has no reconnect or resync mechanism, so the local view can drift out of sync with the server.

Store

Store is the object store used for caching, with basic CRUD methods on objects, as its interface definition shows:

type Store interface {

    // Add adds the given object to the accumulator associated with the given object's key
    Add(obj interface{}) error

    // Update updates the given object in the accumulator associated with the given object's key
    Update(obj interface{}) error

    // Delete deletes the given object from the accumulator associated with the given object's key
    Delete(obj interface{}) error

    // List returns a list of all the currently non-empty accumulators
    List() []interface{}

    // ListKeys returns a list of all the keys currently associated with non-empty accumulators
    ListKeys() []string

    // Get returns the accumulator associated with the given object's key
    Get(obj interface{}) (item interface{}, exists bool, err error)

    // GetByKey returns the accumulator associated with the given key
    GetByKey(key string) (item interface{}, exists bool, err error)

    // Replace will delete the contents of the store, using instead the
    // given list. Store takes ownership of the list, you should not reference
    // it after calling this function.
    Replace([]interface{}, string) error

    // Resync is meaningless in the terms appearing here but has
    // meaning in some implementations that have non-trivial
    // additional behavior (e.g., DeltaFIFO).
    Resync() error
}

In the file where the corresponding structs are defined, one struct implements this interface:

type cache struct {
    // cacheStorage bears the burden of thread safety for the cache
    cacheStorage ThreadSafeStore
    // keyFunc is used to make the key for objects stored in and retrieved from items, and
    // should be deterministic.
    keyFunc KeyFunc
}

We can see that cache consists of two members. The ThreadSafeStore interface is what actually implements the CRUD operations:

type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
    // Resync is a no-op and is deprecated
    Resync() error
}

It too has an implementing struct:

type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

As we can see, objects are stored in items, and the CRUD operations work on that map. The other member, KeyFunc, is the function that computes an object's key.

We can create the struct above with NewStore.
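As a minimal sketch (assuming the standard cache.MetaNamespaceKeyFunc, which produces "namespace/name" keys), creating and using such a Store looks like this:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // MetaNamespaceKeyFunc computes "namespace/name" keys ("name" for cluster-scoped objects).
    store := cache.NewStore(cache.MetaNamespaceKeyFunc)

    cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "tmp", Name: "demo"}}
    if err := store.Add(cm); err != nil {
        panic(err)
    }

    // Look the object up again by the computed key; exists is true here.
    item, exists, err := store.GetByKey("tmp/demo")
    fmt.Println(exists, err, item.(*corev1.ConfigMap).Name)
}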

Queue

Queue extends Store, adding Pop and a few other methods on top of Store to implement a first-in-first-out (FIFO) queue:

type Queue interface {
    Store

    // Pop blocks until there is at least one key to process or the
    // Queue is closed.  In the latter case Pop returns with an error.
    // In the former case Pop atomically picks one key to process,
    // removes that (key, accumulator) association from the Store, and
    // processes the accumulator.  Pop returns the accumulator that
    // was processed and the result of processing.  The PopProcessFunc
    // may return an ErrRequeue{inner} and in this case Pop will (a)
    // return that (key, accumulator) association to the Queue as part
    // of the atomic processing and (b) return the inner error from
    // Pop.
    Pop(PopProcessFunc) (interface{}, error)

    // AddIfNotPresent puts the given accumulator into the Queue (in
    // association with the accumulator's key) if and only if that key
    // is not already associated with a non-empty accumulator.
    AddIfNotPresent(interface{}) error

    // HasSynced returns true if the first batch of keys have all been
    // popped.  The first batch of keys are those of the first Replace
    // operation if that happened before any Add, AddIfNotPresent,
    // Update, or Delete; otherwise the first batch is empty.
    HasSynced() bool

    // Close the queue
    Close()
}

As shown, Queue is built on top of the Store interface (the previously created Store is passed in at construction time) and adds the Pop, AddIfNotPresent, HasSynced and Close methods.

Several structs in fifo.go and delta_fifo.go implement this Queue; the most commonly used is DeltaFIFO (a usage sketch follows its struct definition):

type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // `items` maps a key to a Deltas.
    // Each such Deltas has at least one Delta.
    items map[string]Deltas

    // `queue` maintains FIFO order of keys for consumption in Pop().
    // There are no duplicates in `queue`.
    // A key is in `queue` if and only if it is in `items`.
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update/AddIfNotPresent was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc

    // knownObjects list keys that are "known" --- affecting Delete(),
    // Replace(), and Resync()
    knownObjects KeyListerGetter

    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRED operations.
    closed bool

    // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
    // DeltaType when Replace() is called (to preserve backwards compat).
    emitDeltaTypeReplaced bool
}
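To make the queue semantics concrete, here is a minimal sketch (assumptions for illustration: a throwaway Store as KnownObjects and a single ConfigMap) of pushing an object in and popping its accumulated Deltas back out:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
        KnownObjects: cache.NewStore(cache.MetaNamespaceKeyFunc),
    })

    cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "tmp", Name: "demo"}}
    _ = fifo.Add(cm) // queues an Added Delta under the key "tmp/demo"

    // Pop blocks until a key is available, removes it from the queue,
    // and hands the accumulated Deltas to the callback atomically.
    _, _ = fifo.Pop(func(obj interface{}) error {
        for _, d := range obj.(cache.Deltas) {
            fmt.Println(d.Type) // Added
        }
        return nil
    })
}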

Reflector

Reflector is designed to solve the two problems raised in the ListerWatcher section. Let's start from example usage:

package main

import (
    "fmt"
    "github.com/spongeprojects/magicconch"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/cache"
    "time"
)

// newStore creates a cache.Store object as the object store holding the current resource state
func newStore() cache.Store {
    return cache.NewStore(cache.MetaNamespaceKeyFunc)
}

// newQueue creates a cache.Queue object, implemented here as a FIFO queue.
// Note that the store is passed in as the KnownObjects parameter at construction time,
// because during resync the Reflector needs to know the current resource state,
// and computing changes (Deltas) also requires comparing against the current state.
// KnownObjects is read-only to the queue and to the Reflector; the user must keep the store's state up to date.
func newQueue(store cache.Store) cache.Queue {
    return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
        KnownObjects:          store,
        EmitDeltaTypeReplaced: true,
    })
}

// newConfigMapsReflector creates a cache.Reflector object;
// once the Reflector is running (Run), newly received events are pushed into the queue.
func newConfigMapsReflector(queue cache.Queue) *cache.Reflector {
    lw := newConfigMapsListerWatcher() // explained earlier
    // The 2nd parameter is expectedType, which restricts the events allowed into the queue;
    // List and Watch return only one type anyway, so this parameter acts purely as a check.
    // The 4th parameter is resyncPeriod;
    // passing 0 means never resync (unless the connection times out or drops),
    // while a non-zero value triggers periodic full resyncs, avoiding accumulated drift from the server;
    // resyncs produce events of type SYNC.
    return cache.NewReflector(lw, &corev1.ConfigMap{}, queue, 0)
}

func main() {
    fmt.Println("----- 2-reflector -----")

    store := newStore()
    queue := newQueue(store)
    reflector := newConfigMapsReflector(queue)

    stopCh := make(chan struct{})
    defer close(stopCh)

    // once the reflector is running, newly received events are pushed into the queue
    go reflector.Run(stopCh)

    // Take care to keep the store state up to date while handling events (Add, Update, Delete),
    // otherwise it drifts out of sync. Informer encapsulates all of this, but for now it is our job.
    processObj := func(obj interface{}) error {
        // the earliest events are processed first
        for _, d := range obj.(cache.Deltas) {
            switch d.Type {
            case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
                if _, exists, err := store.Get(d.Object); err == nil && exists {
                    if err := store.Update(d.Object); err != nil {
                        return err
                    }
                } else {
                    if err := store.Add(d.Object); err != nil {
                        return err
                    }
                }
            case cache.Deleted:
                if err := store.Delete(d.Object); err != nil {
                    return err
                }
            }
            configMap, ok := d.Object.(*corev1.ConfigMap)
            if !ok {
                return fmt.Errorf("not config: %T", d.Object)
            }
            fmt.Printf("%s: %s\n", d.Type, configMap.Name)
        }
        return nil
    }

    fmt.Println("Start syncing...")

    // keep running until stopCh is closed
    wait.Until(func() {
        for {
            _, err := queue.Pop(processObj)
            magicconch.Must(err)
        }
    }, time.Second, stopCh)
}

Run output

----- 2-reflector -----
Start syncing...
Replaced: demo1
Replaced: demo
Deleted: demo
Added: demo

Notes

1. Create the Store struct for storage; see the Store section above for details.

2. Create the Queue struct, which in practice means creating a DeltaFIFO object; see the Queue section above for details.

3. Create the Reflector struct with the NewReflector function.

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string

    // The name of the type we expect to place in the store. The name
    // will be the stringification of expectedGVK if provided, and the
    // stringification of expectedType otherwise. It is for display
    // only, and should not be used for parsing or comparison.
    expectedTypeName string
    // An example object of the type we expect to place in the store.
    // Only the type needs to be right, except that when that is
    // `unstructured.Unstructured` the object's `"apiVersion"` and
    // `"kind"` must also be right.
    expectedType reflect.Type
    // The GVK of the object we expect to place in the store if unstructured.
    expectedGVK *schema.GroupVersionKind
    // The destination to sync up with the watch source
    store Store
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher

    // backoff manages backoff of ListWatch
    backoffManager wait.BackoffManager
    // initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch.
    initConnBackoffManager wait.BackoffManager

    resyncPeriod time.Duration
    // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // paginatedResult defines whether pagination should be forced for list calls.
    // It is set based on the result of the initial list call.
    paginatedResult bool
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion string
    // isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
    // lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
    isLastSyncResourceVersionUnavailable bool
    // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
    // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
    // it will turn off pagination to allow serving them from watch cache.
    // NOTE: It should be used carefully as paginated lists are always served directly from
    // etcd, which is significantly less efficient and may lead to serious performance and
    // scalability problems.
    WatchListPageSize int64
    // Called whenever the ListAndWatch drops the connection with an error.
    watchErrorHandler WatchErrorHandler
}

Construction takes the ListerWatcher and Queue structs from above as parameters.

4. Call the Reflector's Run function, which eventually reaches the Reflector's ListAndWatch method to list and watch resources:

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    // the list part
    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                // fetch the resource list
                return r.listerWatcher.List(opts)
            }))
            switch {
            case r.WatchListPageSize != 0:
                pager.PageSize = r.WatchListPageSize
            case r.paginatedResult:
                // We got a paginated result initially. Assume this resource and server honor
                // paging requests (i.e. watch cache is probably disabled) and leave the default
                // pager size set.
            case options.ResourceVersion != "" && options.ResourceVersion != "0":
                // User didn't explicitly request pagination.
                //
                // With ResourceVersion != "", we have a possibility to list from watch cache,
                // but we do that (for ResourceVersion != "0") only if Limit is unset.
                // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
                // switch off pagination to force listing from watch cache (if enabled).
                // With the existing semantic of RV (result is at least as fresh as provided RV),
                // this is correct and doesn't lead to going back in time.
                //
                // We also don't turn off pagination for ResourceVersion="0", since watch cache
                // is ignoring Limit in that case anyway, and if watch cache is not enabled
                // we don't introduce regression.
                pager.PageSize = 0
            }

            list, paginatedResult, err = pager.List(context.Background(), options)
            if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                r.setIsLastSyncResourceVersionUnavailable(true)
                // Retry immediately if the resource version used to list is unavailable.
                // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                // continuation pages, but the pager might not be enabled, the full list might fail because the
                // resource version it is listing at is expired or the cache may not yet be synced to the provided
                // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
                // the reflector makes forward progress.
                list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
            }
            close(listCh)
        }()
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
        }

        // We check if the list was paginated and if so set the paginatedResult based on that.
        // However, we want to do that only for the initial list (which is the only case
        // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
        // situations we may force listing directly from etcd (by setting ResourceVersion="")
        // which will return paginated result, even if watch cache is enabled. However, in
        // that case, we still want to prefer sending requests to watch cache if possible.
        //
        // Paginated result returned for request with ResourceVersion="0" mean that watch
        // cache is disabled and there are a lot of objects of a given type. In such case,
        // there is no need to prefer listing from watch cache.
        if options.ResourceVersion == "0" && paginatedResult {
            r.paginatedResult = true
        }

        r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v: %v", list, err)
        }

        // extract the resourceVersion
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        // convert list into a list of resource objects
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
        }
        initTrace.Step("Objects extracted")
        // store the resource objects and the resource version into the DeltaFIFO queue
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("unable to sync list result: %v", err)
        }
        initTrace.Step("SyncWith done")
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }

        // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
        start := r.clock.Now()

        // establish a long-lived connection to the apiserver via the client API to watch changes of the specified resource
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case begin exponentially backing off and resend watch request.
            // Do the same for "429" errors.
            if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
                <-r.initConnBackoffManager.Backoff().C()
                continue
            }
            return err
        }

        // handle resource change events
        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
                    // has a semantic that it returns data at least as fresh as provided RV.
                    // So first try to LIST with setting RV to resource version of last observed object.
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                case apierrors.IsTooManyRequests(err):
                    klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
                    <-r.initConnBackoffManager.Backoff().C()
                    continue
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}

As we can see, ListAndWatch splits into two parts: List and Watch.

The core of the list part:

  • Call the listerWatcher.List method to fetch all objects of the resource; this goes through the API to the apiserver to get the resource list;
  • Call listMetaInterface.GetResourceVersion to get the resource version;
  • Call meta.ExtractList to convert the response into a list of resource objects;
  • Store the resource objects and the resource version into the DeltaFIFO queue;
  • Finally, call setLastSyncResourceVersion to update the resource version.

The first few steps should look familiar: they are exactly the core steps from the ListerWatcher walkthrough, wrapped here for this use.

Now the core of the watch part:

  • In a loop, establish a long-lived connection to the apiserver via the client API to watch the specified resource; whenever a change arrives, watchHandler is called to process the change event.

Let's see how watchHandler processes events:

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        // receive an event
        case event, ok := <-w.ResultChan():
            // error handling
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrors.FromObject(event.Object)
            }
            if r.expectedType != nil {
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
            if r.expectedGVK != nil {
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            // get the resourceVersion
            newResourceVersion := meta.GetResourceVersion()
            // dispatch on the event type
            switch event.Type {
            // push an add event into the DeltaFIFO queue
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            // push an update event into the DeltaFIFO queue
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            // push a delete event into the DeltaFIFO queue
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            if rvu, ok := r.store.(ResourceVersionUpdater); ok {
                rvu.UpdateResourceVersion(newResourceVersion)
            }
            eventCount++
        }
    }

    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}

watchHandler converts each event, according to its type, into the corresponding Delta and pushes it into the DeltaFIFO queue. This removes the blocking problem, but consumption of the queue is still synchronous; multiple workers are needed for real throughput, and that part is not implemented in the cache package (see the sketch below).
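As a sketch of that missing piece, one common approach (an assumption here; the cache package itself does not prescribe it) is to fan work out with the separate k8s.io/client-go/util/workqueue package: the event handler only enqueues keys, and several workers consume them concurrently:

package main

import (
    "fmt"
    "sync"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // A rate-limited work queue; an event handler would call queue.Add(key)
    // with keys from cache.MetaNamespaceKeyFunc instead of doing slow work inline.
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ { // four workers consume the queue concurrently
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                key, shutdown := queue.Get() // blocks until a key arrives or the queue shuts down
                if shutdown {
                    return
                }
                fmt.Println("processing", key) // the slow per-key work goes here
                queue.Done(key)                // mark the key as processed
            }
        }()
    }

    queue.Add("tmp/demo") // stands in for an event handler enqueueing a key
    time.Sleep(time.Second)
    queue.ShutDown() // drain and stop the workers
    wg.Wait()
}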

Reflector is the core component guaranteeing the informer's reliability. Handling lost events, unexpected events, and failed event processing involves many details; if interested, dig into the source for the specifics.

Controller

Controller wraps the code above. Let's again look at an example; the constructor functions are reused from the previous examples.

package main

import (
    "fmt"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/cache"
)

func newController() cache.Controller {
    fmt.Println("----- 3-controller -----")

    lw := newConfigMapsListerWatcher()
    store := newStore()
    queue := newQueue(store)
    cfg := &cache.Config{
        Queue:            queue,
        ListerWatcher:    lw,
        ObjectType:       &corev1.ConfigMap{},
        FullResyncPeriod: 0,
        RetryOnError:     false,
        Process: func(obj interface{}) error {
            for _, d := range obj.(cache.Deltas) {
                switch d.Type {
                case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
                    if _, exists, err := store.Get(d.Object); err == nil && exists {
                        if err := store.Update(d.Object); err != nil {
                            return err
                        }
                    } else {
                        if err := store.Add(d.Object); err != nil {
                            return err
                        }
                    }
                case cache.Deleted:
                    if err := store.Delete(d.Object); err != nil {
                        return err
                    }
                }
                configMap, ok := d.Object.(*corev1.ConfigMap)
                if !ok {
                    return fmt.Errorf("not config: %T", d.Object)
                }
                fmt.Printf("%s: %s\n", d.Type, configMap.Name)
            }
            return nil
        },
    }
    return cache.New(cfg)
}

func main() {
    controller := newController()

    stopCh := make(chan struct{})
    defer close(stopCh)

    fmt.Println("Start syncing....")

    go controller.Run(stopCh)

    <-stopCh
}

The run output has the same format as in the Reflector example, since the Process callback is identical.

Notes

1. Create the Config struct:

// Config contains all the settings for one of these low-level controllers.
type Config struct {
    // The queue for your objects - has to be a DeltaFIFO due to
    // assumptions in the implementation. Your Process() function
    // should accept the output of this Queue's Pop() method.
    Queue

    // Something that can list and watch your objects.
    ListerWatcher

    // Something that can process a popped Deltas.
    Process ProcessFunc

    // ObjectType is an example object of the type this controller is
    // expected to handle.  Only the type needs to be right, except
    // that when that is `unstructured.Unstructured` the object's
    // `"apiVersion"` and `"kind"` must also be right.
    ObjectType runtime.Object

    // FullResyncPeriod is the period at which ShouldResync is considered.
    FullResyncPeriod time.Duration

    // ShouldResync is periodically used by the reflector to determine
    // whether to Resync the Queue. If ShouldResync is `nil` or
    // returns true, it means the reflector should proceed with the
    // resync.
    ShouldResync ShouldResyncFunc

    // If true, when Process() returns an error, re-enqueue the object.
    // TODO: add interface to let you inject a delay/backoff or drop
    //       the object completely if desired. Pass the object in
    //       question to this interface as a parameter.  This is probably moot
    //       now that this functionality appears at a higher level.
    RetryOnError bool

    // Called whenever the ListAndWatch drops the connection with an error.
    WatchErrorHandler WatchErrorHandler

    // WatchListPageSize is the requested chunk size of initial and relist watch lists.
    WatchListPageSize int64
}

Its members are basically the Queue, ListerWatcher and other parameters we created before.

2. Create the Controller interface by calling cache's New function with the config:

type Controller interface {
    // Run does two things.  One is to construct and run a Reflector
    // to pump objects/notifications from the Config's ListerWatcher
    // to the Config's Queue and possibly invoke the occasional Resync
    // on that Queue.  The other is to repeatedly Pop from the Queue
    // and process with the Config's ProcessFunc.  Both of these
    // continue until `stopCh` is closed.
    Run(stopCh <-chan struct{})

    // HasSynced delegates to the Config's Queue
    HasSynced() bool

    // LastSyncResourceVersion delegates to the Reflector when there
    // is one, otherwise returns the empty string
    LastSyncResourceVersion() string
}

The New function is simple: it builds a controller struct, which implements the Controller interface, from the config.

func New(c *Config) Controller {
    ctlr := &controller{
        config: *c,
        clock:  &clock.RealClock{},
    }
    return ctlr
}

type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}

3. Then call the controller's Run function:

func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.WatchListPageSize = c.config.WatchListPageSize
    r.clock = c.clock
    if c.config.WatchErrorHandler != nil {
        r.watchErrorHandler = c.config.WatchErrorHandler
    }

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
    wg.Wait()
}

Does this function look familiar? It creates a Reflector and calls its Run function: exactly the steps we analyzed in the Reflector section. So a Controller is essentially a wrapper around Reflector, plus a processLoop that consumes the queue, sketched below.
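For completeness, the processLoop driven by wait.Until above is just a Pop loop over the queue; its shape is roughly the following (quoted from controller.go; details may differ slightly across client-go versions):

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}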

Informer

Informer refers to the kind of Controller dedicated to implementing the informer mechanism. It is itself a wrapper around controller, mainly encapsulating the conversion logic (obj goes from cache.Deltas to, e.g., *corev1.ConfigMap).

Let's look at an example again. Informer examples are quite common, though normally what you see is a SharedInformer.

package main

import (
    "fmt"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    fmt.Println("----- 4-informer -----")

    lw := newConfigMapsListerWatcher()
    // 第一个返回的参数是 cache.Store,这里暂时用不到所以直接丢弃
    _, controller := cache.NewInformer(lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
                return
            }
            fmt.Printf("created: %s\n", configMap.Name)
        },
        UpdateFunc: func(old, new interface{}) {
            configMap, ok := old.(*corev1.ConfigMap)
            if !ok {
                return
            }
            fmt.Printf("updated: %s\n", configMap.Name)
        },
        DeleteFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
                return
            }
            fmt.Printf("deleted: %s\n", configMap.Name)
        },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)

    fmt.Println("Start syncing....")

    go controller.Run(stopCh)

    <-stopCh
}

Run output

----- 4-informer -----
Start syncing....
created: demo
created: demo1
deleted: demo
created: demo

Notes

1. NewInformer creates two interfaces, a Store and a Controller:

func NewInformer(
    lw ListerWatcher,
    objType runtime.Object,
    resyncPeriod time.Duration,
    h ResourceEventHandler,
) (Store, Controller) {
    // This will hold the client state, as we know it.
    clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

    return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}

NewInformer sits in the same file as controller, and what it ultimately returns is a Controller, so an Informer is a kind of controller.

func newInformer(
    lw ListerWatcher,
    objType runtime.Object,
    resyncPeriod time.Duration,
    h ResourceEventHandler,
    clientState Store,
) Controller {
    // This will hold incoming changes. Note how we pass clientState in as a
    // KeyLister, that way resync operations will result in the correct set
    // of update/delete deltas.
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          clientState,
        EmitDeltaTypeReplaced: true,
    })

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    lw,
        ObjectType:       objType,
        FullResyncPeriod: resyncPeriod,
        RetryOnError:     false,

        Process: func(obj interface{}) error {
            // from oldest to newest
            for _, d := range obj.(Deltas) {
                switch d.Type {
                case Sync, Replaced, Added, Updated:
                    if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                        if err := clientState.Update(d.Object); err != nil {
                            return err
                        }
                        h.OnUpdate(old, d.Object)
                    } else {
                        if err := clientState.Add(d.Object); err != nil {
                            return err
                        }
                        h.OnAdd(d.Object)
                    }
                case Deleted:
                    if err := clientState.Delete(d.Object); err != nil {
                        return err
                    }
                    h.OnDelete(d.Object)
                }
            }
            return nil
        },
    }
    return New(cfg)
}

In the end it still calls the controller's New function to create a controller, which is exactly the process we went through above; the informer wraps the conversion for us, so we only need to care about the handlers.

2. Calling the controller's Run function follows the same logic as above.

SharedInformer

In the Informer design above, the handler is passed into NewInformer as a parameter. If another handler needs to process the same resource, a separate Informer object has to be created, and the queue cannot be reused, because a queue does not support two consumers at the same time. To solve this problem, the cache package also provides SharedInformer which, as the name suggests, is an Informer shared by multiple handlers:

package main

import (
    "fmt"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    fmt.Println("----- 5-shared-informer -----")

    lw := newConfigMapsListerWatcher()
    sharedInformer := cache.NewSharedInformer(lw, &corev1.ConfigMap{}, 0)
    // add one handler
    sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
                return
            }
            fmt.Printf("created, printing namespace: %s\n", configMap.Namespace)
        },
    })
    // add another handler
    sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
                return
            }
            fmt.Printf("created, printing name: %s\n", configMap.Name)
        },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)

    fmt.Println("Start syncing....")

    go sharedInformer.Run(stopCh)

    <-stopCh
}

This is the most common way informers are used. Run output:

----- 5-shared-informer -----
Start syncing....
created, printing namespace: tmp
created, printing namespace: tmp
created, printing name: demo
created, printing name: demo1

Notes

1. The SharedInformer interface:

type SharedInformer interface {
    // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
    // period.  Events to a single handler are delivered sequentially, but there is no coordination
    // between different handlers.
    AddEventHandler(handler ResourceEventHandler)
    // AddEventHandlerWithResyncPeriod adds an event handler to the
    // shared informer with the requested resync period; zero means
    // this handler does not care about resyncs.  The resync operation
    // consists of delivering to the handler an update notification
    // for every object in the informer's local cache; it does not add
    // any interactions with the authoritative storage.  Some
    // informers do no resyncs at all, not even for handlers added
    // with a non-zero resyncPeriod.  For an informer that does
    // resyncs, and for each handler that requests resyncs, that
    // informer develops a nominal resync period that is no shorter
    // than the requested period but may be longer.  The actual time
    // between any two resyncs may be longer than the nominal period
    // because the implementation takes time to do work and there may
    // be competing load and scheduling noise.
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // GetStore returns the informer's local cache as a Store.
    GetStore() Store
    // GetController is deprecated, it does nothing useful
    GetController() Controller
    // Run starts and runs the shared informer, returning after it stops.
    // The informer will be stopped when stopCh is closed.
    Run(stopCh <-chan struct{})
    // HasSynced returns true if the shared informer's store has been
    // informed by at least one full LIST of the authoritative state
    // of the informer's object collection.  This is unrelated to "resync".
    HasSynced() bool
    // LastSyncResourceVersion is the resource version observed when last synced with the underlying
    // store. The value returned is not synchronized with access to the underlying store and is not
    // thread-safe.
    LastSyncResourceVersion() string

    // The WatchErrorHandler is called whenever ListAndWatch drops the
    // connection with an error. After calling this handler, the informer
    // will backoff and retry.
    //
    // The default implementation looks at the error type and tries to log
    // the error message at an appropriate level.
    //
    // There's only one handler, so if you call this multiple times, last one
    // wins; calling after the informer has been started returns an error.
    //
    // The handler is intended for visibility, not to e.g. pause the consumers.
    // The handler should return quickly - any expensive processing should be
    // offloaded.
    SetWatchErrorHandler(handler WatchErrorHandler) error
}

Create one with NewSharedInformer:

// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
    return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

This creates a sharedIndexInformer struct, which implements the SharedIndexInformer interface:

type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector MutationDetector

    listerWatcher ListerWatcher

    // objectType is an example object of the type this informer is
    // expected to handle.  Only the type needs to be right, except
    // that when that is `unstructured.Unstructured` the object's
    // `"apiVersion"` and `"kind"` must also be right.
    objectType runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex

    // Called whenever the ListAndWatch drops the connection with an error.
    watchErrorHandler WatchErrorHandler
}

Looking at the SharedIndexInformer interface, it adds index access on top of the SharedInformer interface:

type SharedIndexInformer interface {
    SharedInformer
    // AddIndexers add indexers to the informer before it starts.
    AddIndexers(indexers Indexers) error
    GetIndexer() Indexer
}

2. Then the sharedIndexInformer's AddEventHandler function is called to register handlers; every event is broadcast to each registered handler.

3. Finally, the sharedIndexInformer's Run function is called:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process:           s.HandleDeltas,
        WatchErrorHandler: s.watchErrorHandler,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

This is just a wrapper around the controller Run we saw above.

SharedIndexInformer

In the Reflector, Informer and SharedInformer used above, besides responding to events we also maintained a copy of the latest resource state: the Store appearing in the code. In the Reflector demo the Store was built by us and passed into NewReflector; in the Informer demo the Store object was returned by NewInformer; in the SharedInformer demo the Store object can also be obtained through the GetStore method. This latest resource state is very useful: a lot of control logic cares not only about the resources involved in the event itself but also about the state of other resources, and that is where the Store object comes in.

However, Store only implements basic create/read/update/delete. As far as "read" goes, it provides only the four methods Get, GetByKey, List and ListKeys, and an object's "key" is computed by the KeyFunc specified when the Store was built, which in the vast majority of cases is cache.MetaNamespaceKeyFunc.

This means we can:

  • look up a single resource by namespace plus name;
  • list all resources.

The problem comes when we want to:

  • look up resources by some field other than MetaNamespace, such as a label, an annotation, or status;
  • list only a subset of resources by some condition, for example the resources under one namespace. Store cannot do that; the only alternative is calling the API directly through ListerWatcher, with no local cache.

Indexer is designed to solve exactly this problem. As its name says, it builds indexes. Again, let's go through an example:

package main

import (
    "fmt"
    "github.com/spongeprojects/magicconch"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    fmt.Println("----- 6-indexer -----")

    lw := newConfigMapsListerWatcher()
    indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
    // For demonstration only: we only care about the indexer and do not handle events,
    // so an empty HandlerFuncs is passed; real code would normally not do this.
    indexer, informer := cache.NewIndexerInformer(
        lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{}, indexers)

    stopCh := make(chan struct{})
    defer close(stopCh)

    fmt.Println("Start syncing....")

    go informer.Run(stopCh)

    // operate only after the informer's initial sync has completed
    if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
        panic("timed out waiting for caches to sync")
    }

    // list all keys under the cache.NamespaceIndex index with index value "tmp"
    keys, err := indexer.IndexKeys(cache.NamespaceIndex, "tmp")
    magicconch.Must(err)
    for _, k := range keys {
        fmt.Println(k)
    }
}

Creating the SharedIndexInformer interface was covered above. The run output:

----- 6-indexer -----
Start syncing....
tmp/demo
tmp/demo1

Notes

1. cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} is the default index used in the informers package; with it, resources can be listed by namespace. A sketch of a custom index follows below.
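Custom indexes follow the same pattern: an index name mapped to an IndexFunc that returns the index values for an object. As a hypothetical sketch (the index name "byApp" and the "app" label are made up for illustration; imports as in the examples above):

// buildByAppIndexers returns Indexers with a hypothetical "byApp" index
// that indexes objects by their (assumed) "app" label.
func buildByAppIndexers() cache.Indexers {
    return cache.Indexers{
        "byApp": func(obj interface{}) ([]string, error) {
            m, err := meta.Accessor(obj) // read the object's metadata generically
            if err != nil {
                return nil, err
            }
            return []string{m.GetLabels()["app"]}, nil
        },
    }
}

// After syncing, indexer.ByIndex("byApp", "my-app") returns all cached objects
// whose "app" label is "my-app".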

informers

From the cache package in the previous part, we know there are five functions for creating informers:

  • New
  • NewInformer
  • NewIndexerInformer
  • NewSharedInformer
  • NewSharedIndexInformer

They provide different degrees of abstraction and encapsulation. NewSharedIndexInformer is the least abstract and most encapsulated among them, yet even NewSharedIndexInformer does not encapsulate a concrete resource type; it still takes a ListerWatcher and Indexers as parameters, so we have to build the ListWatch and Indexers first.

This is still not convenient enough, so the informers package applies the factory pattern: each built-in resource type gets a corresponding informer factory, and when you need an informer for some resource you build it from the factory. This is in fact the way we use informers most often; a detailed walkthrough is in the previous post on using k8s informers.
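As a sketch of the factory-style usage (reusing the mustClientset helper from the first example; the 10-minute resync period is an arbitrary choice):

package main

import (
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
)

func main() {
    clientset := mustClientset() // helper from the first example

    factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
    informer := factory.Core().V1().ConfigMaps().Informer() // a SharedIndexInformer for configmaps
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            cm := obj.(*corev1.ConfigMap)
            fmt.Printf("created: %s/%s\n", cm.Namespace, cm.Name)
        },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)

    factory.Start(stopCh)            // starts all informers requested from the factory
    factory.WaitForCacheSync(stopCh) // wait for the initial LIST to finish
    <-stopCh
}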

Applications

Informers are in fact used all over Kubernetes: almost every component relies on the list-watch mechanism. The place we use them most is when developing controllers for CRDs; that code is usually generated automatically, but we should still understand what it does and how it works. See the article on k8s controllers for details.